# Bug audit log

## `timely/src/progress/change_batch.rs`

### `compact()` does not remove single zero-valued entries

`compact()` at line 292 guards the compaction body with `self.updates.len() > 1`.
A batch with a single `(key, 0)` entry will never have that entry removed.
A subsequent `is_empty()` then returns `false` for this logically empty batch, because the `clean > len/2` fast path fires and skips the compaction that would remove the entry.

Reproduction: `ChangeBatch::<usize>::new_from(17, 0)` followed by `is_empty()` returns `false`.

Severity: low — zero-valued updates are uncommon in practice, but the invariant that "compact entries are non-zero" is violated.

Fix: relax the guard from `self.updates.len() > 1` to `self.updates.len() >= 1` (i.e., non-empty), or handle the single-element case separately with a retain check.
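
The failure mode can be reproduced with a self-contained sketch; `MiniBatch`, `compact_buggy`, and `compact_fixed` are hypothetical stand-ins modeled on `ChangeBatch`, not the crate's actual code:

```rust
// Minimal stand-in for ChangeBatch: `updates` holds (key, delta) pairs;
// consolidation merges equal keys and drops zero-valued entries.
#[derive(Default)]
struct MiniBatch {
    updates: Vec<(usize, i64)>,
}

impl MiniBatch {
    fn update(&mut self, key: usize, delta: i64) {
        self.updates.push((key, delta));
    }
    // Buggy guard: a lone (key, 0) entry is never consolidated away.
    fn compact_buggy(&mut self) {
        if self.updates.len() > 1 {
            self.consolidate();
        }
    }
    // Fixed guard: consolidate whenever the batch is non-empty.
    fn compact_fixed(&mut self) {
        if !self.updates.is_empty() {
            self.consolidate();
        }
    }
    fn consolidate(&mut self) {
        self.updates.sort_by_key(|&(k, _)| k);
        let mut merged: Vec<(usize, i64)> = Vec::new();
        for (k, d) in self.updates.drain(..) {
            match merged.last_mut() {
                Some(last) if last.0 == k => last.1 += d,
                _ => merged.push((k, d)),
            }
        }
        merged.retain(|&(_, d)| d != 0);
        self.updates = merged;
    }
}

fn main() {
    // Mirrors ChangeBatch::new_from(17, 0): one logically empty entry.
    let mut buggy = MiniBatch::default();
    buggy.update(17, 0);
    buggy.compact_buggy();
    assert!(!buggy.updates.is_empty()); // zero entry survives the guard

    let mut fixed = MiniBatch::default();
    fixed.update(17, 0);
    fixed.compact_fixed();
    assert!(fixed.updates.is_empty()); // zero entry removed
}
```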

## `timely/src/progress/broadcast.rs`

### `recv` logging pre-allocates with wrong variable

In `recv()` at lines 120-121, the logging closure allocates:
```rust
let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
```
`changes` is the *output accumulator*, not the received message (`recv_changes`).
Compare with `send()` at lines 66-67, where `changes` correctly refers to the data being sent.

Severity: very low — wrong capacity hint in a logging-only path; no correctness impact.

## `timely/src/dataflow/operators/generic/notificator.rs`

### `OrderReversed` has inconsistent `PartialEq` and `Ord`, causing incorrect notification counts

`OrderReversed` derives `PartialEq` (compares both `element: Capability<T>` and `value: u64`) but implements `Ord` comparing only by `element.time()`.
The `PartialEq` impl for `Capability<T>` additionally checks `Rc::ptr_eq` on the internal change batch.

In `next_count()` (lines 342-348), the loop `while self.available.peek() == Some(&front)` uses `PartialEq` to merge same-time entries from the BinaryHeap.
This fails to merge entries that have:
* Different `value` fields (e.g., one consolidated with count 3, another directly inserted with count 1).
* Different `Rc` pointers in their `Capability` (capabilities from different cloning chains).

Reproduction scenario:
1. `make_available` consolidates two pending notifications for time T into `(T, count=2)` and pushes to `available`.
2. `notify_at_frontiered` adds `(T, count=1)` directly to `available`.
3. `next_count` pops `(T, 2)`, peeks at `(T, 1)`, comparison returns false (different `value`), returns `(cap, 2)` instead of `(cap, 3)`.

The next `next_count` call returns the leftover `(cap, 1)`, so the total is eventually correct but split across calls.

Severity: medium — notification counts are inaccurate.
Users relying on `count` in `for_each(|cap, count, _| ...)` may see split notifications for the same time.
Functional correctness of the dataflow is unaffected (all notifications are delivered), but the count semantic is broken.

Fix: change the `peek` comparison to compare only by time:
```rust
while self.available.peek().map(|x| x.element.time() == front.element.time()).unwrap_or(false) {
```
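
The mismatch can be demonstrated in isolation; the struct below is a simplified stand-in (a plain `u64` time instead of a `Capability`), not the actual `OrderReversed`:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Mirrors the bug's shape: the derived PartialEq compares both fields,
// but Ord compares only `time` (reversed, for min-heap behavior), so
// entries the heap treats as equal are unequal under `==`.
#[derive(PartialEq, Eq)]
struct OrderReversed {
    time: u64,
    value: u64,
}

impl Ord for OrderReversed {
    fn cmp(&self, other: &Self) -> Ordering {
        other.time.cmp(&self.time)
    }
}
impl PartialOrd for OrderReversed {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(OrderReversed { time: 7, value: 2 });
    heap.push(OrderReversed { time: 7, value: 1 });

    let front = heap.pop().unwrap();
    // Same time, different value: the PartialEq-based peek fails to merge.
    assert!(heap.peek() != Some(&front));
    // Comparing by time alone, as the fix suggests, does merge.
    assert!(heap.peek().map(|x| x.time == front.time).unwrap_or(false));
}
```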

## `timely/src/worker.rs`

### `Config::from_matches` uses wrong default for `progress_mode`

`Config::from_matches` at line 115 uses `ProgressMode::Eager` as the fallback when `--progress-mode` is not specified:
```rust
let progress_mode = matches
    .opt_get_default("progress-mode", ProgressMode::Eager)?;
```

However, the `Default` impl for `ProgressMode` (line 64) is `Demand`:
```rust
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    Eager,
    #[default]
    Demand,
}
```

This means `Config::thread()` and `Config::process(n)` use `Demand` (via `Config::default()`), but `execute_from_args` without `--progress-mode` uses `Eager`. The documentation explicitly recommends `Demand` as the safer default.

Reproduction: calling `execute_from_args(std::env::args(), ...)` without `--progress-mode` yields `Eager`, while `execute(Config::process(n), ...)` yields `Demand`.

Severity: low-medium — the two entry points silently use different progress modes. Users of `execute_from_args` get the less robust `Eager` mode by default, which risks saturating the system with progress messages.

Fix: change line 115 to use `ProgressMode::Demand` (or `ProgressMode::default()`) as the fallback:
```rust
let progress_mode = matches
    .opt_get_default("progress-mode", ProgressMode::default())?;
```

## `communication/src/allocator/zero_copy/allocator.rs`

### `receive()` uses `size_of::<MessageHeader>()` instead of `header.header_bytes()`

In `receive()` at line 270, the header is stripped from the payload using:
```rust
let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());
```

`MessageHeader` has 6 `usize` fields, so `size_of::<MessageHeader>()` is platform-dependent (48 on 64-bit, 24 on 32-bit).
The wire format always uses 6 `u64` values, so the correct strip size is `header.header_bytes()` which returns `size_of::<u64>() * 6 = 48` unconditionally.

Compare with `allocator_process.rs` line 197 which correctly uses:
```rust
let _ = peel.extract_to(header.header_bytes());
```

On 64-bit platforms the values coincide (48 = 48), so the bug is latent.
On 32-bit platforms, only 24 bytes would be stripped, leaving 24 bytes of header data mixed into the payload, corrupting every deserialized message received over TCP.

Reproduction: compile and run a multi-process timely computation on a 32-bit target. All inter-process messages will deserialize incorrectly.

Severity: low — 32-bit deployments are rare, and the two values coincide on the dominant 64-bit platform. The inconsistency with `allocator_process.rs` indicates the intent was to use `header.header_bytes()`.

Fix: change line 270 to:
```rust
let _ = peel.extract_to(header.header_bytes());
```
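
The divergence is easy to see with `size_of`; the struct below only mimics a six-`usize` header layout (field names are made up), it is not the real `MessageHeader`:

```rust
// A stand-in header with six usize fields, like MessageHeader.
#[repr(C)]
struct FakeHeader {
    channel: usize,
    source: usize,
    target: usize,
    length: usize,
    seqno: usize,
    extra: usize,
}

// The wire format is fixed at six u64 values, independent of the target.
fn header_bytes() -> usize {
    std::mem::size_of::<u64>() * 6
}

fn main() {
    // 48 on 64-bit targets, 24 on 32-bit targets.
    let native = std::mem::size_of::<FakeHeader>();
    // Always 48, matching what is actually on the wire.
    let wire = header_bytes();
    println!("native = {native}, wire = {wire}");
    assert_eq!(wire, 48);
    assert_eq!(native, 6 * std::mem::size_of::<usize>());
}
```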
# Allocation audit log

## Findings by theme

### 1. Broadcast clones every record `peers` times

**Severity: medium-high**

* `timely/src/dataflow/operators/vec/broadcast.rs:31` — The broadcast operator is implemented as `flat_map(|x| (0..peers).map(|i| (i, x.clone()))).exchange(|ix| ix.0).map(|(_i,x)| x)`.
Each record is cloned `peers` times, wrapped in tuples, exchanged, then unwrapped.
This is O(peers * records) in allocations.
The comment acknowledges: "Simplified implementation... Optimize once they have settled down."
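
A clone-counting sketch (hypothetical `Record` type) makes the cost visible; it mirrors only the shape of the `flat_map` pattern, not the operator itself:

```rust
use std::cell::Cell;

thread_local! {
    // Counts every Record clone performed on this thread.
    static CLONES: Cell<usize> = Cell::new(0);
}

struct Record(u64);

impl Clone for Record {
    fn clone(&self) -> Self {
        CLONES.with(|c| c.set(c.get() + 1));
        Record(self.0)
    }
}

fn main() {
    let peers = 4usize;
    let records = vec![Record(1), Record(2), Record(3)];

    // Same shape as the broadcast implementation: each record is cloned
    // once per peer, even for the final peer where a move would suffice.
    let routed: Vec<(usize, Record)> = records
        .into_iter()
        .flat_map(|x| (0..peers).map(move |i| (i, x.clone())))
        .collect();

    assert_eq!(routed.len(), peers * 3);
    assert_eq!(CLONES.with(|c| c.get()), peers * 3); // O(peers * records)
}
```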

### 2. Per-message event notifications without batching

**Severity: medium**

The inter-thread communication path does three operations per message push with no batching:
* `communication/src/allocator/counters.rs:47-49` — `events.push(self.index)` appends to a shared Vec on every push, growing O(messages) between drains.
* `communication/src/allocator/counters.rs:102` — `self.buzzer.buzz()` calls unpark/condvar on every push, even when the target thread is already awake.
* `communication/src/allocator/process.rs:189-194` — `receive()` drains all pending mpsc messages into the events Vec in one shot, no bound or backpressure.

Commented-out code in `counters.rs:34-44` shows a batching strategy was considered but not completed.

### 3. Unbounded buffer growth throughout the communication layer

**Severity: medium**

Multiple buffers grow to their high-water mark and never shrink:

* `communication/src/allocator/zero_copy/bytes_slab.rs:106` — `in_progress` Vec grows as buffers are retired, never shrinks. Slow consumers cause monotonic growth.
* `communication/src/allocator/zero_copy/bytes_exchange.rs:31` — `MergeQueue` VecDeque grows without backpressure under producer-consumer imbalance.
* `communication/src/allocator/zero_copy/allocator.rs:277-289` and `allocator_process.rs:204-216` — Per-channel `VecDeque<Bytes>` grows without limit if consumers are slow.
* `communication/src/allocator/zero_copy/tcp.rs:53-56` — `stageds` inner Vecs retain peak capacity.
* `communication/src/allocator/zero_copy/allocator.rs:128` and `allocator_process.rs:118` — `staged` Vec retains high-water-mark capacity.

### 4. Capability operations are heavier than necessary

**Severity: medium**

* `timely/src/dataflow/operators/capability.rs:154-161` — `try_downgrade` creates a new intermediate `Capability` (incrementing ChangeBatch), then drops the old one (decrementing). Two `borrow_mut` + `update` calls when one in-place update would suffice.
* `timely/src/dataflow/operators/generic/notificator.rs:323` — `make_available` clones capabilities from `pending` instead of moving them. A TODO comment acknowledges this.
* `timely/src/dataflow/operators/capability.rs:167-171` — `Capability::drop` clones the time to call `update(time.clone(), -1)` because `update` takes ownership.

### 5. Repeated string-based logger lookup on every step

**Severity: medium**

* `timely/src/worker.rs:391,401` — `self.logging()` is called multiple times per `step_or_park()`. Each call goes through `self.log_register()` → `borrow()` → `HashMap::get("timely")`, performing a string lookup on every worker step.
Should be cached in the `Worker` struct.

### 6. EventLink allocates one Rc per captured event

**Severity: medium**

* `timely/src/dataflow/operators/core/capture/event.rs:75` — Every pushed event creates a new `Rc<EventLink>`. For high-throughput capture, this is one heap allocation per event.
A pre-allocated ring buffer or arena would be more efficient.

### 7. Reclock operator has O(n^2) stash behavior

**Severity: medium**

* `timely/src/dataflow/operators/core/reclock.rs:55-79` — The stash is a `Vec` scanned linearly per notification, then `retain` shifts elements. With many distinct timestamps this becomes O(n^2).
A `BTreeMap<T, Vec<C>>` would give O(log n) lookups and efficient range removal.
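
A sketch of the suggested stash (with `u64` timestamps and `&str` payloads standing in for the operator's `T` and `C`):

```rust
use std::collections::BTreeMap;

// Drain everything at or before time `t` in timestamp order, without a
// linear scan or the element-shifting of Vec::retain.
fn drain_ready(stash: &mut BTreeMap<u64, Vec<&'static str>>, t: u64) -> Vec<&'static str> {
    // Keys strictly greater than t stay; keys <= t drain.
    let rest = stash.split_off(&(t + 1));
    let ready = std::mem::replace(stash, rest);
    ready.into_values().flatten().collect()
}

fn main() {
    let mut stash = BTreeMap::new();
    stash.entry(3).or_insert_with(Vec::new).push("a");
    stash.entry(1).or_insert_with(Vec::new).push("b");
    stash.entry(2).or_insert_with(Vec::new).push("c");

    // A notification at time 2 releases times 1 and 2, in order.
    assert_eq!(drain_ready(&mut stash, 2), vec!["b", "c"]);
    assert_eq!(stash.len(), 1); // only time 3 remains
}
```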

### 8. Logging allocates Vecs in the hot path

**Severity: medium (when logging enabled)**

* `timely/src/progress/broadcast.rs:66-67,120-121` — Every `send()`/`recv()` allocates two `Vec`s for logging that are transferred to the logger by ownership.
* `timely/src/progress/reachability.rs:852,867` — `log_source_updates`/`log_target_updates` collect into new Vecs, cloning every timestamp.
* `timely/src/logging.rs:51` — `BatchLogger::publish_batch` allocates a 2-element Vec per progress frontier advance.

### 9. `BytesRefill` double indirection

**Severity: medium**

* `communication/src/initialize.rs:157` — Default refill closure creates `Box::new(vec![0_u8; size])`: the Vec already heap-allocates its buffer, then Box adds another heap allocation and pointer indirection.
`vec![0u8; size].into_boxed_slice()` would eliminate the Vec metadata overhead.
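
A quick sketch of the difference (this is standard-library behavior, not code from the crate):

```rust
fn main() {
    let size = 1024;

    // Current pattern: the Box is a second heap allocation holding the
    // Vec's (ptr, len, capacity) triple, which itself points at the buffer.
    let double: Box<Vec<u8>> = Box::new(vec![0u8; size]);

    // Suggested: one allocation; the fat pointer lives inline.
    let single: Box<[u8]> = vec![0u8; size].into_boxed_slice();

    assert_eq!(double.len(), single.len());
    // Box<[u8]> is just (ptr, len): no capacity word, no extra hop.
    assert_eq!(
        std::mem::size_of::<Box<[u8]>>(),
        2 * std::mem::size_of::<usize>()
    );
}
```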

### 10. Unnecessary clone in TCP receive unicast path

**Severity: low-medium**

* `communication/src/allocator/zero_copy/tcp.rs:99-101` — `bytes.clone()` for every target in the range. For unicast messages (the common case where `target_upper - target_lower == 1`), the original `bytes` could be moved instead of cloned, saving one atomic refcount increment/decrement pair per message.

### 11. `SyncActivator` and delayed activations allocate path Vecs

**Severity: low-medium**

* `timely/src/scheduling/activate.rs:280` — `SyncActivator::activate()` clones `self.path` (a `Vec<usize>`) on every call.
* `timely/src/scheduling/activate.rs:87` — `activate_after` allocates `path.to_vec()` per delayed activation.
Using `Arc<[usize]>` would avoid the per-call allocation; `Rc<[usize]>` is not an option here, because the path is handed to `SyncActivations` and then sent through an MPSC channel, so the shared slice must be `Send`.
### 12. Exchange partition clones time per container extraction

**Severity: low-medium**

* `timely/src/dataflow/channels/pushers/exchange.rs:57,67` — `time.clone()` inside the per-container extraction loop. For complex product timestamps, this adds up.

### 13. Sequencer inefficiencies

**Severity: low-medium**

* `timely/src/synchronization/sequence.rs:185` — Sink re-sorts the entire `recvd` vector each invocation, including already-sorted elements. Should sort only new elements and merge.
* `timely/src/synchronization/sequence.rs:153` — Clones each element `peers - 1` times; the last iteration could move.

### 14. Thread allocator dead code

**Severity: low**

* `communication/src/allocator/thread.rs:61` — The shared tuple contains two VecDeques but the recycling code using the second one (lines 97-102) is commented out. The second VecDeque is allocated but never used.

### 15. Partition operator intermediate buffering

**Severity: low-medium**

* `timely/src/dataflow/operators/core/partition.rs:61-67` — Creates a `BTreeMap<u64, Vec<_>>` to buffer data per partition before pushing to outputs. Data could be pushed directly to per-output container builders without the intermediate collection.

### 16. Minor findings

* `container/src/lib.rs:150` — `CapacityContainerBuilder::pending` VecDeque grows but never shrinks. `relax()` is a no-op.
* `container/src/lib.rs:216-219` — `ensure_capacity` computes `reserve(preferred - capacity())` but should use `reserve(preferred - len())`: `Vec::reserve(n)` guarantees capacity for `len() + n` elements, so subtracting `capacity()` under-reserves whenever `len() < capacity()`.
* Various `BinaryHeap` and `Vec` instances across the codebase that drain but never shrink (standard amortized pattern, acceptable in most cases).
* `timely/src/synchronization/barrier.rs:23` — `Worker::clone()` deep-clones `Config` (which contains a `HashMap`), but Config could be `Arc`-wrapped.
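
The `ensure_capacity` arithmetic above can be checked directly with plain `Vec`s (this is standard-library behavior, not the crate's code):

```rust
fn main() {
    let preferred = 64;

    // Buggy arithmetic: reserve(preferred - capacity()).
    // With len = 8 and capacity = 16, this requests 48 more slots,
    // guaranteeing only capacity >= 8 + 48 = 56, which is < 64
    // (the allocator may still round up, so this is merely unreliable).
    let mut buggy: Vec<u8> = Vec::with_capacity(16);
    buggy.extend_from_slice(&[0u8; 8]);
    buggy.reserve(preferred - buggy.capacity());

    // Correct arithmetic: reserve(preferred - len()) guarantees the target.
    let mut fixed: Vec<u8> = Vec::with_capacity(16);
    fixed.extend_from_slice(&[0u8; 8]);
    fixed.reserve(preferred - fixed.len());
    assert!(fixed.capacity() >= preferred);
}
```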
# API consistency audit log

## `core::Input::Handle` — `epoch()` and `time()` return the same value

`Handle<T, CB>` at `timely/src/dataflow/operators/core/input.rs:473-480` defines two methods:
```rust
pub fn epoch(&self) -> &T { &self.now_at }
pub fn time(&self) -> &T { &self.now_at }
```

Both return a reference to the same field.
Users have no way to know which to call, and the existence of both suggests they might differ.

Suggestion: deprecate `epoch()` in favor of `time()`, which is the name used everywhere else (e.g. `Capability::time()`, `probe::Handle::less_than(time)`).
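
A sketch of the deprecation, on a simplified stand-in `Handle` (not the real type):

```rust
struct Handle<T> {
    now_at: T,
}

impl<T> Handle<T> {
    /// Deprecated alias retained for compatibility; delegates to `time()`.
    #[deprecated(note = "use `time()` instead")]
    pub fn epoch(&self) -> &T {
        self.time()
    }
    pub fn time(&self) -> &T {
        &self.now_at
    }
}

fn main() {
    let h = Handle { now_at: 42u64 };
    assert_eq!(*h.time(), 42);
    // Existing callers keep compiling, with a deprecation warning.
    #[allow(deprecated)]
    {
        assert_eq!(h.epoch(), h.time());
    }
}
```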

## `Antichain::from_elem` vs `MutableAntichain::new_bottom` — inconsistent singleton constructors

`Antichain::from_elem(element)` at `frontier.rs:222` and `MutableAntichain::new_bottom(bottom)` at `frontier.rs:447` both create a singleton antichain, but use different naming conventions.

`from_elem` follows Rust standard library conventions.
`new_bottom` uses domain-specific naming.

Suggestion: add `MutableAntichain::from_elem` as an alias, or rename `new_bottom` to `from_elem` for consistency.

## `core` vs `vec` — operators with no `core` equivalent

Several operators exist only in `vec` with no `core`-level generalization:

* `vec::Delay` (`delay`, `delay_total`, `delay_batch`)
* `vec::Broadcast`
* `vec::Branch` (data-dependent branching; note: `BranchWhen` works on generic `Stream<S, C>` but lives in `vec::branch`)
* `vec::count::Accumulate` (`accumulate`, `count`)
* `vec::ResultStream` (`ok`, `err`, `map_ok`, `map_err`, `and_then`, `unwrap_or_else`)
* `vec::flow_controlled::iterator_source`

These operators are only available for `StreamVec<G, D>`, not for arbitrary container types.
Users who switch from `Vec` to a custom container must reimplement this functionality.

The most impactful gaps are `Delay` and `Branch`, which are fundamental dataflow operations.

## `vec::BranchWhen` — lives in `vec` module but operates on generic `Stream<S, C>`

`BranchWhen` at `vec/branch.rs:102` is implemented for `Stream<S, C>` (generic containers), not `StreamVec`:
```rust
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for Stream<S, C> { ... }
```

It is defined in the `vec` module but does not depend on `Vec` containers.
It should be in `core` alongside `OkErr`, `Partition`, etc.

## `vec::Partition` — closure type as trait parameter

`vec::Partition` at `vec/partition.rs:8` puts the closure type in the trait generics:
```rust
pub trait Partition<G: Scope, D: 'static, D2: 'static, F: Fn(D) -> (u64, D2)> {
    fn partition(self, parts: u64, route: F) -> Vec<StreamVec<G, D2>>;
}
```

`core::Partition` at `core/partition.rs:11` keeps the closure as a method-level generic:
```rust
pub trait Partition<G: Scope, C: DrainContainer> {
    fn partition<CB, D2, F>(self, parts: u64, route: F) -> Vec<Stream<G, CB::Container>>
    where ...;
}
```

The `vec` style makes the trait harder to import and use, because the user must specify all type parameters.
Every other operator trait in both `core` and `vec` uses method-level generics for closures.
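
The ergonomic difference can be seen with a simplified stand-in (trait name and `Vec`-based "streams" are hypothetical):

```rust
// Method-level generic (the core style): the trait names only the data
// type; the closure type is inferred at the call site. In the trait-level
// style, F (and hence a concrete closure type) appears in the trait bound
// itself, so it leaks into every explicit use of the trait.
trait PartitionMethodGeneric<D> {
    fn partition<D2, F: Fn(D) -> (u64, D2)>(self, parts: u64, route: F) -> Vec<Vec<D2>>;
}

impl<D> PartitionMethodGeneric<D> for Vec<D> {
    fn partition<D2, F: Fn(D) -> (u64, D2)>(self, parts: u64, route: F) -> Vec<Vec<D2>> {
        let mut out: Vec<Vec<D2>> = (0..parts).map(|_| Vec::new()).collect();
        for d in self {
            let (p, d2) = route(d);
            out[p as usize].push(d2);
        }
        out
    }
}

fn main() {
    // One import, closure inferred; no type parameters spelled out.
    let parts = vec![1u64, 2, 3, 4].partition(2, |x| (x % 2, x * 10));
    assert_eq!(parts[0], vec![20, 40]);
    assert_eq!(parts[1], vec![10, 30]);
}
```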

## `Antichain` has `with_capacity` but `MutableAntichain` does not

`Antichain::with_capacity(capacity)` at `frontier.rs:207` pre-allocates space.
`MutableAntichain` has no `with_capacity` constructor, despite wrapping internal collections that support it.

This is a minor gap, but inconsistent across the two related types.

## `Antichain::extend` shadows `std::iter::Extend`

`Antichain::extend` at `frontier.rs:118` has signature:
```rust
pub fn extend<I: IntoIterator<Item=T>>(&mut self, iterator: I) -> bool
```

This returns `bool` (whether any element was inserted), which conflicts with the `std::iter::Extend` trait (which returns `()`).
As a result, `Antichain` cannot implement `std::iter::Extend`, though it does implement `FromIterator`.

Users expecting the standard `Extend` trait to work will be surprised.
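
The shadowing can be sketched with a simplified antichain (totally ordered `u64` elements, so the antichain keeps only the minimum; not the real type):

```rust
struct Antichain<T> {
    elements: Vec<T>,
}

impl<T: PartialOrd> Antichain<T> {
    fn new() -> Self {
        Antichain { elements: Vec::new() }
    }
    fn insert(&mut self, element: T) -> bool {
        if self.elements.iter().any(|e| e <= &element) {
            false
        } else {
            self.elements.retain(|e| !(element <= *e));
            self.elements.push(element);
            true
        }
    }
    // Inherent method returning whether anything was inserted. Inherent
    // methods win name resolution, so even if std::iter::Extend were
    // implemented, `a.extend(..)` would keep resolving to this one.
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) -> bool {
        let mut added = false;
        for e in iter {
            added |= self.insert(e);
        }
        added
    }
}

fn main() {
    let mut a = Antichain::new();
    let changed = a.extend(vec![3u64, 2, 5]);
    assert!(changed);
    assert_eq!(a.elements, vec![2]); // only the minimal element survives
}
```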

## `core::Map` has `flat_map_builder`, `vec::Map` has `map_in_place` — non-overlapping extensions

`core::Map` provides `map`, `flat_map`, and `flat_map_builder`.
`vec::Map` provides `map`, `flat_map`, and `map_in_place`.

`flat_map_builder` (a zero-cost iterator combinator pattern) has no vec counterpart.
`map_in_place` (mutation without allocation) has no core counterpart.

These are both useful optimizations that are only available in one of the two module hierarchies.

## `core::OkErr` has no `vec` counterpart

`core::OkErr` splits a stream by a closure returning `Result<D1, D2>`.
The `vec` module has no `OkErr` trait — instead it has `ResultStream` which operates on streams of `Result` values.
These serve different purposes:
* `OkErr` splits any stream into two streams (general routing)
* `ResultStream` processes streams whose data is already `Result<T, E>`

A `vec::OkErr` wrapper (delegating to `core::OkErr`) would be consistent with how `vec::Filter`, `vec::Partition`, etc. wrap their core counterparts.

## `Bytes` and `BytesMut` — asymmetric `try_merge`

`Bytes` has `try_merge(&mut self, other: Bytes) -> Result<(), Bytes>` at `bytes/src/lib.rs:238`.
`BytesMut` has no `try_merge` method.

To merge `BytesMut` slices, one must first `freeze()` them into `Bytes`, merge, then work with the result.
The module-level doc example demonstrates this pattern, but it is an API asymmetry.

The crate docs note this: "The crate is currently minimalist rather than maximalist."
Still, the asymmetry means `BytesMut` users must navigate a more complex workflow.