Skip to content

Commit

Permalink
Parallelize with rayon. Split step drops from 22s to 11s, and
Browse files Browse the repository at this point in the history
aggregating properties from 3.5s to 2s
  • Loading branch information
dabreegster committed Mar 31, 2023
1 parent 5bfb0f8 commit 512a923
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 73 deletions.
107 changes: 107 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ anyhow = "1.0.70"
geo = "0.24.1"
geojson = { version = "0.24.0", features = ["geo-types"] }
overline = { path = "../overline" }
rayon = "1.7.0"
62 changes: 32 additions & 30 deletions rust/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::Instant;
use anyhow::{bail, Result};
use geo::GeodesicLength;
use geojson::{Feature, FeatureCollection, GeoJson};
use rayon::prelude::*;

use overline::{feature_to_line_string, overline, Output};

Expand Down Expand Up @@ -127,41 +128,42 @@ fn aggregate_properties(
grouped_indices: &Vec<Output>,
properties: Vec<(String, Aggregation)>,
) -> Vec<Feature> {
let mut output = Vec::new();
for grouped in grouped_indices {
// Copy the geometry
let mut feature = Feature {
geometry: Some(geojson::Geometry {
value: geojson::Value::from(&grouped.geometry),
grouped_indices
.par_iter()
.map(|grouped| {
// Copy the geometry
let mut feature = Feature {
geometry: Some(geojson::Geometry {
value: geojson::Value::from(&grouped.geometry),
bbox: None,
foreign_members: None,
}),
properties: None,
bbox: None,
id: None,
foreign_members: None,
}),
properties: None,
bbox: None,
id: None,
foreign_members: None,
};
// Aggregate each specified property
for (key, aggregation) in &properties {
// Ignore features without this property
let mut values = grouped
.indices
.iter()
.flat_map(|i| input[*i].property(&key));
match aggregation {
Aggregation::KeepAny => {
if let Some(value) = values.next() {
feature.set_property(key, value.clone());
};
// Aggregate each specified property
for (key, aggregation) in &properties {
// Ignore features without this property
let mut values = grouped
.indices
.iter()
.flat_map(|i| input[*i].property(&key));
match aggregation {
Aggregation::KeepAny => {
if let Some(value) = values.next() {
feature.set_property(key, value.clone());
}
}
Aggregation::SumFloat => {
feature.set_property(key, values.flat_map(|x| x.as_f64()).sum::<f64>());
}
}
Aggregation::SumFloat => {
feature.set_property(key, values.flat_map(|x| x.as_f64()).sum::<f64>());
}
}
}
output.push(feature);
}
output
feature
})
.collect()
}

/* Test cases:
Expand Down
1 change: 1 addition & 0 deletions rust/overline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ edition = "2021"
geo = "0.24.1"
geojson = { version = "0.24.0", features = ["geo-types"] }
ordered-float = "3.6.0"
rayon = "1.7.0"
serde = { version = "1.0.159", features = ["derive"] }
92 changes: 49 additions & 43 deletions rust/overline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use geojson::Feature;
use ordered_float::NotNan;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

// TODO Never aggregate across OSM ID threshold. Plumb through an optional property to restrict
Expand Down Expand Up @@ -34,55 +35,60 @@ pub fn overline(input: &Vec<Feature>) -> Vec<Output> {
// TODO We also need to split some line segments if they're not matching up at existing points.

// Then look at each input, accumulating points as long all the indices match
let mut output = Vec::new();
for (idx, input) in input.iter().enumerate() {
// This state is reset as we look through this input's points
let mut pts_so_far = Vec::new();
let mut indices_so_far = Vec::new();
let mut keep_this_output = false;
input
.par_iter()
.enumerate()
.flat_map(|(idx, input_feature)| {
let mut intermediate_output = Vec::new();

if let Some(geom) = feature_to_line_string(input) {
for line in geom.lines() {
// The segment is guaranteed to exist
let indices = &line_segments[&HashedPoint::new_line(line)];
// This state is reset as we look through this input's points
let mut pts_so_far = Vec::new();
let mut indices_so_far = Vec::new();
let mut keep_this_output = false;

if &indices_so_far == indices {
assert_eq!(*pts_so_far.last().unwrap(), line.start);
pts_so_far.push(line.end);
continue;
} else if !pts_so_far.is_empty() {
// The overlap ends here
let add = Output {
geometry: std::mem::take(&mut pts_so_far).into(),
indices: std::mem::take(&mut indices_so_far),
};
if keep_this_output {
output.push(add);
if let Some(geom) = feature_to_line_string(input_feature) {
for line in geom.lines() {
// The segment is guaranteed to exist
let indices = &line_segments[&HashedPoint::new_line(line)];

if &indices_so_far == indices {
assert_eq!(*pts_so_far.last().unwrap(), line.start);
pts_so_far.push(line.end);
continue;
} else if !pts_so_far.is_empty() {
// The overlap ends here
let add = Output {
geometry: std::mem::take(&mut pts_so_far).into(),
indices: std::mem::take(&mut indices_so_far),
};
if keep_this_output {
intermediate_output.push(add);
}
// Reset below
}
// Reset below
}

assert!(pts_so_far.is_empty());
pts_so_far.push(line.start);
pts_so_far.push(line.end);
indices_so_far = indices.clone();
// Say we're processing input 2, and we have a segment with indices [2, 5]. We want to
// add it to output. But later we'll work on input 5 and see the same segment with
// indices [2, 5]. We don't want to add it again, so we'll skip it using the logic
// below, since we process input in order.
keep_this_output = indices_so_far.iter().all(|i| *i >= idx);
assert!(pts_so_far.is_empty());
pts_so_far.push(line.start);
pts_so_far.push(line.end);
indices_so_far = indices.clone();
// Say we're processing input 2, and we have a segment with indices [2, 5]. We want to
// add it to output. But later we'll work on input 5 and see the same segment with
// indices [2, 5]. We don't want to add it again, so we'll skip it using the logic
// below, since we process input in order.
keep_this_output = indices_so_far.iter().all(|i| *i >= idx);
}
}
// This input ended; add to output if needed
if !pts_so_far.is_empty() && keep_this_output {
intermediate_output.push(Output {
geometry: pts_so_far.into(),
indices: indices_so_far,
});
}
}
// This input ended; add to output if needed
if !pts_so_far.is_empty() && keep_this_output {
output.push(Output {
geometry: pts_so_far.into(),
indices: indices_so_far,
});
}
}

output
intermediate_output
})
.collect()
}

pub fn feature_to_line_string(f: &Feature) -> Option<geo::LineString<f64>> {
Expand Down

0 comments on commit 512a923

Please sign in to comment.