Skip to content

Convert Operate to a builder trait #756

Merged
frankmcsherry merged 3 commits into TimelyDataflow:master from
frankmcsherry:operate_tidying
Mar 8, 2026
Merged

Convert Operate to a builder trait #756
frankmcsherry merged 3 commits into TimelyDataflow:master from
frankmcsherry:operate_tidying

Conversation

@frankmcsherry
Copy link
Member

@frankmcsherry frankmcsherry commented Mar 8, 2026

This tidies some parts of Operate, but then re-imagines it as a builder trait whose job at initialization is to create a Box<dyn Schedule>, as well as other metadata.

The intent is for the code to match our assumptions about the behavior: for example, that initialization happens once, and that when it happens certain qualities of the implementor are locked in (its topology, scheduling needs, etc.).

At the moment, this is a pretty simple change, as there was no runtime behavior that depended on the Operate<T> implementation, and in particular nothing depending on T. This does close the door a bit on the possibility of replacing the SharedProgress<T> struct with a function call, communicating via the stack rather than through these types. Personally, I rather like the types as they are a durable home for information that ensures it is not lost; like a channel, but updating in place.

@antiguru
Copy link
Member

antiguru commented Mar 8, 2026

I think we might be able to simplify the types a bit. What about this diff?

diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs
index fe530c49..82d6833e 100644
--- a/timely/src/dataflow/operators/core/input.rs
+++ b/timely/src/dataflow/operators/core/input.rs
@@ -162,14 +162,14 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
 
         let copies = self.peers();
 
-        self.add_operator_with_index(Box::new(Operator {
+        self.add_operator_with_index(Operator {
             name: "Input".to_owned(),
             address,
             shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
             progress,
             messages: produced,
             copies,
-        }), index);
+        }, index);
 
         Stream::new(Source::new(index, 0), registrar, self.clone())
     }
@@ -204,7 +204,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
     fn inputs(&self) -> usize { 0 }
     fn outputs(&self) -> usize { 1 }
 
-    fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
+    fn initialize(self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, impl Schedule) {
         self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
         (Vec::new(), Rc::clone(&self.shared_progress), self)
     }
diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs
index 0ed84cef..b957c26c 100644
--- a/timely/src/dataflow/operators/core/unordered_input.rs
+++ b/timely/src/dataflow/operators/core/unordered_input.rs
@@ -96,14 +96,14 @@ impl<G: Scope> UnorderedInput<G> for G {
 
         let helper = UnorderedHandle::new(counter, Rc::clone(&address), self.activations());
 
-        self.add_operator_with_index(Box::new(UnorderedOperator {
+        self.add_operator_with_index(UnorderedOperator {
             name: "UnorderedInput".to_owned(),
             address,
             shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
             internal,
             produced,
             peers,
-        }), index);
+        }, index);
 
         ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone()))
     }
@@ -133,7 +133,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
     fn inputs(&self) -> usize { 0 }
     fn outputs(&self) -> usize { 1 }
 
-    fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
+    fn initialize(self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, impl Schedule) {
         for (time, count) in self.internal.borrow_mut().drain() {
             self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
         }
diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs
index 2c735065..472b89da 100644
--- a/timely/src/dataflow/operators/generic/builder_raw.rs
+++ b/timely/src/dataflow/operators/generic/builder_raw.rs
@@ -161,7 +161,7 @@ impl<G: Scope> OperatorBuilder<G> {
             summary: self.summary,
         };
 
-        self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
+        self.scope.add_operator_with_indices(operator, self.index, self.global);
     }
 
     /// Information describing the operator.
@@ -205,7 +205,7 @@ where
     fn outputs(&self) -> usize { self.shape.outputs }
 
     // announce internal topology as fully connected, and hold all default capabilities.
-    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
+    fn initialize(self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, impl Schedule) {
 
         // Request the operator to be scheduled at least once.
         self.activations.borrow_mut().activate(&self.address[..]);
diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs
index ee3cbb12..7001c2a2 100644
--- a/timely/src/dataflow/scopes/child.rs
+++ b/timely/src/dataflow/scopes/child.rs
@@ -116,7 +116,7 @@ where
         self.subgraph.borrow_mut().connect(source, target);
     }
 
-    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize) {
+    fn add_operator_with_indices(&mut self, operator: impl Operate<Self::Timestamp>, local: usize, global: usize) {
         self.subgraph.borrow_mut().add_child(operator, local, global);
     }
 
@@ -150,7 +150,7 @@ where
         };
         let subscope = subscope.into_inner().build(self);
 
-        self.add_operator_with_indices(Box::new(subscope), index, identifier);
+        self.add_operator_with_indices(subscope, index, identifier);
 
         result
     }
diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs
index 106d5f9d..085363ba 100644
--- a/timely/src/dataflow/scopes/mod.rs
+++ b/timely/src/dataflow/scopes/mod.rs
@@ -43,7 +43,7 @@ pub trait Scope: ScopeParent {
     fn add_edge(&self, source: Source, target: Target);
 
     /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
-    fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
+    fn add_operator(&mut self, operator: impl Operate<Self::Timestamp>) -> usize {
         let index = self.allocate_operator_index();
         let global = self.new_identifier();
         self.add_operator_with_indices(operator, index, global);
@@ -61,7 +61,7 @@ pub trait Scope: ScopeParent {
     ///
     /// This is used internally when there is a gap between allocate a child identifier and adding the
     /// child, as happens in subgraph creation.
-    fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
+    fn add_operator_with_index(&mut self, operator: impl Operate<Self::Timestamp>, index: usize) {
         let global = self.new_identifier();
         self.add_operator_with_indices(operator, index, global);
     }
@@ -69,7 +69,7 @@ pub trait Scope: ScopeParent {
     /// Adds a child `Operate` to the builder's scope using supplied indices.
     ///
     /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
-    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);
+    fn add_operator_with_indices(&mut self, operator: impl Operate<Self::Timestamp>, local: usize, global: usize);
 
     /// Creates a dataflow subgraph.
     ///
diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs
index 40ee2d9d..ed96c350 100644
--- a/timely/src/progress/operate.rs
+++ b/timely/src/progress/operate.rs
@@ -53,7 +53,7 @@ pub trait Operate<T: Timestamp> {
     /// Importantly, it also indicates the initial internal capabilities for all of its outputs.
     /// This must happen at this moment, as it is the only moment where an operator is allowed to
     /// safely "create" capabilities without basing them on other, prior capabilities.
-    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>);
+    fn initialize(self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, impl Schedule);
 
     /// Indicates if the operator should be invoked on the basis of input frontier transitions.
     ///
diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs
index 7a0a662c..f24b4aa1 100644
--- a/timely/src/progress/subgraph.rs
+++ b/timely/src/progress/subgraph.rs
@@ -133,7 +133,7 @@ where
     }
 
     /// Adds a new child to the subgraph.
-    pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
+    pub fn add_child(&mut self, child: impl Operate<TInner>, index: usize, identifier: usize) {
         let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging);
         if let Some(l) = &mut self.logging {
             let mut child_path = Vec::with_capacity(self.path.len() + 1);
@@ -546,7 +546,7 @@ where
 
     // produces connectivity summaries from inputs to outputs, and reports initial internal
     // capabilities on each of the outputs (projecting capabilities from contained scopes).
-    fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {
+    fn initialize(mut self) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, impl Schedule) {
 
         // double-check that child 0 (the outside world) is correctly shaped.
         assert_eq!(self.children[0].outputs, self.inputs());
@@ -633,7 +633,7 @@ impl<T: Timestamp> PerOperatorState<T> {
     }
 
     pub fn new(
-        scope: Box<dyn Operate<T>>,
+        scope: impl Operate<T>,
         index: usize,
         identifier: usize,
         logging: Option<Logger>,
@@ -668,7 +668,7 @@ impl<T: Timestamp> PerOperatorState<T> {
 
         PerOperatorState {
             name:               operator.name().to_owned(),
-            operator:           Some(operator),
+            operator:           Some(Box::new(operator)),
             index,
             id:                 identifier,
             local,
diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs
index fef24d1f..7fd07df7 100644
--- a/timely/src/scheduling/mod.rs
+++ b/timely/src/scheduling/mod.rs
@@ -8,7 +8,7 @@ pub mod activate;
 pub use self::activate::{Activations, Activator, ActivateOnDrop, SyncActivator};
 
 /// A type that can be scheduled.
-pub trait Schedule {
+pub trait Schedule : 'static {
     /// A descriptive name for the operator
     fn name(&self) -> &str;
     /// An address identifying the operator.
diff --git a/timely/src/worker.rs b/timely/src/worker.rs
index da92aa03..77b75aba 100644
--- a/timely/src/worker.rs
+++ b/timely/src/worker.rs
@@ -687,7 +687,7 @@ impl<A: Allocate> Worker<A> {
         let wrapper = Wrapper {
             logging,
             identifier,
-            operate: Some(operator),
+            operate: Some(Box::new(operator)),
             resources: Some(Box::new(resources)),
             channel_ids,
         };

Copy link
Member

@antiguru antiguru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just some bikeshedding around boxes vs. impl traits.

@frankmcsherry
Copy link
Member Author

Discussed asynchronously; the thinking is that the boxing could be impl Trait instead, which seems to check out. At the same time, initialization is off the critical path, and using generics there to save on virtual calls would (likely) add compilation time.

@frankmcsherry frankmcsherry merged commit 39ba5a7 into TimelyDataflow:master Mar 8, 2026
8 checks passed
@frankmcsherry frankmcsherry deleted the operate_tidying branch March 8, 2026 19:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants