From 9e94f78c58a25987b60739b4e3a4f569ab6e9d24 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 19:34:16 -0400 Subject: [PATCH 1/3] Use ChangeBatch::extend --- timely/src/progress/broadcast.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index e043e8143..28e835aa9 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -144,9 +144,7 @@ impl Progcaster { }); // We clone rather than drain to avoid deserialization. - for &(ref update, delta) in recv_changes.iter() { - changes.update(update.clone(), delta); - } + changes.extend(recv_changes.iter().map(|(u,d)| (u.clone(), *d))); } } From 5c433aa8809b8a2a5639fff2f88d27f2b6890dcc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 19:39:30 -0400 Subject: [PATCH 2/3] Rework vec::Partition trait --- timely/src/dataflow/operators/vec/partition.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/timely/src/dataflow/operators/vec/partition.rs b/timely/src/dataflow/operators/vec/partition.rs index 1a8548468..3589cd71f 100644 --- a/timely/src/dataflow/operators/vec/partition.rs +++ b/timely/src/dataflow/operators/vec/partition.rs @@ -5,7 +5,7 @@ use crate::dataflow::operators::core::Partition as PartitionCore; use crate::dataflow::{Scope, StreamVec}; /// Partition a stream of records into multiple streams. -pub trait Partition (u64, D2)> { +pub trait Partition { /// Produces `parts` output streams, containing records produced and assigned by `route`. /// /// # Examples @@ -21,11 +21,11 @@ pub trait Partition (u64, D2)> { /// streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x)); /// }); /// ``` - fn partition(self, parts: u64, route: F) -> Vec>; + fn partition (u64, D2)+'static>(self, parts: u64, route: F) -> Vec>; } -impl(u64, D2)+'static> Partition for StreamVec { - fn partition(self, parts: u64, route: F) -> Vec> { +impl Partition for StreamVec { + fn partition(u64, D2)+'static>(self, parts: u64, route: F) -> Vec> { PartitionCore::partition::, _, _>(self, parts, route) } } From 79b0992cf037675a3e4b857f4f29a37302e078ed Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 19:59:11 -0400 Subject: [PATCH 3/3] Remove panic from BytesMut::try_regenerate --- bytes/src/lib.rs | 13 +++++++------ communication/src/allocator/zero_copy/bytes_slab.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index be3a98b17..2d371b4d3 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -107,7 +107,8 @@ pub mod arc { /// /// If uniquely held, this method recovers the initial pointer and length /// of the sequestered allocation and re-initializes the BytesMut. The return - /// value indicates whether this occurred. + /// value indicates whether this occurred. A `None` value indicates that the + /// downcast to `B` failed and the type is not correct. /// /// # Examples /// @@ -123,19 +124,19 @@ pub mod arc { /// drop(shared3); /// drop(shared2); /// drop(shared4); - /// assert!(shared1.try_regenerate::>()); + /// assert_eq!(shared1.try_regenerate::>(), Some(true)); /// assert!(shared1.len() == 1024); /// ``` - pub fn try_regenerate(&mut self) -> bool where B: DerefMut+'static { + pub fn try_regenerate(&mut self) -> Option where B: DerefMut+'static { // Only possible if this is the only reference to the sequestered allocation. if let Some(boxed) = Arc::get_mut(&mut self.sequestered) { - let downcast = boxed.downcast_mut::().expect("Downcast failed"); + let downcast = boxed.downcast_mut::()?; self.ptr = downcast.as_mut_ptr(); self.len = downcast.len(); - true + Some(true) } else { - false + Some(false) } } diff --git a/communication/src/allocator/zero_copy/bytes_slab.rs b/communication/src/allocator/zero_copy/bytes_slab.rs index 9b244a94f..e7eaac4b9 100644 --- a/communication/src/allocator/zero_copy/bytes_slab.rs +++ b/communication/src/allocator/zero_copy/bytes_slab.rs @@ -80,7 +80,7 @@ impl BytesSlab { if self.stash.is_empty() { for shared in self.in_progress.iter_mut() { if let Some(mut bytes) = shared.take() { - if bytes.try_regenerate::() { + if bytes.try_regenerate::() == Some(true) { // NOTE: Test should be redundant, but better safe... if bytes.len() == (1 << self.shift) { self.stash.push(bytes);