diff --git a/mapmaker/flatmap/__init__.py b/mapmaker/flatmap/__init__.py index c19546e2..8d8e5182 100644 --- a/mapmaker/flatmap/__init__.py +++ b/mapmaker/flatmap/__init__.py @@ -247,7 +247,13 @@ def save_feature_for_node_lookup(self, feature: Feature): def features_for_anatomical_node(self, anatomical_node: AnatomicalNode, warn: bool=True) -> Optional[tuple[AnatomicalNode, set[Feature]]]: #========================================================================================================================================= if self.__feature_node_map is not None: - return self.__feature_node_map.features_for_anatomical_node(anatomical_node, warn=warn) + if len((features:=self.__feature_node_map.features_for_anatomical_node(anatomical_node, warn=warn))[1]) > 0: + return features + if len(fts:=set(feature for feature in self.__features_with_id.values() + if feature.models in [features[0][0]]+list(features[0][1]) + and feature.get_property('kind')=='proxy')) > 0: + return (anatomical_node, fts) + return features def duplicate_feature_id(self, feature_ids: str) -> bool: #======================================================== @@ -299,6 +305,7 @@ def __add_proxied_features(self): if self.__bottom_exported_layer is None: log.warning('No exported layer on which to add proxy features', type='proxy') return + proxy_seqs = {} for proxy_definition in self.__properties_store.proxies: feature_model = proxy_definition['feature'] if self.__feature_node_map.has_model(feature_model): @@ -308,16 +315,17 @@ def __add_proxied_features(self): if not self.__feature_node_map.has_model(proxy_model): log.warning('Proxy missing from map', type='proxy', models=feature_model, proxy=proxy_model) for feature in self.__feature_node_map.get_features(proxy_model): - self.__add_proxy_feature(feature, feature_model) + proxy_seqs[feature.id] = proxy_seqs.get(feature.id, -1) + 1 + self.__add_proxy_feature(feature, feature_model, proxy_seqs[feature.id]) - def __add_proxy_feature(self, feature: Feature, feature_model: str): - #=================================================================== + def __add_proxy_feature(self, feature: Feature, feature_model: str, proxy_seq: int): + #================================================================================ if 'Polygon' not in feature.geometry.geom_type: log.warning('Proxy feature must have a polygon shape', type='proxy', models=feature_model, feature=feature) elif self.__bottom_exported_layer is not None: self.__bottom_exported_layer.add_feature( - self.new_feature(self.__bottom_exported_layer.id, proxy_dot(feature.geometry), { # type: ignore - 'id': f'proxy_on_{feature.id}', + self.new_feature(self.__bottom_exported_layer.id, proxy_dot(feature.geometry, proxy_seq), { # type: ignore + 'id': f'proxy_{proxy_seq}_on_{feature.id}', 'tile-layer': FEATURES_TILE_LAYER, 'models': feature_model, 'kind': 'proxy' diff --git a/mapmaker/flatmap/feature.py b/mapmaker/flatmap/feature.py index d198345a..b9289fc0 100644 --- a/mapmaker/flatmap/feature.py +++ b/mapmaker/flatmap/feature.py @@ -130,7 +130,7 @@ def __init__(self, terms_alias_file: Optional[str]=None): for alias in equivalence.get('aliases', []): alias = (alias[0], tuple(alias[1])) if isinstance(alias, list) else alias if alias in self.__anatomical_aliases: - self.__log.error('Alias cannot map to both terms, alias=alias, terms=[self.__anatomical_aliases[alias], term]') + self.__log.error(f'Alias cannot map to both terms, alias=alias, terms={[self.__anatomical_aliases[alias], term]}') else: self.__anatomical_aliases[alias] = term self.__model_to_features: dict[str|tuple, set[Feature]] = defaultdict(set) diff --git a/mapmaker/geometry/proxy_dot.py b/mapmaker/geometry/proxy_dot.py index 166492a4..93bb359b 100644 --- a/mapmaker/geometry/proxy_dot.py +++ b/mapmaker/geometry/proxy_dot.py @@ -23,7 +23,7 @@ #=============================================================================== -def proxy_dot(poly: MultiPolygon|Polygon) -> Polygon: +def proxy_dot(poly: MultiPolygon|Polygon, proxy_seq: int) -> Polygon: envelope_coords = shapely.oriented_envelope(poly).boundary.coords edge_coords = list(zip(envelope_coords, envelope_coords[1:])) edges = [LineString(coords) for coords in edge_coords] @@ -35,8 +35,9 @@ def proxy_dot(poly: MultiPolygon|Polygon) -> Polygon: median_line = LineString([p0, p1]) else: median_line = LineString([p1, p0]) - proxy_point = shapely.line_interpolate_point(median_line, 0.8, normalized=True) - - return proxy_point.buffer(median_line.length/16) + distance = 0.8 - proxy_seq * 0.15 + proxy_point = shapely.line_interpolate_point(median_line, distance, normalized=True) + median_distance = median_line.length/16 if median_line.length/16 < 1000 else 1000 + return proxy_point.buffer(median_distance) #=============================================================================== diff --git a/mapmaker/properties/pathways.py b/mapmaker/properties/pathways.py index e795c3af..34cde5fe 100644 --- a/mapmaker/properties/pathways.py +++ b/mapmaker/properties/pathways.py @@ -673,9 +673,18 @@ def __route_network_connectivity(self, network: Network): path = paths_by_id[path_id] path_geojson_ids = [] path_taxons = None + added_properties = { + key: value for key, value in [ + ('missing-nodes', path.connectivity.graph.get('missing_nodes')), + ('alert', path.connectivity.graph.get('alert')), + ('biological-sex', path.connectivity.graph.get('biological-sex')), + ('completeness', path.connectivity.graph.get('completeness')), + ] if value is not None + } + for geometric_shape in geometric_shapes: if geometric_shape.properties.get('type') not in ['arrow', 'junction']: - properties = DEFAULT_PATH_PROPERTIES.copy() + properties = DEFAULT_PATH_PROPERTIES.copy() | added_properties if routed_path.centrelines is not None: # The list of nerve models that the path is associated with properties['nerves'] = routed_path.centrelines_model @@ -700,7 +709,7 @@ def __route_network_connectivity(self, network: Network): path_taxons = feature.get_property('taxons') for geometric_shape in geometric_shapes: - properties = DEFAULT_PATH_PROPERTIES.copy() + properties = DEFAULT_PATH_PROPERTIES.copy() | added_properties properties.update(geometric_shape.properties) if properties.get('type') in ['arrow', 'junction']: properties['kind'] = path.path_type.viewer_kind diff --git a/mapmaker/routing/__init__.py b/mapmaker/routing/__init__.py index 79010a6d..e5d17dd4 100644 --- a/mapmaker/routing/__init__.py +++ b/mapmaker/routing/__init__.py @@ -88,6 +88,14 @@ def pairwise(iterable): #=============================================================================== +NOT_LATERAL_NODES = [ + 'UBERON:0001896', + 'UBERON:0000988', + 'UBERON:0001891' +] + +#============================================================= + def expand_centreline_graph(graph: nx.MultiGraph) -> nx.Graph: #============================================================= G = nx.Graph() @@ -955,21 +963,25 @@ def add_route_edges_from_graph(G, used_nodes): neighbour_dict = connectivity_graph.nodes[next(iter(neighbour_dict['contains']))] edge_dict = connectivity_graph.edges[(node, neighbour)] if neighbour_dict['type'] == 'feature': - # selecting one the first neighbour feature to be connected with the closest no-segment node - # what about connecting all features? - (features := list(f for f in neighbour_dict['features'])).sort(key=lambda f: f.id) - f = features[0] - neighbouring_ids.update([f.id]) - closest_feature_id = None - for n, s in itertools.product([f.id], segment_graph.nodes): - if (edge_dicts := self.__centreline_graph.get_edge_data(n, s)) is not None: - closest_feature_dict[n] = s - tmp_edge_dicts[(n, s)] = edge_dicts[0] - break - if f.id not in closest_feature_dict: - closest_feature_id = self.__closest_feature_id_to_point(f.geometry.centroid, segment_graph.nodes) - closest_feature_dict[f.id] = closest_feature_id - tmp_edge_dicts[(f.id, closest_feature_id)] = edge_dict + # if the node is a no-segment and neighbour is a terminal, connect to all neighbour features + # else connect to one closest no_segment point and neighbour feature. + features = ( + neighbour_dict['features'] + if len(neighbour_dict['features']) <= 2 and all(item not in {neighbour_dict['node'][0], *neighbour_dict['node'][1]} for item in NOT_LATERAL_NODES) + else sorted(neighbour_dict['features'], key=lambda f: f.id)[:1] + ) + for feature in features: + neighbouring_ids.update([feature.id]) + closest_feature_id = None + for n, s in itertools.product([feature.id], segment_graph.nodes): + if (edge_dicts := self.__centreline_graph.get_edge_data(n, s)) is not None: + closest_feature_dict[n] = s + tmp_edge_dicts[(n, s)] = edge_dicts[0] + break + if feature.id not in closest_feature_dict: + closest_feature_id = self.__closest_feature_id_to_point(feature.geometry.centroid, segment_graph.nodes) + closest_feature_dict[feature.id] = closest_feature_id + tmp_edge_dicts[(feature.id, closest_feature_id)] = edge_dict elif neighbour_dict['type'] in ['segment', 'no-segment']: # should check this limitation candidates= {} for n, s in itertools.product(neighbour_dict['subgraph'].nodes, segment_graph.nodes): @@ -984,7 +996,7 @@ def add_route_edges_from_graph(G, used_nodes): if n_id in segment_graph: updated_neighbouring_ids.update([n_id]) else: - if (f:=self.__map_feature(n_id)) is not None: + if self.__map_feature(n_id) is not None: if (closest_feature_id:=closest_feature_dict.get(n_id)) is not None: updated_neighbouring_ids.update([closest_feature_id]) new_direct_edges.update([(n_id, closest_feature_id)]) @@ -1149,17 +1161,15 @@ def get_ftu_node(feature: Feature): return feature # select the closest feature of a node with multiple features to it's neighbors - def get_node_feature(node_dict, neighbours, prev_features) -> Feature: + def get_node_feature(node_dict, neighbour_features, used_features) -> Feature: (features:=list(f for f in node_dict['features'])).sort(key=lambda f: f.id) selected_feature = features[0] - # in a case of a terminal node having multiple features, select the closest one to it's neighbours + # in a case of a terminal node having multiple features, select the closest one to it's neighbour_features if len(features) > 1: self.__log.error('Terminal node has multiple features', path=path.id, entity=node_dict['name'], features=sorted(set(f.id for f in node_dict["features"]))) if selected_feature.properties.get('fc-class') is None: feature_distances = {} - neighbour_features = [f for n in neighbours for f in connectivity_graph.nodes[n].get('features', [])] + \ - [self.__flatmap.get_feature(fid) for n in neighbours for fid in connectivity_graph.nodes[n].get('used', [])] if len(neighbour_features) > 0: for f in features: distances = [] @@ -1168,6 +1178,7 @@ def get_node_feature(node_dict, neighbours, prev_features) -> Feature: feature_distances[f] = sum(distances)/len(distances) selected_feature = min(feature_distances, key=feature_distances.get) # type: ignore else: + prev_features = [feature for features in used_features.values() for feature in features] if len(prev_features) > 0 and len(features) > 1: min_distance = prev_features[-1].geometry.centroid.distance(features[0].geometry.centroid) for f in (features:=[f for f in features[1:]]): @@ -1186,14 +1197,38 @@ def get_node_feature(node_dict, neighbours, prev_features) -> Feature: (temp_connectivity_graph := nx.Graph(connectivity_graph)).remove_nodes_from(centrelines) subgraphs = [temp_connectivity_graph.subgraph(component) for component in nx.connected_components(temp_connectivity_graph)] for subgraph in subgraphs: - if len([node for node in subgraph.nodes if connectivity_graph.degree[node] == 1]) > 0: - break - pseudo_terminals += list(subgraph.nodes)[0:1] + if len([node for node in subgraph.nodes if connectivity_graph.degree[node] == 1]) == 0: + pseudo_terminals += list(subgraph.nodes)[0:1] + + # sorting nodes with priority -> terminal, number of features (2 than 1, than any size), distance to neighbours + one_feature_terminals = { + n: min([ + features[0].geometry.centroid.distance(nf.geometry.centroid) + for neighbour in connectivity_graph.neighbors(n) + for nf in ( + connectivity_graph.nodes[neighbour].get("features", set()) | + {self.__flatmap.get_feature(f_id) for f_id in connectivity_graph.nodes[neighbour].get("used", set())} + ) + ]) + for n, n_dict in connectivity_graph.nodes(data=True) + if connectivity_graph.degree(n) == 1 and len(features := list(n_dict.get("features", []))) == 1 + } + one_feature_terminals = dict(sorted(one_feature_terminals.items(), key=lambda item: item[1])) + two_feature_terminals = [ + n for n, n_dict in connectivity_graph.nodes(data=True) + if len(n_dict.get("features", [])) == 2 and connectivity_graph.degree(n) == 1 + ] + sorted_nodes = ( + two_feature_terminals + + list(one_feature_terminals.keys()) + + [n for n in connectivity_graph if n not in two_feature_terminals and n not in one_feature_terminals] + ) terminal_graphs: dict[tuple, nx.Graph] = {} visited = set() - previous_features = [] - for node, node_dict in connectivity_graph.nodes(data=True): + used_features = {} + for node in sorted_nodes: + node_dict = connectivity_graph.nodes[node] if node not in visited and (connectivity_graph.degree(node) == 1 or node in pseudo_terminals): if node_dict['type'] == 'feature': # First check node isn't already the end of a centreline @@ -1232,47 +1267,68 @@ def add_paths_to_neighbours(node, node_dict): for neighbour in (neighbours - visited): neighbour_dict = connectivity_graph.nodes[neighbour] degree = connectivity_graph.degree(neighbour) - node_feature = get_node_feature(node_dict, [neighbour], previous_features) - previous_features.append(node_feature) - node_feature_centre = node_feature.geometry.centroid - if (debug_properties:=connectivity_graph.get_edge_data(node, neighbour)) is None: - debug_properties = {} - if neighbour_dict['type'] == 'feature': - terminal_graph.add_node(node_feature.id, feature=node_feature) - if len(used_ids := neighbour_dict.get('used', set())): - closest_feature_id = self.__closest_feature_id_to_point(node_feature_centre, used_ids) - terminal_graph.add_edge(node_feature.id, closest_feature_id, - upstream=True, **debug_properties) - segments = set() - for connected_edges in route_graph[closest_feature_id].values(): - for edge_dict in connected_edges.values(): - if (segment_id := edge_dict.get('segment')) is not None: - segments.add(segment_id) - terminal_graph.nodes[closest_feature_id]['upstream'] = True - terminal_graph.nodes[closest_feature_id]['segments'] = segments - else: - neighbour_feature = get_node_feature(neighbour_dict, [node], previous_features) - previous_features.append(neighbour_feature) - terminal_graph.add_node(neighbour_feature.id, feature=neighbour_feature) - terminal_graph.add_edge(node_feature.id, neighbour_feature.id, **debug_properties) - elif neighbour_dict['type'] == 'segment': - closest_feature_id = None - closest_distance = None - last_segment = None - for segment_id in neighbour_dict['subgraph'].graph['segment-ids']: - (segment_end, distance) = self.__closest_segment_node_to_point(node_feature_centre, segment_id) - if (segment_end is not None - and segment_end in route_graph - and (closest_distance is None or distance < closest_distance)): - closest_feature_id = segment_end - closest_distance = distance - last_segment = segment_id - if closest_feature_id is not None: - terminal_graph.add_edge(node_feature.id, closest_feature_id, upstream=True, **debug_properties) - terminal_graph.nodes[node_feature.id]['feature'] = node_feature - terminal_graph.nodes[closest_feature_id]['upstream'] = True - terminal_graph.nodes[closest_feature_id]['segments'] = set([last_segment]) - neighbour_dict['used'] = {closest_feature_id} + neighbour_features = neighbour_dict.get('features', set()) | {self.__flatmap.get_feature(f_id) for f_id in neighbour_dict.get('used', set())} + node_features = ( + [get_node_feature(node_dict, neighbour_features, used_features)] + if len(node_dict['features']) != 2 + else used_features.get(node, set()) + if connectivity_graph.degree(node) > 1 and len(used_features.get(node, set())) in [1, 2] + else set(node_dict['features']) + if connectivity_graph.degree(node) == 1 and all(item not in {node_dict['node'][0], *node_dict['node'][1]} for item in NOT_LATERAL_NODES) + else [get_node_feature(node_dict, neighbour_features, used_features)] + ) + for node_feature in node_features: + used_features.setdefault(node, set()).add(node_feature) + node_feature_centre = node_feature.geometry.centroid + debug_properties = connectivity_graph.get_edge_data(node, neighbour) or {} + if neighbour_dict['type'] == 'feature': + terminal_graph.add_node(node_feature.id, feature=node_feature) + if len(used_ids := neighbour_dict.get('used', set())): + closest_feature_id = self.__closest_feature_id_to_point(node_feature_centre, used_ids) + terminal_graph.add_edge(node_feature.id, closest_feature_id, + upstream=True, **debug_properties) + segments = set() + for connected_edges in route_graph[closest_feature_id].values(): + for edge_dict in connected_edges.values(): + if (segment_id := edge_dict.get('segment')) is not None: + segments.add(segment_id) + terminal_graph.nodes[closest_feature_id]['upstream'] = True + terminal_graph.nodes[closest_feature_id]['segments'] = segments + else: + neighbour_terminal_laterals = [ + k for k in connectivity_graph[neighbour] + if connectivity_graph.degree(k) == 1 and len(connectivity_graph.nodes[k].get('features', [])) == 2 + ] + neighbour_features = ( + neighbour_dict.get('features', []) + if len(neighbour_dict.get('features', [])) <= 2 and degree == 1 and len(node_features) == 1 + and all(item not in {neighbour_dict['node'][0], *neighbour_dict['node'][1]} for item in NOT_LATERAL_NODES) + else [get_node_feature(neighbour_dict, [node_feature], used_features)] + if len(neighbour_terminal_laterals) > 0 and len(used_features.get(neighbour, set())) == 0 + else [get_node_feature(neighbour_dict, [node_feature], used_features)] + ) + for neighbour_feature in neighbour_features: + used_features.setdefault(neighbour, set()).add(neighbour_feature) + terminal_graph.add_node(neighbour_feature.id, feature=neighbour_feature) + terminal_graph.add_edge(node_feature.id, neighbour_feature.id, **debug_properties) + elif neighbour_dict['type'] == 'segment': + closest_feature_id = None + closest_distance = None + last_segment = None + for segment_id in neighbour_dict['subgraph'].graph['segment-ids']: + (segment_end, distance) = self.__closest_segment_node_to_point(node_feature_centre, segment_id) + if (segment_end is not None + and segment_end in route_graph + and (closest_distance is None or distance < closest_distance)): + closest_feature_id = segment_end + closest_distance = distance + last_segment = segment_id + if closest_feature_id is not None: + terminal_graph.add_edge(node_feature.id, closest_feature_id, upstream=True, **debug_properties) + terminal_graph.nodes[node_feature.id]['feature'] = node_feature + terminal_graph.nodes[closest_feature_id]['upstream'] = True + terminal_graph.nodes[closest_feature_id]['segments'] = set([last_segment]) + neighbour_dict['used'] = {closest_feature_id} # Only have our neighbour visit their neighbours if the neighbour is unconnected if degree > 1 and len(neighbour_dict['used']) == 0 and neighbour_dict['type'] == 'feature': neighbours_neighbours.append((neighbour, neighbour_dict)) diff --git a/mapmaker/routing/routedpath.py b/mapmaker/routing/routedpath.py index b84d16d9..3871447a 100644 --- a/mapmaker/routing/routedpath.py +++ b/mapmaker/routing/routedpath.py @@ -326,7 +326,7 @@ def path_geometry(self) -> dict[str, list[GeometricShape]]: """ reference_nodes = {} path_geometry = defaultdict(list) - def connect_gap(node, node_points, added_properties=dict(), iscentreline=False): + def connect_gap(node, node_points, iscentreline=False): # don't fill the gap from centreline to centreline if node in reference_nodes and iscentreline: if reference_nodes[node]['iscenterline']: @@ -344,7 +344,8 @@ def connect_gap(node, node_points, added_properties=dict(), iscentreline=False): bezier_to_linestring(bz), { 'path-id': path_id, 'source': path_source, - } | added_properties)) + 'label': self.__graph.graph.get('label') + })) else: reference_nodes[node] = {'points': node_points, 'iscenterline': iscentreline} if node in reference_nodes and not iscentreline and reference_nodes[node]['iscenterline']: @@ -415,16 +416,17 @@ def connect_gap(node, node_points, added_properties=dict(), iscentreline=False): connect_gap(edge_dict['end-node'], [end_point], iscentreline=True) # Draw paths to terminal nodes - def draw_arrow(start_point, end_point, path_id, path_source, added_properties): + def draw_arrow(start_point, end_point, path_id, path_source): heading = (end_point - start_point).angle end_point -= BezierPoint.fromAngle(heading)*0.9*ARROW_LENGTH path_geometry[path_id].append(GeometricShape.arrow(end_point, heading, ARROW_LENGTH, properties={ 'type': 'arrow', 'path-id': path_id, - 'source': path_source - } | added_properties)) + 'source': path_source, + 'label': self.__graph.graph.get('label') + })) - def draw_line(node_0, node_1, added_properties, tolerance=0.1, separation=2000): + def draw_line(node_0, node_1, tolerance=0.1, separation=2000): start_coords = self.__graph.nodes[node_0]['geometry'].centroid.coords[0] end_coords = self.__graph.nodes[node_1]['geometry'].centroid.coords[0] offset = self.__graph.nodes[node_0]['offsets'][node_1] @@ -440,36 +442,26 @@ def draw_line(node_0, node_1, added_properties, tolerance=0.1, separation=2000): bezier_to_linestring(bz, offset=path_offset), { 'path-id': path_id, 'source': path_source, - } | added_properties)) + })) bz_line_coord = bezier_to_line_coords(bz, offset=path_offset) if self.__graph.degree(node_0) == 1: - draw_arrow(coords_to_point(bz_line_coord[-1]), coords_to_point(bz_line_coord[0]), path_id, path_source, added_properties) + draw_arrow(coords_to_point(bz_line_coord[-1]), coords_to_point(bz_line_coord[0]), path_id, path_source) if self.__graph.degree(node_1) == 1: - draw_arrow(coords_to_point(bz_line_coord[0]), coords_to_point(bz_line_coord[-1]), path_id, path_source, added_properties) - connect_gap(node_0, [coords_to_point(bz_line_coord[0])], added_properties) - connect_gap(node_1, [coords_to_point(bz_line_coord[-1])], added_properties) + draw_arrow(coords_to_point(bz_line_coord[0]), coords_to_point(bz_line_coord[-1]), path_id, path_source) + connect_gap(node_0, [coords_to_point(bz_line_coord[0])]) + connect_gap(node_1, [coords_to_point(bz_line_coord[-1])]) terminal_nodes = set() for node_0, node_1, edge_dict in self.__graph.edges(data=True): ## This assumes node_1 is the terminal... path_id = edge_dict.get('path-id') path_source = edge_dict.get('source') - added_properties = { - 'completeness': edge_dict.get('completeness', True), - 'label': self.__graph.graph.get('label') - } - if (missing_nodes:=edge_dict.get('missing_nodes')) is not None: - added_properties['missing-nodes'] = missing_nodes - if (alert:=self.__graph.graph.get('alert')) is not None: - added_properties['alert'] = alert - if (biological_sex:=self.__graph.graph.get('biological-sex')) is not None: - added_properties['biological-sex'] = biological_sex if ((edge_type := edge_dict.get('type')) == 'terminal' or (edge_type == 'upstream' and 'upstream' not in [self.__graph.nodes[node_0].get('type'), self.__graph.nodes[node_1].get('type')])): # Draw lines... terminal_nodes.update([node_0, node_1]) - draw_line(node_0, node_1, added_properties) + draw_line(node_0, node_1) elif edge_type == 'upstream': terminal_nodes.update([node_0, node_1]) if self.__graph.nodes[node_0].get('type') == 'upstream': @@ -487,28 +479,24 @@ def draw_line(node_0, node_1, added_properties, tolerance=0.1, separation=2000): end_coords = self.__graph.nodes[terminal_node]['geometry'].centroid.coords[0] end_point = coords_to_point(end_coords) heading = (end_point - start_point).angle - end_point -= BezierPoint.fromAngle(heading)*0.9*ARROW_LENGTH - bz = bezier_connect(start_point, end_point, angle, heading) + bz_end_point = end_point - BezierPoint.fromAngle(heading)*0.9*ARROW_LENGTH + bz = bezier_connect(start_point, bz_end_point, angle, heading) path_geometry[path_id].append(GeometricShape( bezier_to_linestring(bz), { 'path-id': path_id, 'source': path_source, - } | added_properties)) + })) bz_line_coord = bezier_to_line_coords(bz) - connect_gap(upstream_node, [coords_to_point(bz_line_coord[0])], added_properties) - connect_gap(terminal_node, [coords_to_point(bz_line_coord[-1])], added_properties) + connect_gap(upstream_node, [coords_to_point(bz_line_coord[0])]) + connect_gap(terminal_node, [coords_to_point(bz_line_coord[-1])]) if self.__trace: path_geometry[path_id].extend(bezier_control_points(bz, label=f'{self.__path_id}-T')) # Draw arrow iff degree(node_1) == 1 if self.__graph.degree(terminal_node) == 1: - path_geometry[path_id].append(GeometricShape.arrow(end_point, heading, ARROW_LENGTH, properties={ - 'type': 'arrow', - 'path-id': path_id, - 'source': path_source - } | added_properties)) + draw_arrow(start_point, end_point, path_id, path_source) else: # This is when the upstream node doesn't have an ongoing centreline - draw_line(upstream_node, terminal_node, added_properties) + draw_line(upstream_node, terminal_node) # Connect edges at branch nodes for node, node_dict in self.__graph.nodes(data=True):