diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 42c080c7b..fe530c49a 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -204,9 +204,9 @@ impl Operate for Operator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { + fn initialize(self: Box) -> (Connectivity<::Summary>, Rc>>, Box) { self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); - (Vec::new(), Rc::clone(&self.shared_progress)) + (Vec::new(), Rc::clone(&self.shared_progress), self) } fn notify_me(&self) -> bool { false } diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 0ceeef37b..d78f467c4 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -96,6 +96,8 @@ impl Probe for Stream { let (tee, stream) = builder.new_output(); let mut output = PushCounter::new(tee); + // Conservatively introduce a minimal time to the handle. + // This will be relaxed when the operator is first scheduled and can see its frontier. handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1))); let shared_frontier = Rc::downgrade(&handle.frontier); @@ -104,15 +106,17 @@ impl Probe for Stream { builder.build( move |progress| { - // surface all frontier changes to the shared frontier. + // Mirror presented frontier changes into the shared handle. if let Some(shared_frontier) = shared_frontier.upgrade() { let mut borrow = shared_frontier.borrow_mut(); borrow.update_iter(progress.frontiers[0].drain()); } + // At initialization, we have a few tasks. if !started { - // discard initial capability. + // We must discard the capability held by `OperatorCore`. 
progress.internals[0].update(G::Timestamp::minimum(), -1); + // We must retract the conservative hold in the shared handle. if let Some(shared_frontier) = shared_frontier.upgrade() { let mut borrow = shared_frontier.borrow_mut(); borrow.update_iter(std::iter::once((Timestamp::minimum(), -1))); diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index c7ab1d899..0ed84cef2 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -133,12 +133,11 @@ impl Operate for UnorderedOperator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } - fn get_internal_summary(&mut self) -> (Connectivity<::Summary>, Rc>>) { - let mut borrow = self.internal.borrow_mut(); - for (time, count) in borrow.drain() { + fn initialize(self: Box) -> (Connectivity<::Summary>, Rc>>, Box) { + for (time, count) in self.internal.borrow_mut().drain() { self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); } - (Vec::new(), Rc::clone(&self.shared_progress)) + (Vec::new(), Rc::clone(&self.shared_progress), self) } fn notify_me(&self) -> bool { false } diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index b41048533..2c735065c 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -24,7 +24,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo; pub struct OperatorShape { name: String, // A meaningful name for the operator. notify: bool, // Does the operator require progress notifications. - peers: usize, // The total number of workers in the computation. + peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude. inputs: usize, // The number of input ports. 
outputs: usize, // The number of output ports. } @@ -42,14 +42,10 @@ impl OperatorShape { } /// The number of inputs of this operator - pub fn inputs(&self) -> usize { - self.inputs - } + pub fn inputs(&self) -> usize { self.inputs } /// The number of outputs of this operator - pub fn outputs(&self) -> usize { - self.outputs - } + pub fn outputs(&self) -> usize { self.outputs } } /// Builds operators with generic shape. @@ -84,24 +80,16 @@ impl OperatorBuilder { } /// The operator's scope-local index. - pub fn index(&self) -> usize { - self.index - } + pub fn index(&self) -> usize { self.index } /// The operator's worker-unique identifier. - pub fn global(&self) -> usize { - self.global - } + pub fn global(&self) -> usize { self.global } /// Return a reference to the operator's shape - pub fn shape(&self) -> &OperatorShape { - &self.shape - } + pub fn shape(&self) -> &OperatorShape { &self.shape } /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { - self.shape.notify = notify; - } + pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller @@ -134,7 +122,6 @@ impl OperatorBuilder { /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. pub fn new_output(&mut self) -> (Tee, Stream) { - let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -218,7 +205,7 @@ where fn outputs(&self) -> usize { self.shape.outputs } // announce internal topology as fully connected, and hold all default capabilities. - fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { + fn initialize(self: Box) -> (Connectivity, Rc>>, Box) { // Request the operator to be scheduled at least once. 
self.activations.borrow_mut().activate(&self.address[..]); @@ -230,7 +217,7 @@ where .iter_mut() .for_each(|output| output.update(T::minimum(), self.shape.peers as i64)); - (self.summary.clone(), Rc::clone(&self.shared_progress)) + (self.summary.clone(), Rc::clone(&self.shared_progress), self) } fn notify_me(&self) -> bool { self.shape.notify } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index e89edc8af..522819349 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -49,9 +49,7 @@ impl OperatorBuilder { } /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { - self.builder.set_notify(notify); - } + pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore @@ -197,24 +195,16 @@ impl OperatorBuilder { } /// Get the identifier assigned to the operator being constructed - pub fn index(&self) -> usize { - self.builder.index() - } + pub fn index(&self) -> usize { self.builder.index() } /// The operator's worker-unique identifier. - pub fn global(&self) -> usize { - self.builder.global() - } + pub fn global(&self) -> usize { self.builder.global() } /// Return a reference to the operator's shape - pub fn shape(&self) -> &OperatorShape { - self.builder.shape() - } + pub fn shape(&self) -> &OperatorShape { self.builder.shape() } /// Creates operator info for the operator. 
- pub fn operator_info(&self) -> OperatorInfo { - self.builder.operator_info() - } + pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() } } diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 61c683377..40ee2d9d4 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -6,8 +6,14 @@ use std::cell::RefCell; use crate::scheduling::Schedule; use crate::progress::{Timestamp, ChangeBatch, Antichain}; -/// Methods for describing an operators topology, and the progress it makes. -pub trait Operate : Schedule { +/// A dataflow operator that progresses with a specific timestamp type. +/// +/// This trait describes the methods necessary to present as a dataflow operator. +/// This trait is a "builder" for operators, in that it reveals the structure of the operator +/// and its requirements, but then (through `initialize`) consumes itself to produce a boxed +/// schedulable object. At the moment of initialization, the values of the other methods are +/// captured and frozen. +pub trait Operate { /// Indicates if the operator is strictly local to this worker. /// @@ -33,20 +39,27 @@ pub trait Operate : Schedule { /// The number of outputs. fn outputs(&self) -> usize; - /// Fetches summary information about internal structure of the operator. + /// Initializes the operator, converting the operator builder to a schedulable object. /// - /// Each operator must summarize its internal structure by a map from pairs `(input, output)` - /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may - /// be transformed to timestamps on any of its outputs. + /// In addition, initialization produces internal connectivity, and a shared progress conduit + /// which must contain any initial output capabilities the operator would like to hold. 
/// - /// Each operator must also indicate whether it initially holds any capabilities on any of its - /// outputs, so that the parent operator can properly initialize its progress information. + /// The internal connectivity summarizes the operator by a map from pairs `(input, output)` + /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may + /// be transformed to timestamps on any of its outputs. The conservative and most common result + /// is full connectivity between all inputs and outputs, each with the identity summary. /// - /// The default behavior is to indicate that timestamps on any input can emerge unchanged on - /// any output, and no initial capabilities are held. - fn get_internal_summary(&mut self) -> (Connectivity, Rc>>); + /// The shared progress object allows information to move between the host and the schedulable. + /// Importantly, it also indicates the initial internal capabilities for all of its outputs. + /// This must happen at this moment, as it is the only moment where an operator is allowed to + /// safely "create" capabilities without basing them on other, prior capabilities. + fn initialize(self: Box) -> (Connectivity, Rc>>, Box); - /// Indicates of whether the operator requires `push_external_progress` information or not. + /// Indicates if the operator should be invoked on the basis of input frontier transitions. + /// + /// This value is conservatively set to `true`, but operators that know they are oblivious to + /// frontier information can indicate this with `false`, and they will not be scheduled on the + /// basis of their input frontiers changing. fn notify_me(&self) -> bool { true } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index b5909b23a..2576d67f1 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -130,7 +130,7 @@ pub struct Builder { /// Internal connections within hosted operators. 
/// /// Indexed by operator index, then input port, then output port. This is the - /// same format returned by `get_internal_summary`, as if we simply appended + /// same format returned by `initialize`, as if we simply appended /// all of the summaries for the hosted nodes. pub nodes: Vec>, /// Direct connections from sources to targets. @@ -359,7 +359,7 @@ pub struct Tracker { /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the - /// same format returned by `get_internal_summary`, as if we simply appended + /// same format returned by `initialize`, as if we simply appended /// all of the summaries for the hosted nodes. nodes: Vec>, /// Direct connections from sources to targets. diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 3f82ef440..7a0a662c6 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -134,6 +134,7 @@ where /// Adds a new child to the subgraph. pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { + let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging); if let Some(l) = &mut self.logging { let mut child_path = Vec::with_capacity(self.path.len() + 1); child_path.extend_from_slice(&self.path[..]); @@ -142,10 +143,10 @@ where l.log(crate::logging::OperatesEvent { id: identifier, addr: child_path, - name: child.name().to_owned(), + name: child.name.to_owned(), }); } - self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging)); + self.children.push(child); } /// Now that initialization is complete, actually build a subgraph. @@ -545,7 +546,7 @@ where // produces connectivity summaries from inputs to outputs, and reports initial internal // capabilities on each of the outputs (projecting capabilities from contained scopes). 
- fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { + fn initialize(mut self: Box) -> (Connectivity, Rc>>, Box) { // double-check that child 0 (the outside world) is correctly shaped. assert_eq!(self.children[0].outputs, self.inputs()); @@ -583,7 +584,7 @@ where self.propagate_pointstamps(); // Propagate expressed capabilities to output frontiers. // Return summaries and shared progress information. - (internal_summary, Rc::clone(&self.shared_progress)) + (internal_summary, Rc::clone(&self.shared_progress), self) } } @@ -598,13 +599,13 @@ struct PerOperatorState { inputs: usize, // number of inputs to the operator outputs: usize, // number of outputs from the operator - operator: Option>>, + operator: Option>, edges: Vec>, // edges from the outputs of the operator shared_progress: Rc>>, - internal_summary: Connectivity, // cached result from get_internal_summary. + internal_summary: Connectivity, // cached result from initialize. logging: Option, } @@ -632,7 +633,7 @@ impl PerOperatorState { } pub fn new( - mut scope: Box>, + scope: Box>, index: usize, identifier: usize, logging: Option, @@ -644,7 +645,7 @@ impl PerOperatorState { let outputs = scope.outputs(); let notify = scope.notify_me(); - let (internal_summary, shared_progress) = scope.get_internal_summary(); + let (internal_summary, shared_progress, operator) = scope.initialize(); if let Some(l) = summary_logging { l.log(crate::logging::OperatesSummaryEvent { @@ -666,8 +667,8 @@ impl PerOperatorState { ); PerOperatorState { - name: scope.name().to_owned(), - operator: Some(scope), + name: operator.name().to_owned(), + operator: Some(operator), index, id: identifier, local, diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index c94d7434d..02114ce08 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -18,7 +18,7 @@ use std::sync::mpsc::{Sender, Receiver}; /// There is no known harm to "spurious wake-ups" where a not-active path 
is /// returned through `extensions()`. pub trait Scheduler { - /// Mark a path as immediately scheduleable. + /// Mark a path as immediately schedulable. fn activate(&mut self, path: &[usize]); /// Populates `dest` with next identifiers on active extensions of `path`. /// diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 3b1c87336..da92aa032 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -668,7 +668,7 @@ impl Worker { func(&mut resources, &mut builder) }; - let mut operator = subscope.into_inner().build(self); + let operator = subscope.into_inner().build(self); if let Some(l) = logging.as_mut() { l.log(crate::logging::OperatesEvent { @@ -679,7 +679,7 @@ impl Worker { l.flush(); } - operator.get_internal_summary(); + let (_, _, operator) = Box::new(operator).initialize(); let mut temp_channel_ids = self.temp_channel_ids.borrow_mut(); let channel_ids = temp_channel_ids.drain(..).collect::>(); @@ -687,7 +687,7 @@ impl Worker { let wrapper = Wrapper { logging, identifier, - operate: Some(Box::new(operator)), + operate: Some(operator), resources: Some(Box::new(resources)), channel_ids, };