Add bug audit and API consistency analysis #757
antiguru wants to merge 2 commits into TimelyDataflow:master from
Conversation
Systematic review of all source files across all crates (bytes, container, logging, communication, timely) identifying 5 bugs and 11 API inconsistencies. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Four new systematic audits of the timely-dataflow codebase:

- Panic audit: 12 thematic findings, including network I/O panics, Drop panics causing abort, and user-facing APIs without safe alternatives
- Silent error swallowing: 9 findings, including an infinite spin-loop bug in header parsing (allocator.rs, allocator_process.rs)
- Allocation audit: 16 findings, including broadcast O(peers*records), per-message event push without batching, and unbounded buffer growth
- Hot loop optimization: 21 findings, including per-message Counter+Tee overhead, exchange random cache access, and ChangeBatch re-sorting the clean prefix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
**frankmcsherry** left a comment:

I think these are great. Most are on target; some are perhaps not (the panics and fate-sharing are often intentional, in that the goal is not to be a bullet-proof system so much as a predictable tool that reliably self-terminates in the face of problems). I left notes in the places where I either disagreed or was uncertain.
> **Severity: medium**
>
> * `timely/src/progress/reachability.rs:645-702` — The worklist uses `BinaryHeap` with O(log n) per push/pop. Many entries are duplicates that cancel immediately. A sort-based approach (accumulate in a Vec, sort once, process in order) would have better cache locality and constants.
**frankmcsherry:** The challenge here is that we add more updates as we go, so we cannot sort only once. We could use an LSM architecture, though, which would consolidate in place and allow log n access like the heap.
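For illustration, a minimal sketch of the LSM-style alternative described in the comment above. The type and method names are hypothetical, not timely's implementation: updates accumulate in a flat Vec, and whenever the Vec grows past twice the consolidated prefix, it is sorted and merged in place, cancelling duplicates. The amortized cost per update stays logarithmic, like the heap, but with sequential memory access.

```rust
/// Hypothetical worklist that consolidates in place instead of using a BinaryHeap.
struct ConsolidatingWorklist {
    updates: Vec<(usize, i64)>, // (location, delta) pairs
    clean: usize,               // length of the consolidated prefix
}

impl ConsolidatingWorklist {
    fn new() -> Self {
        Self { updates: Vec::new(), clean: 0 }
    }

    /// Record an update; consolidate when the dirty suffix dominates.
    fn push(&mut self, loc: usize, delta: i64) {
        self.updates.push((loc, delta));
        if self.updates.len() > 2 * self.clean.max(1) {
            self.consolidate();
        }
    }

    /// Sort, merge equal locations, and drop cancelled (zero-delta) entries.
    fn consolidate(&mut self) {
        self.updates.sort_by_key(|&(loc, _)| loc);
        let mut merged: Vec<(usize, i64)> = Vec::with_capacity(self.updates.len());
        for &(loc, delta) in self.updates.iter() {
            match merged.last_mut() {
                Some(last) if last.0 == loc => last.1 += delta,
                _ => merged.push((loc, delta)),
            }
        }
        merged.retain(|&(_, delta)| delta != 0); // duplicates that cancel vanish here
        self.updates = merged;
        self.clean = self.updates.len();
    }
}
```

A real version would still need ordered draining for the reachability traversal; the sketch only shows the consolidate-in-place mechanics.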
> ### 7. `PortConnectivity` uses `BTreeMap` for typically 1-2 ports
>
> **Severity: medium**
**frankmcsherry:** This is about half of the memory use in progress tracking, so I'm happy to bump the severity here.
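A hedged sketch of the kind of replacement the finding suggests (hypothetical type, not timely's `PortConnectivity`): for maps that almost always hold one or two ports, a sorted Vec with linear or binary search avoids the per-node allocations and pointer chasing of a `BTreeMap`.

```rust
/// Hypothetical small-map: a sorted Vec of (port, value) pairs.
struct SmallPortMap<V> {
    entries: Vec<(usize, V)>, // kept sorted by port id
}

impl<V> SmallPortMap<V> {
    fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// Insert or overwrite; binary search keeps the Vec sorted.
    fn insert(&mut self, port: usize, value: V) {
        match self.entries.binary_search_by_key(&port, |&(p, _)| p) {
            Ok(pos) => self.entries[pos].1 = value,
            Err(pos) => self.entries.insert(pos, (port, value)),
        }
    }

    /// Linear scan: for 1-2 entries this is a handful of comparisons
    /// over contiguous memory, with no tree nodes to allocate or chase.
    fn get(&self, port: usize) -> Option<&V> {
        self.entries.iter().find(|&&(p, _)| p == port).map(|(_, v)| v)
    }
}
```

The memory win is that an empty or one-entry map is just a (possibly empty) Vec, with no B-tree node overhead per instance.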
> Per message this costs: 2 `RefCell::borrow_mut()` checks, 1-2 vtable calls, 1 timestamp clone, 1 `ChangeBatch::update`.
>
> * `timely/src/dataflow/channels/pushers/tee.rs:86` — `RefCell::borrow_mut()` on every message through every Tee.
> * `timely/src/dataflow/channels/pushers/tee.rs:87` — Virtual dispatch through `Box<dyn PushSet>` on every message. Could be an enum (`PushOne | PushMany`) to allow inlining.
**frankmcsherry:** I'm not certain this is possible, in that the trait constraint `C: Clone` for the `Push` implementation for `PushMany` (but not for `PushOne`) does not seem expressible with an enum. The boxing is concealing that detail from others. Happy to be wrong, though.
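A sketch of the tension the comment above describes, using hypothetical types (not timely's actual `Tee`/`PushSet`): each concrete pusher can carry only the bounds it needs, but folding them into one enum forces the union of their bounds onto the shared `impl`, which `Box<dyn Push<T>>` avoids by erasing the concrete type.

```rust
trait Push<T> {
    fn push(&mut self, element: &mut Option<T>);
}

// Hypothetical single-target pusher: needs no bounds on T.
struct PushOne<T>(Vec<T>);

// Hypothetical fan-out pusher: cloning to each target needs T: Clone.
struct PushMany<T>(Vec<Vec<T>>);

impl<T> Push<T> for PushOne<T> {
    fn push(&mut self, element: &mut Option<T>) {
        if let Some(e) = element.take() {
            self.0.push(e);
        }
    }
}

impl<T: Clone> Push<T> for PushMany<T> {
    fn push(&mut self, element: &mut Option<T>) {
        if let Some(e) = element.take() {
            for target in self.0.iter_mut() {
                target.push(e.clone());
            }
        }
    }
}

enum Pusher<T> {
    One(PushOne<T>),
    Many(PushMany<T>),
}

// The single enum impl must demand T: Clone even for the One variant --
// exactly the bound-leakage that the boxed trait object conceals.
impl<T: Clone> Push<T> for Pusher<T> {
    fn push(&mut self, element: &mut Option<T>) {
        match self {
            Pusher::One(p) => p.push(element),
            Pusher::Many(p) => p.push(element),
        }
    }
}
```

The enum does enable inlining of the dispatch, but only for container types that are `Clone` anyway; whether that covers the callers in practice is the open question.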
> **Severity: low-medium**
>
> * `timely/src/dataflow/operators/generic/handles.rs:56-71` — Sorts the entire staging deque by time on every call. Messages from a single source (pipeline pact) are already time-ordered; could skip the sort in that case.
**frankmcsherry:** I'm not clear why they would be time-ordered. The sender could (afaict) send them in arbitrary timestamp order.
> * `timely/src/dataflow/operators/capability.rs:486` — `CapabilitySet::downgrade` panics. Use `try_downgrade`.
> * `timely/src/dataflow/operators/capability.rs:287` — `InputCapability::delayed` panics on invalid time or disconnected output. No `try_*` alternative exists for this one.
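A minimal sketch of the `try_*` convention the audit recommends, with hypothetical types (not timely's actual `Capability` API): the panicking method becomes a thin wrapper over a fallible one, so callers who can recover get a `Result` while existing callers keep the panicking shorthand.

```rust
#[derive(Debug, PartialEq)]
struct DowngradeError; // hypothetical error type

struct Cap {
    time: u64, // hypothetical stand-in for a capability's timestamp
}

impl Cap {
    /// Fallible variant: downgrading may only move the time forward.
    fn try_downgrade(&mut self, new_time: u64) -> Result<(), DowngradeError> {
        if new_time >= self.time {
            self.time = new_time;
            Ok(())
        } else {
            Err(DowngradeError) // cannot move a capability backward
        }
    }

    /// Panicking variant, defined in terms of the fallible one.
    fn downgrade(&mut self, new_time: u64) {
        self.try_downgrade(new_time)
            .expect("attempted to downgrade capability to an earlier time");
    }
}
```

The same shape would give `InputCapability::delayed` a `try_delayed` counterpart without changing the behavior of the existing method.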
> ### 6. `try_regenerate` panics despite the `try_` naming convention
>
> * `communication/src/allocator/zero_copy/allocator_process.rs:70,77` — same pattern
> * `communication/src/allocator/zero_copy/tcp.rs:48,151` — same pattern
> ### 9. Mutex poisoning propagation
**frankmcsherry:** I'm not sure I understand the issue here, but the fate-sharing is intentional. If one worker goes down, we want them all to go down, in order to shut down in a prompt and (largely) predictable fashion. Could be worth explaining in the docs, though!
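A small demonstration of the fate-sharing mechanism the comment describes, using plain standard-library types (not timely's actual worker code): when a thread panics while holding a `Mutex`, the lock is poisoned, and every other thread that subsequently touches it observes the failure and can shut down promptly rather than limp along with possibly-inconsistent shared state.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// A peer "worker" can detect a failed sibling by checking for poison.
fn lock_is_poisoned(shared: &Arc<Mutex<u64>>) -> bool {
    shared.lock().is_err()
}

fn demo() -> bool {
    let shared = Arc::new(Mutex::new(0u64));
    let clone = Arc::clone(&shared);

    let worker = thread::spawn(move || {
        let _guard = clone.lock().unwrap();
        panic!("worker failure"); // panicking while holding the lock poisons it
    });
    let _ = worker.join(); // join the panicked worker

    lock_is_poisoned(&shared) // true: peers observe the failure and can exit
}
```

Propagating the poison (rather than calling `into_inner` or ignoring the error) is what makes the whole cluster terminate together, which is the intentional behavior the docs could spell out.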
> * `timely/src/scheduling/activate.rs:280` — `SyncActivator::activate()` clones `self.path` (a `Vec<usize>`) on every call.
> * `timely/src/scheduling/activate.rs:87` — `activate_after` allocates `path.to_vec()` per delayed activation.
>
> Using `Rc<[usize]>` would avoid per-call allocation.
**frankmcsherry:** I'm not sure this can work out. It is sent to `SyncActivations`, which then sends through an MPSC. Perhaps an `Arc<[usize]>`.
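A sketch of the `Arc<[usize]>` correction (hypothetical function, not timely's `SyncActivator`): cloning an `Arc` is a reference-count bump rather than a fresh `Vec` allocation, and unlike `Rc<[usize]>` it is `Send`, so it can cross the MPSC channel.

```rust
use std::sync::{mpsc, Arc};

/// Hypothetical activation: ship a shared path across an MPSC channel.
/// `Arc::clone` bumps a refcount; no per-call Vec allocation occurs.
fn activate(path: &Arc<[usize]>, tx: &mpsc::Sender<Arc<[usize]>>) {
    tx.send(Arc::clone(path)).expect("receiver hung up");
}
```

An `Rc<[usize]>` would fail to compile here because `mpsc::Sender` requires its payload to be `Send`, which is exactly the constraint the comment points out.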
> **Severity: high**
>
> * `communication/src/allocator/counters.rs:47-49` — `Pusher::push` does `events.borrow_mut().push(self.index)` per element. Commented-out code shows a batching strategy that would reduce this to O(flushes).
> * `communication/src/allocator/counters.rs:99,102` — `ArcPusher::push` does `events.send(self.index)` (an mpsc send, involving a mutex) and `buzzer.buzz()` (a thread-unpark syscall) per element. Batching to flush boundaries would save ~999 mutex acquisitions and syscalls per 1000-element batch.
**frankmcsherry:** Maybe unclear from the context, but the things that are "pushed" are containers of data, intended to be of sufficient size that they amortize out these per-element (that is, per-container) costs.
Systematic review of all source files across all crates (bytes, container, logging, communication, timely).

Bugs found (5):

- `ChangeBatch::compact()` skips single zero-valued entries
- `broadcast.rs` recv logging uses the wrong capacity variable
- `OrderReversed` inconsistent `PartialEq`/`Ord` splits notification counts
- `Config::from_matches` defaults to `Eager` instead of `Demand`
- `allocator.rs` uses `size_of::<MessageHeader>()` instead of `header.header_bytes()`
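To illustrate the first bug's shape, a simplified stand-in for `ChangeBatch::compact()` (hypothetical code, not timely's implementation): the key point is that the zero filter must run unconditionally after merging, so a key that appears exactly once with delta 0 is also removed, rather than only zeros produced by cancellation.

```rust
/// Simplified compact: sort, merge runs of equal keys, then drop zeros --
/// including entries that were already zero before merging.
fn compact(updates: &mut Vec<(u64, i64)>) {
    updates.sort_by_key(|&(time, _)| time);
    let mut merged: Vec<(u64, i64)> = Vec::with_capacity(updates.len());
    for &(time, delta) in updates.iter() {
        match merged.last_mut() {
            Some(last) if last.0 == time => last.1 += delta,
            _ => merged.push((time, delta)),
        }
    }
    merged.retain(|&(_, delta)| delta != 0); // singleton zeros removed too
    *updates = merged;
}
```

A version that only discards zeros while merging adjacent duplicates would leave the `(2, 0)` entry below in place, which is the class of bug the audit reports.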
API inconsistencies found (11):

- `Handle::epoch()` and `time()` return identical values
- `Antichain::from_elem` vs `MutableAntichain::new_bottom` naming
- `BranchWhen` in the vec module but operates on a generic `Stream<S, C>`
- `vec::Partition` puts the closure type in the trait generics (unlike all other traits)
- `MutableAntichain` missing `with_capacity`
- `Antichain::extend` returns bool, blocking `std::iter::Extend`
- `core::Map`/`vec::Map` have non-overlapping extensions
- `core::OkErr` has no vec wrapper
- `Bytes`/`BytesMut` asymmetric `try_merge`

🤖 Generated with Claude Code