Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #112 #113

Merged
merged 9 commits into from
Feb 11, 2025
Prev Previous commit
Next Next commit
Render node with two features related to laterality.
napakalas committed Feb 7, 2025
commit d575e125d8468e7315f038040d2f66c47d10c3e0
167 changes: 105 additions & 62 deletions mapmaker/routing/__init__.py
Original file line number Diff line number Diff line change
@@ -955,21 +955,22 @@ def add_route_edges_from_graph(G, used_nodes):
neighbour_dict = connectivity_graph.nodes[next(iter(neighbour_dict['contains']))]
edge_dict = connectivity_graph.edges[(node, neighbour)]
if neighbour_dict['type'] == 'feature':
# selecting one the first neighbour feature to be connected with the closest no-segment node
# what about connecting all features?
(features := list(f for f in neighbour_dict['features'])).sort(key=lambda f: f.id)
f = features[0]
neighbouring_ids.update([f.id])
closest_feature_id = None
for n, s in itertools.product([f.id], segment_graph.nodes):
if (edge_dicts := self.__centreline_graph.get_edge_data(n, s)) is not None:
closest_feature_dict[n] = s
tmp_edge_dicts[(n, s)] = edge_dicts[0]
break
if f.id not in closest_feature_dict:
closest_feature_id = self.__closest_feature_id_to_point(f.geometry.centroid, segment_graph.nodes)
closest_feature_dict[f.id] = closest_feature_id
tmp_edge_dicts[(f.id, closest_feature_id)] = edge_dict
# if the node is a no-segment and neighbour is a terminal, connect to all neighbour features
# else connect to one closest no_segment point and neighbour feature.
features = (neighbour_dict['features'] if len(neighbour_dict['features']) <= 2
else sorted(neighbour_dict['features'], key=lambda f: f.id)[0:1])
for feature in features:
neighbouring_ids.update([feature.id])
closest_feature_id = None
for n, s in itertools.product([feature.id], segment_graph.nodes):
if (edge_dicts := self.__centreline_graph.get_edge_data(n, s)) is not None:
closest_feature_dict[n] = s
tmp_edge_dicts[(n, s)] = edge_dicts[0]
break
if feature.id not in closest_feature_dict:
closest_feature_id = self.__closest_feature_id_to_point(feature.geometry.centroid, segment_graph.nodes)
closest_feature_dict[feature.id] = closest_feature_id
tmp_edge_dicts[(feature.id, closest_feature_id)] = edge_dict
elif neighbour_dict['type'] in ['segment', 'no-segment']: # should check this limitation
candidates= {}
for n, s in itertools.product(neighbour_dict['subgraph'].nodes, segment_graph.nodes):
@@ -1149,17 +1150,15 @@ def get_ftu_node(feature: Feature):
return feature

# select the closest feature of a node with multiple features to it's neighbors
def get_node_feature(node_dict, neighbours, prev_features) -> Feature:
def get_node_feature(node_dict, neighbour_features, used_features) -> Feature:
(features:=list(f for f in node_dict['features'])).sort(key=lambda f: f.id)
selected_feature = features[0]
# in a case of a terminal node having multiple features, select the closest one to it's neighbours
# in a case of a terminal node having multiple features, select the closest one to it's neighbour_features
if len(features) > 1:
self.__log.error('Terminal node has multiple features', path=path.id, entity=node_dict['name'],
features=sorted(set(f.id for f in node_dict["features"])))
if selected_feature.properties.get('fc-class') is None:
feature_distances = {}
neighbour_features = [f for n in neighbours for f in connectivity_graph.nodes[n].get('features', [])] + \
[self.__flatmap.get_feature(fid) for n in neighbours for fid in connectivity_graph.nodes[n].get('used', [])]
if len(neighbour_features) > 0:
for f in features:
distances = []
@@ -1168,6 +1167,7 @@ def get_node_feature(node_dict, neighbours, prev_features) -> Feature:
feature_distances[f] = sum(distances)/len(distances)
selected_feature = min(feature_distances, key=feature_distances.get) # type: ignore
else:
prev_features = [feature for features in used_features.values() for feature in features]
if len(prev_features) > 0 and len(features) > 1:
min_distance = prev_features[-1].geometry.centroid.distance(features[0].geometry.centroid)
for f in (features:=[f for f in features[1:]]):
@@ -1189,10 +1189,34 @@ def get_node_feature(node_dict, neighbours, prev_features) -> Feature:
if len([node for node in subgraph.nodes if connectivity_graph.degree[node] == 1]) == 0:
pseudo_terminals += list(subgraph.nodes)[0:1]

# sorting nodes with priority -> terminal, number of features (2 than 1, than any size), distance to neighbours
terminal_one_feature = {}
terminal_one_feature = {
n: min(
(features[0].geometry.centroid.distance(nf.geometry.centroid)),
terminal_one_feature.get(n, float('inf'))
)
for n, n_dict in connectivity_graph.nodes(data=True)
if connectivity_graph.degree(n) == 1 and len(features := list(n_dict.get("features", []))) == 1
for neighbour in connectivity_graph.neighbors(n)
for nf in connectivity_graph.nodes[neighbour].get("features", [])
}
terminal_one_feature = dict(sorted(terminal_one_feature.items(), key=lambda item: item[1]))
terminal_two_features = [
n for n, n_dict in connectivity_graph.nodes(data=True)
if len(n_dict.get("features", [])) == 2 and connectivity_graph.degree(n) == 1
]
sorted_nodes = (
terminal_two_features +
list(terminal_one_feature.keys()) +
[n for n in connectivity_graph if n not in terminal_two_features and n not in terminal_one_feature]
)

terminal_graphs: dict[tuple, nx.Graph] = {}
visited = set()
previous_features = []
for node, node_dict in connectivity_graph.nodes(data=True):
used_features = {}
for node in sorted_nodes:
node_dict = connectivity_graph.nodes[node]
if node not in visited and (connectivity_graph.degree(node) == 1 or node in pseudo_terminals):
if node_dict['type'] == 'feature':
# First check node isn't already the end of a centreline
@@ -1231,47 +1255,66 @@ def add_paths_to_neighbours(node, node_dict):
for neighbour in (neighbours - visited):
neighbour_dict = connectivity_graph.nodes[neighbour]
degree = connectivity_graph.degree(neighbour)
node_feature = get_node_feature(node_dict, [neighbour], previous_features)
previous_features.append(node_feature)
node_feature_centre = node_feature.geometry.centroid
if (debug_properties:=connectivity_graph.get_edge_data(node, neighbour)) is None:
debug_properties = {}
if neighbour_dict['type'] == 'feature':
terminal_graph.add_node(node_feature.id, feature=node_feature)
if len(used_ids := neighbour_dict.get('used', set())):
closest_feature_id = self.__closest_feature_id_to_point(node_feature_centre, used_ids)
terminal_graph.add_edge(node_feature.id, closest_feature_id,
upstream=True, **debug_properties)
segments = set()
for connected_edges in route_graph[closest_feature_id].values():
for edge_dict in connected_edges.values():
if (segment_id := edge_dict.get('segment')) is not None:
segments.add(segment_id)
terminal_graph.nodes[closest_feature_id]['upstream'] = True
terminal_graph.nodes[closest_feature_id]['segments'] = segments
else:
neighbour_feature = get_node_feature(neighbour_dict, [node], previous_features)
previous_features.append(neighbour_feature)
terminal_graph.add_node(neighbour_feature.id, feature=neighbour_feature)
terminal_graph.add_edge(node_feature.id, neighbour_feature.id, **debug_properties)
elif neighbour_dict['type'] == 'segment':
closest_feature_id = None
closest_distance = None
last_segment = None
for segment_id in neighbour_dict['subgraph'].graph['segment-ids']:
(segment_end, distance) = self.__closest_segment_node_to_point(node_feature_centre, segment_id)
if (segment_end is not None
and segment_end in route_graph
and (closest_distance is None or distance < closest_distance)):
closest_feature_id = segment_end
closest_distance = distance
last_segment = segment_id
if closest_feature_id is not None:
terminal_graph.add_edge(node_feature.id, closest_feature_id, upstream=True, **debug_properties)
terminal_graph.nodes[node_feature.id]['feature'] = node_feature
terminal_graph.nodes[closest_feature_id]['upstream'] = True
terminal_graph.nodes[closest_feature_id]['segments'] = set([last_segment])
neighbour_dict['used'] = {closest_feature_id}
node_features = (
[get_node_feature(node_dict, neighbour_dict.get('features', []), used_features)]
if len(node_dict['features']) != 2
else used_features.get(node, set())
if connectivity_graph.degree(node) > 1 and len(used_features.get(node, set())) in [1, 2]
else set(node_dict['features'])
if connectivity_graph.degree(node) == 1
else [get_node_feature(node_dict, neighbour_dict.get('features', []), used_features)]
)
for node_feature in node_features:
used_features.setdefault(node, set()).add(node_feature)
node_feature_centre = node_feature.geometry.centroid
debug_properties = connectivity_graph.get_edge_data(node, neighbour) or {}
if neighbour_dict['type'] == 'feature':
terminal_graph.add_node(node_feature.id, feature=node_feature)
if len(used_ids := neighbour_dict.get('used', set())):
closest_feature_id = self.__closest_feature_id_to_point(node_feature_centre, used_ids)
terminal_graph.add_edge(node_feature.id, closest_feature_id,
upstream=True, **debug_properties)
segments = set()
for connected_edges in route_graph[closest_feature_id].values():
for edge_dict in connected_edges.values():
if (segment_id := edge_dict.get('segment')) is not None:
segments.add(segment_id)
terminal_graph.nodes[closest_feature_id]['upstream'] = True
terminal_graph.nodes[closest_feature_id]['segments'] = segments
else:
neighbour_terminal_laterals = [
k for k in connectivity_graph[neighbour]
if connectivity_graph.degree(k) == 1 and len(connectivity_graph.nodes[k].get('features', [])) == 2
]
neighbour_features = (
neighbour_dict.get('features', [])
if len(neighbour_dict.get('features', [])) <= 2 and degree == 1 and len(node_features) == 1
else [get_node_feature(neighbour_dict, [node_feature], used_features)]
if len(neighbour_terminal_laterals) > 0 and len(used_features.get(neighbour, set())) == 0
else [get_node_feature(neighbour_dict, [node_feature], used_features)]
)
for neighbour_feature in neighbour_features:
used_features.setdefault(neighbour, set()).add(neighbour_feature)
terminal_graph.add_node(neighbour_feature.id, feature=neighbour_feature)
terminal_graph.add_edge(node_feature.id, neighbour_feature.id, **debug_properties)
elif neighbour_dict['type'] == 'segment':
closest_feature_id = None
closest_distance = None
last_segment = None
for segment_id in neighbour_dict['subgraph'].graph['segment-ids']:
(segment_end, distance) = self.__closest_segment_node_to_point(node_feature_centre, segment_id)
if (segment_end is not None
and segment_end in route_graph
and (closest_distance is None or distance < closest_distance)):
closest_feature_id = segment_end
closest_distance = distance
last_segment = segment_id
if closest_feature_id is not None:
terminal_graph.add_edge(node_feature.id, closest_feature_id, upstream=True, **debug_properties)
terminal_graph.nodes[node_feature.id]['feature'] = node_feature
terminal_graph.nodes[closest_feature_id]['upstream'] = True
terminal_graph.nodes[closest_feature_id]['segments'] = set([last_segment])
neighbour_dict['used'] = {closest_feature_id}
# Only have our neighbour visit their neighbours if the neighbour is unconnected
if degree > 1 and len(neighbour_dict['used']) == 0 and neighbour_dict['type'] == 'feature':
neighbours_neighbours.append((neighbour, neighbour_dict))