Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
(Vec::new(), Rc::clone(&self.shared_progress))
(Vec::new(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { false }
Expand Down
8 changes: 6 additions & 2 deletions timely/src/dataflow/operators/core/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ impl<G: Scope, C: Container> Probe<G, C> for Stream<G, C> {
let (tee, stream) = builder.new_output();
let mut output = PushCounter::new(tee);

// Conservatively introduce a minimal time to the handle.
// This will be relaxed when the operator is first scheduled and can see its frontier.
handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1)));

let shared_frontier = Rc::downgrade(&handle.frontier);
Expand All @@ -104,15 +106,17 @@ impl<G: Scope, C: Container> Probe<G, C> for Stream<G, C> {
builder.build(
move |progress| {

// surface all frontier changes to the shared frontier.
// Mirror presented frontier changes into the shared handle.
if let Some(shared_frontier) = shared_frontier.upgrade() {
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
}

// At initialization, we have a few tasks.
if !started {
// discard initial capability.
// We must discard the capability held by `OpereratorCore`.
progress.internals[0].update(G::Timestamp::minimum(), -1);
// We must retract the conservative hold in the shared handle.
if let Some(shared_frontier) = shared_frontier.upgrade() {
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(std::iter::once((Timestamp::minimum(), -1)));
Expand Down
7 changes: 3 additions & 4 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,11 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
let mut borrow = self.internal.borrow_mut();
for (time, count) in borrow.drain() {
fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
for (time, count) in self.internal.borrow_mut().drain() {
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
}
(Vec::new(), Rc::clone(&self.shared_progress))
(Vec::new(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { false }
Expand Down
31 changes: 9 additions & 22 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo;
pub struct OperatorShape {
name: String, // A meaningful name for the operator.
notify: bool, // Does the operator require progress notifications.
peers: usize, // The total number of workers in the computation.
peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude.
inputs: usize, // The number of input ports.
outputs: usize, // The number of output ports.
}
Expand All @@ -42,14 +42,10 @@ impl OperatorShape {
}

/// The number of inputs of this operator
pub fn inputs(&self) -> usize {
self.inputs
}
pub fn inputs(&self) -> usize { self.inputs }

/// The number of outputs of this operator
pub fn outputs(&self) -> usize {
self.outputs
}
pub fn outputs(&self) -> usize { self.outputs }
}

/// Builds operators with generic shape.
Expand Down Expand Up @@ -84,24 +80,16 @@ impl<G: Scope> OperatorBuilder<G> {
}

/// The operator's scope-local index.
pub fn index(&self) -> usize {
self.index
}
pub fn index(&self) -> usize { self.index }

/// The operator's worker-unique identifier.
pub fn global(&self) -> usize {
self.global
}
pub fn global(&self) -> usize { self.global }

/// Return a reference to the operator's shape
pub fn shape(&self) -> &OperatorShape {
&self.shape
}
pub fn shape(&self) -> &OperatorShape { &self.shape }

/// Indicates whether the operator requires frontier information.
pub fn set_notify(&mut self, notify: bool) {
self.shape.notify = notify;
}
pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; }

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> P::Puller
Expand Down Expand Up @@ -134,7 +122,6 @@ impl<G: Scope> OperatorBuilder<G> {

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, Stream<G, C>) {

let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
self.new_output_connection(connection)
}
Expand Down Expand Up @@ -218,7 +205,7 @@ where
fn outputs(&self) -> usize { self.shape.outputs }

// announce internal topology as fully connected, and hold all default capabilities.
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {

// Request the operator to be scheduled at least once.
self.activations.borrow_mut().activate(&self.address[..]);
Expand All @@ -230,7 +217,7 @@ where
.iter_mut()
.for_each(|output| output.update(T::minimum(), self.shape.peers as i64));

(self.summary.clone(), Rc::clone(&self.shared_progress))
(self.summary.clone(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { self.shape.notify }
Expand Down
20 changes: 5 additions & 15 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ impl<G: Scope> OperatorBuilder<G> {
}

/// Indicates whether the operator requires frontier information.
pub fn set_notify(&mut self, notify: bool) {
self.builder.set_notify(notify);
}
pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); }

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
Expand Down Expand Up @@ -197,24 +195,16 @@ impl<G: Scope> OperatorBuilder<G> {
}

/// Get the identifier assigned to the operator being constructed
pub fn index(&self) -> usize {
self.builder.index()
}
pub fn index(&self) -> usize { self.builder.index() }

/// The operator's worker-unique identifier.
pub fn global(&self) -> usize {
self.builder.global()
}
pub fn global(&self) -> usize { self.builder.global() }

/// Return a reference to the operator's shape
pub fn shape(&self) -> &OperatorShape {
self.builder.shape()
}
pub fn shape(&self) -> &OperatorShape { self.builder.shape() }

/// Creates operator info for the operator.
pub fn operator_info(&self) -> OperatorInfo {
self.builder.operator_info()
}
pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() }
}


Expand Down
37 changes: 25 additions & 12 deletions timely/src/progress/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ use std::cell::RefCell;
use crate::scheduling::Schedule;
use crate::progress::{Timestamp, ChangeBatch, Antichain};

/// Methods for describing an operators topology, and the progress it makes.
pub trait Operate<T: Timestamp> : Schedule {
/// A dataflow operator that progress with a specific timestamp type.
///
/// This trait describes the methods necessary to present as a dataflow operator.
/// This trait is a "builder" for operators, in that it reveals the structure of the operator
/// and its requirements, but then (through `initialize`) consumes itself to produce a boxed
/// schedulable object. At the moment of initialization, the values of the other methods are
/// captured and frozen.
pub trait Operate<T: Timestamp> {

/// Indicates if the operator is strictly local to this worker.
///
Expand All @@ -33,20 +39,27 @@ pub trait Operate<T: Timestamp> : Schedule {
/// The number of outputs.
fn outputs(&self) -> usize;

/// Fetches summary information about internal structure of the operator.
/// Initializes the operator, converting the operator builder to a schedulable object.
///
/// Each operator must summarize its internal structure by a map from pairs `(input, output)`
/// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
/// be transformed to timestamps on any of its outputs.
/// In addition, initialization produces internal connectivity, and a shared progress conduit
/// which must contain any initial output capabilities the operator would like to hold.
///
/// Each operator must also indicate whether it initially holds any capabilities on any of its
/// outputs, so that the parent operator can properly initialize its progress information.
/// The internal connectivity summarizes the operator by a map from pairs `(input, output)`
/// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
/// be transformed to timestamps on any of its outputs. The conservative and most common result
/// is full connectivity between all inputs and outputs, each with the identity summary.
///
/// The default behavior is to indicate that timestamps on any input can emerge unchanged on
/// any output, and no initial capabilities are held.
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
/// The shared progress object allows information to move between the host and the schedulable.
/// Importantly, it also indicates the initial internal capabilities for all of its outputs.
/// This must happen at this moment, as it is the only moment where an operator is allowed to
/// safely "create" capabilities without basing them on other, prior capabilities.
fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>);

/// Indicates of whether the operator requires `push_external_progress` information or not.
/// Indicates if the operator should be invoked on the basis of input frontier transitions.
///
/// This value is conservatively set to `true`, but operators that know they are oblivious to
/// frontier information can indicate this with `false`, and they will not be scheduled on the
/// basis of their input frontiers changing.
fn notify_me(&self) -> bool { true }
}

Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
/// Internal connections within hosted operators.
///
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// same format returned by `initialize`, as if we simply appended
/// all of the summaries for the hosted nodes.
pub nodes: Vec<Connectivity<T::Summary>>,
/// Direct connections from sources to targets.
Expand Down Expand Up @@ -287,7 +287,7 @@
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter_ports() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {

Check warning on line 290 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`summary` shadows a previous, unrelated binding
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -359,7 +359,7 @@
/// Internal connections within hosted operators.
///
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// same format returned by `initialize`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: Vec<Connectivity<T::Summary>>,
/// Direct connections from sources to targets.
Expand Down Expand Up @@ -663,7 +663,7 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 666 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 666 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
let nodes = &self.nodes[location.node][port_index];
for (output_port, summaries) in nodes.iter_ports() {
let source = Location { node: location.node, port: Port::Source(output_port) };
Expand All @@ -686,7 +686,7 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 689 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 689 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
for new_target in self.edges[location.node][port_index].iter() {
self.worklist.push(Reverse((
time.clone(),
Expand Down
21 changes: 11 additions & 10 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@

/// Adds a new child to the subgraph.
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging);
if let Some(l) = &mut self.logging {
let mut child_path = Vec::with_capacity(self.path.len() + 1);
child_path.extend_from_slice(&self.path[..]);
Expand All @@ -142,10 +143,10 @@
l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name().to_owned(),
name: child.name.to_owned(),
});
}
self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging));
self.children.push(child);
}

/// Now that initialization is complete, actually build a subgraph.
Expand Down Expand Up @@ -545,7 +546,7 @@

// produces connectivity summaries from inputs to outputs, and reports initial internal
// capabilities on each of the outputs (projecting capabilities from contained scopes).
fn get_internal_summary(&mut self) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>) {
fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {

// double-check that child 0 (the outside world) is correctly shaped.
assert_eq!(self.children[0].outputs, self.inputs());
Expand Down Expand Up @@ -583,7 +584,7 @@
self.propagate_pointstamps(); // Propagate expressed capabilities to output frontiers.

// Return summaries and shared progress information.
(internal_summary, Rc::clone(&self.shared_progress))
(internal_summary, Rc::clone(&self.shared_progress), self)
}
}

Expand All @@ -598,13 +599,13 @@
inputs: usize, // number of inputs to the operator
outputs: usize, // number of outputs from the operator

operator: Option<Box<dyn Operate<T>>>,
operator: Option<Box<dyn Schedule>>,

edges: Vec<Vec<Target>>, // edges from the outputs of the operator

shared_progress: Rc<RefCell<SharedProgress<T>>>,

internal_summary: Connectivity<T::Summary>, // cached result from get_internal_summary.
internal_summary: Connectivity<T::Summary>, // cached result from initialize.

logging: Option<Logger>,
}
Expand Down Expand Up @@ -632,7 +633,7 @@
}

pub fn new(
mut scope: Box<dyn Operate<T>>,
scope: Box<dyn Operate<T>>,
index: usize,
identifier: usize,
logging: Option<Logger>,
Expand All @@ -644,7 +645,7 @@
let outputs = scope.outputs();
let notify = scope.notify_me();

let (internal_summary, shared_progress) = scope.get_internal_summary();
let (internal_summary, shared_progress, operator) = scope.initialize();

if let Some(l) = summary_logging {
l.log(crate::logging::OperatesSummaryEvent {
Expand All @@ -666,8 +667,8 @@
);

PerOperatorState {
name: scope.name().to_owned(),
operator: Some(scope),
name: operator.name().to_owned(),
operator: Some(operator),
index,
id: identifier,
local,
Expand Down Expand Up @@ -775,7 +776,7 @@
for (time, diff) in internal.iter() {
if *diff > 0 {
let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
let internal = child_state.sources[output].implications.less_equal(time);

Check warning on line 779 in timely/src/progress/subgraph.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`internal` shadows a previous, unrelated binding
if !consumed && !internal {
println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
panic!("Progress error; internal {:?}", self.name);
Expand Down
2 changes: 1 addition & 1 deletion timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::mpsc::{Sender, Receiver};
/// There is no known harm to "spurious wake-ups" where a not-active path is
/// returned through `extensions()`.
pub trait Scheduler {
/// Mark a path as immediately scheduleable.
/// Mark a path as immediately schedulable.
fn activate(&mut self, path: &[usize]);
/// Populates `dest` with next identifiers on active extensions of `path`.
///
Expand Down
6 changes: 3 additions & 3 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ impl<A: Allocate> Worker<A> {
func(&mut resources, &mut builder)
};

let mut operator = subscope.into_inner().build(self);
let operator = subscope.into_inner().build(self);

if let Some(l) = logging.as_mut() {
l.log(crate::logging::OperatesEvent {
Expand All @@ -679,15 +679,15 @@ impl<A: Allocate> Worker<A> {
l.flush();
}

operator.get_internal_summary();
let (_, _, operator) = Box::new(operator).initialize();

let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

let wrapper = Wrapper {
logging,
identifier,
operate: Some(Box::new(operator)),
operate: Some(operator),
resources: Some(Box::new(resources)),
channel_ids,
};
Expand Down
Loading