diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 5deb5fed8..b9cbaf6c7 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -28,6 +28,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" smallvec = { version = "1.13.2", features = ["serde", "const_generics"] } +columnar = { git = "https://github.com/frankmcsherry/columnar" } [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index ee3b52515..886d6b35b 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -34,7 +34,7 @@ fn main() { println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); - for round in 0 .. { + for round in 0 .. 10 { let dataflow = round % dataflows; if record { inputs[dataflow].send(()); diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 0380216c8..c223c448f 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -75,6 +75,9 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; +use columnar::{Len, Index}; +use columnar::Vecs; + use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; @@ -84,6 +87,49 @@ use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::timestamp::PathSummary; +use antichains::Antichains; + +/// A stand-in for `Vec>`. +mod antichains { + + use columnar::{Len, Index, Push}; + use columnar::Vecs; + + use crate::progress::Antichain; + + #[derive(Clone, Debug)] + pub struct Antichains (Vecs>); + + impl Default for Antichains { + fn default() -> Self { + Self (Default::default()) + } + } + + impl Len for Antichains { + #[inline(always)] fn len(&self) -> usize { self.0.len() } + } + + impl Push> for Antichains { + #[inline(always)] + fn push(&mut self, item: Antichain) { + columnar::Push::extend(&mut self.0.values, item); + self.0.bounds.push(self.0.values.len()); + } + } + + impl<'a, T> Index for &'a Antichains { + type Ref = <&'a Vecs> as Index>::Ref; + + #[inline(always)] + fn get(&self, index: usize) -> Self::Ref { + (&self.0).get(index) + } + } +} + + + /// A topology builder, which can summarize reachability along paths. /// /// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles @@ -132,14 +178,14 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - pub nodes: Vec>>>, + nodes: Vecs>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - pub edges: Vec>>, + edges: Vec>>, /// Numbers of inputs and outputs for each node. - pub shape: Vec<(usize, usize)>, + shape: Vec<(usize, usize)>, } impl Builder { @@ -147,7 +193,7 @@ impl Builder { /// Create a new empty topology builder. pub fn new() -> Self { Builder { - nodes: Vec::new(), + nodes: Default::default(), edges: Vec::new(), shape: Vec::new(), } @@ -155,20 +201,20 @@ impl Builder { /// Add links internal to operators. /// - /// This method overwrites any existing summary, instead of anything more sophisticated. + /// Nodes must be added in strictly increasing order of `index`. pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec>>) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } - while self.nodes.len() <= index { - self.nodes.push(Vec::new()); - self.edges.push(Vec::new()); - self.shape.push((0, 0)); - } + assert_eq!(self.nodes.len(), index); + + use columnar::Push; + self.nodes.push(summary); + self.edges.push(Vec::new()); + self.shape.push((0, 0)); - self.nodes[index] = summary; if self.edges[index].len() != outputs { self.edges[index] = vec![Vec::new(); outputs]; } @@ -270,7 +316,7 @@ impl Builder { // Load edges as default summaries. for (index, ports) in self.edges.iter().enumerate() { - for (output, targets) in ports.iter().enumerate() { + for (output, targets) in (*ports).iter().enumerate() { let source = Location::new_source(index, output); in_degree.entry(source).or_insert(0); for &target in targets.iter() { @@ -281,13 +327,13 @@ impl Builder { } // Load default intra-node summaries. - for (index, summary) in self.nodes.iter().enumerate() { - for (input, outputs) in summary.iter().enumerate() { + for (index, summary) in (&self.nodes).into_iter().enumerate() { + for (input, outputs) in summary.into_iter().enumerate() { let target = Location::new_target(index, input); in_degree.entry(target).or_insert(0); - for (output, summaries) in outputs.iter().enumerate() { + for (output, summaries) in outputs.into_iter().enumerate() { let source = Location::new_source(index, output); - for summary in summaries.elements().iter() { + for summary in summaries.into_iter() { if summary == &Default::default() { *in_degree.entry(source).or_insert(0) += 1; } @@ -322,9 +368,9 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter().enumerate() { + for (output, summaries) in (&self.nodes).get(node).get(port).into_iter().enumerate() { let source = Location::new_source(node, output); - for summary in summaries.elements().iter() { + for summary in summaries.into_iter() { if summary == &Default::default() { *in_degree.get_mut(&source).unwrap() -= 1; if in_degree[&source] == 0 { @@ -361,12 +407,12 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: Vec>>>, + nodes: Vecs>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - edges: Vec>>, + edges: Vecs>>, // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). // It seems we should be able to flatten most of these so that there are a few allocations @@ -544,10 +590,16 @@ impl Tracker { let scope_outputs = builder.shape[0].0; let output_changes = vec![ChangeBatch::new(); scope_outputs]; + use columnar::Push; + let mut edges: Vecs>> = Default::default(); + for edge in builder.edges { + edges.push(edge); + } + let tracker = Tracker { nodes: builder.nodes, - edges: builder.edges, + edges, per_operator, target_changes: ChangeBatch::new(), source_changes: ChangeBatch::new(), @@ -663,10 +715,10 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - let nodes = &self.nodes[location.node][port_index]; - for (output_port, summaries) in nodes.iter().enumerate() { + let nodes = &(&self.nodes).get(location.node).get(port_index); + for (output_port, summaries) in nodes.into_iter().enumerate() { let source = Location { node: location.node, port: Port::Source(output_port) }; - for summary in summaries.elements().iter() { + for summary in summaries.into_iter() { if let Some(new_time) = summary.results_in(&time) { self.worklist.push(Reverse((new_time, source, diff))); } @@ -686,7 +738,7 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - for new_target in self.edges[location.node][port_index].iter() { + for new_target in (&self.edges).get(location.node).get(port_index).into_iter() { self.worklist.push(Reverse(( time.clone(), Location::from(*new_target), @@ -738,14 +790,14 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &Vec>>>, + nodes: &Vecs>>, edges: &Vec>>, ) -> HashMap>> { // A reverse edge map, to allow us to walk back up the dataflow graph. let mut reverse = HashMap::new(); - for (node, outputs) in edges.iter().enumerate() { - for (output, targets) in outputs.iter().enumerate() { + for (node, outputs) in columnar::Index::into_iter(edges).enumerate() { + for (output, targets) in columnar::Index::into_iter(outputs).enumerate() { for target in targets.iter() { reverse.insert( Location::from(*target), @@ -759,10 +811,9 @@ fn summarize_outputs( let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); let outputs = - edges - .iter() - .flat_map(|x| x.iter()) - .flat_map(|x| x.iter()) + columnar::Index::into_iter(edges) + .flat_map(|x| columnar::Index::into_iter(x)) + .flat_map(|x| columnar::Index::into_iter(x)) .filter(|target| target.node == 0); // The scope may have no outputs, in which case we can do no work. @@ -780,7 +831,7 @@ fn summarize_outputs( Port::Source(output_port) => { // Consider each input port of the associated operator. - for (input_port, summaries) in nodes[location.node].iter().enumerate() { + for (input_port, summaries) in nodes.get(location.node).into_iter().enumerate() { // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; @@ -792,7 +843,7 @@ fn summarize_outputs( while antichains.len() <= output { antichains.push(Antichain::new()); } // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries[output_port].elements().iter() { + for operator_summary in summaries.get(output_port).into_iter() { if let Some(combined) = operator_summary.followed_by(&summary) { if antichains[output].insert(combined.clone()) { worklist.push_back((location, output, combined));