From 512a92310fdd0d26c7f90a522a083c22583e31b0 Mon Sep 17 00:00:00 2001 From: Dustin Carlino Date: Fri, 31 Mar 2023 12:51:23 +0100 Subject: [PATCH] Parallelize with rayon. Split step drops from 22s to 11s, and aggregating properties from 3.5s to 2s --- rust/Cargo.lock | 107 +++++++++++++++++++++++++++++++++++++++ rust/cli/Cargo.toml | 1 + rust/cli/src/main.rs | 62 ++++++++++++----------- rust/overline/Cargo.toml | 1 + rust/overline/src/lib.rs | 92 +++++++++++++++++---------------- 5 files changed, 190 insertions(+), 73 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6bf3225..a74a0d9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -52,6 +52,7 @@ dependencies = [ "geo", "geojson", "overline", + "rayon", ] [[package]] @@ -60,6 +61,55 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" +[[package]] +name = "crossbeam-channel" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2b3e8478797446514c91ef04bafcb59faba183e621ad488df88983cc14128c" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "either" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" + [[package]] name = "float_next_after" version = "1.0.0" @@ -137,6 +187,15 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "itoa" version = "1.0.6" @@ -149,6 +208,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "libc" +version = "0.2.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" + [[package]] name = "libm" version = "0.2.6" @@ -174,6 +239,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memoffset" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" +dependencies = [ + "autocfg", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -184,6 +258,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "ordered-float" version = "3.6.0" @@ -200,6 +284,7 @@ dependencies = [ "geo", "geojson", "ordered-float", + "rayon", "serde", ] @@ -221,6 +306,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "robust" version = "0.2.3" diff --git a/rust/cli/Cargo.toml b/rust/cli/Cargo.toml index 7a1be62..43cb4d1 100644 --- a/rust/cli/Cargo.toml +++ b/rust/cli/Cargo.toml @@ -8,3 +8,4 @@ anyhow = "1.0.70" geo = "0.24.1" geojson = { version = "0.24.0", features = ["geo-types"] } overline = { path = "../overline" } +rayon = "1.7.0" diff --git a/rust/cli/src/main.rs b/rust/cli/src/main.rs index 96a5765..b131610 100644 --- a/rust/cli/src/main.rs +++ b/rust/cli/src/main.rs @@ -3,6 +3,7 @@ use std::time::Instant; use anyhow::{bail, Result}; use geo::GeodesicLength; use geojson::{Feature, FeatureCollection, GeoJson}; +use rayon::prelude::*; use overline::{feature_to_line_string, overline, Output}; @@ -127,41 +128,42 @@ fn aggregate_properties( grouped_indices: &Vec, properties: Vec<(String, Aggregation)>, ) -> Vec { - let mut output = Vec::new(); - for grouped in grouped_indices { - // Copy the geometry - let mut feature = Feature { - geometry: Some(geojson::Geometry { - value: geojson::Value::from(&grouped.geometry), + grouped_indices + .par_iter() + .map(|grouped| { + // Copy the geometry + let mut feature = Feature { + geometry: Some(geojson::Geometry { + value: geojson::Value::from(&grouped.geometry), + bbox: None, + foreign_members: None, + }), + properties: None, bbox: None, + id: None, foreign_members: None, - }), - properties: None, - bbox: None, - id: None, - foreign_members: None, - }; - // Aggregate each specified property - for (key, aggregation) in &properties { - // Ignore features without this property - let mut values = grouped - .indices - .iter() - .flat_map(|i| input[*i].property(&key)); - match aggregation { - Aggregation::KeepAny => { - if let Some(value) = values.next() { - feature.set_property(key, value.clone()); + }; + // Aggregate each specified property + for (key, aggregation) in &properties { + // Ignore features without this property + let mut values = grouped + .indices + .iter() + .flat_map(|i| input[*i].property(&key)); + match aggregation { + Aggregation::KeepAny => { + if let Some(value) = values.next() { + feature.set_property(key, value.clone()); + } + } + Aggregation::SumFloat => { + feature.set_property(key, values.flat_map(|x| x.as_f64()).sum::()); } - } - Aggregation::SumFloat => { - feature.set_property(key, values.flat_map(|x| x.as_f64()).sum::()); } } - } - output.push(feature); - } - output + feature + }) + .collect() } /* Test cases: diff --git a/rust/overline/Cargo.toml b/rust/overline/Cargo.toml index 74a1181..cdf48a1 100644 --- a/rust/overline/Cargo.toml +++ b/rust/overline/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" geo = "0.24.1" geojson = { version = "0.24.0", features = ["geo-types"] } ordered-float = "3.6.0" +rayon = "1.7.0" serde = { version = "1.0.159", features = ["derive"] } diff --git a/rust/overline/src/lib.rs b/rust/overline/src/lib.rs index a2fcbca..e1ed897 100644 --- a/rust/overline/src/lib.rs +++ b/rust/overline/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use geojson::Feature; use ordered_float::NotNan; +use rayon::prelude::*; use serde::{Deserialize, Serialize}; // TODO Never aggregate across OSM ID threshold. Plumb through an optional property to restrict @@ -34,55 +35,60 @@ pub fn overline(input: &Vec) -> Vec { // TODO We also need to split some line segments if they're not matching up at existing points. // Then look at each input, accumulating points as long all the indices match - let mut output = Vec::new(); - for (idx, input) in input.iter().enumerate() { - // This state is reset as we look through this input's points - let mut pts_so_far = Vec::new(); - let mut indices_so_far = Vec::new(); - let mut keep_this_output = false; + input + .par_iter() + .enumerate() + .flat_map(|(idx, input_feature)| { + let mut intermediate_output = Vec::new(); - if let Some(geom) = feature_to_line_string(input) { - for line in geom.lines() { - // The segment is guaranteed to exist - let indices = &line_segments[&HashedPoint::new_line(line)]; + // This state is reset as we look through this input's points + let mut pts_so_far = Vec::new(); + let mut indices_so_far = Vec::new(); + let mut keep_this_output = false; - if &indices_so_far == indices { - assert_eq!(*pts_so_far.last().unwrap(), line.start); - pts_so_far.push(line.end); - continue; - } else if !pts_so_far.is_empty() { - // The overlap ends here - let add = Output { - geometry: std::mem::take(&mut pts_so_far).into(), - indices: std::mem::take(&mut indices_so_far), - }; - if keep_this_output { - output.push(add); + if let Some(geom) = feature_to_line_string(input_feature) { + for line in geom.lines() { + // The segment is guaranteed to exist + let indices = &line_segments[&HashedPoint::new_line(line)]; + + if &indices_so_far == indices { + assert_eq!(*pts_so_far.last().unwrap(), line.start); + pts_so_far.push(line.end); + continue; + } else if !pts_so_far.is_empty() { + // The overlap ends here + let add = Output { + geometry: std::mem::take(&mut pts_so_far).into(), + indices: std::mem::take(&mut indices_so_far), + }; + if keep_this_output { + intermediate_output.push(add); + } + // Reset below } - // Reset below - } - assert!(pts_so_far.is_empty()); - pts_so_far.push(line.start); - pts_so_far.push(line.end); - indices_so_far = indices.clone(); - // Say we're processing input 2, and we have a segment with indices [2, 5]. We want to - // add it to output. But later we'll work on input 5 and see the same segment with - // indices [2, 5]. We don't want to add it again, so we'll skip it using the logic - // below, since we process input in order. - keep_this_output = indices_so_far.iter().all(|i| *i >= idx); + assert!(pts_so_far.is_empty()); + pts_so_far.push(line.start); + pts_so_far.push(line.end); + indices_so_far = indices.clone(); + // Say we're processing input 2, and we have a segment with indices [2, 5]. We want to + // add it to output. But later we'll work on input 5 and see the same segment with + // indices [2, 5]. We don't want to add it again, so we'll skip it using the logic + // below, since we process input in order. + keep_this_output = indices_so_far.iter().all(|i| *i >= idx); + } + } + // This input ended; add to output if needed + if !pts_so_far.is_empty() && keep_this_output { + intermediate_output.push(Output { + geometry: pts_so_far.into(), + indices: indices_so_far, + }); } - } - // This input ended; add to output if needed - if !pts_so_far.is_empty() && keep_this_output { - output.push(Output { - geometry: pts_so_far.into(), - indices: indices_so_far, - }); - } - } - output + intermediate_output + }) + .collect() } pub fn feature_to_line_string(f: &Feature) -> Option> {