forked from AnatomicMaps/flatmap-maker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
1523 lines (1395 loc) · 89 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#===============================================================================
#
# Flatmap viewer and annotation tools
#
# Copyright (c) 2020 David Brooks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#===============================================================================
"""
Networks are defined in terms of their topological connections and geometric
structures and can be thought of as conduit networks through which individual
wires are routed.
"""
#===============================================================================
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass, field
from functools import partial
import itertools
import math
import sys
import typing
from typing import TYPE_CHECKING, Any, Optional
#===============================================================================
if sys.version_info >= (3, 10):
# Python 3.10
from itertools import pairwise
else:
# Python < 3.10
from itertools import tee
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)
#===============================================================================
from beziers.path import BezierPath
from beziers.point import Point as BezierPoint
import networkx as nx
import shapely.geometry
import structlog
#===============================================================================
from mapmaker.flatmap.feature import Feature
from mapmaker.flatmap.layers import PATHWAYS_TILE_LAYER
from mapmaker.geometry.beziers import bezier_to_linestring, closest_time_distance
from mapmaker.geometry.beziers import coords_to_point
from mapmaker.geometry.beziers import split_bezier_path_at_point
from mapmaker.knowledgebase import AnatomicalNode
from mapmaker.knowledgebase.celldl import FC_CLASS, FC_KIND
from mapmaker.knowledgebase.sckan import PATH_TYPE
from mapmaker.settings import settings
from mapmaker.utils import log
import mapmaker.utils.graph as graph_utils
#===============================================================================
from .options import MIN_EDGE_JOIN_RADIUS
from .routedpath import IntermediateNode, PathRouter, RoutedPath
#===============================================================================
if TYPE_CHECKING:
from mapmaker.flatmap import FlatMap
from mapmaker.properties import PropertiesStore
from mapmaker.properties.pathways import Path
#=============================================================
def expand_centreline_graph(graph: nx.MultiGraph) -> nx.Graph:
#=============================================================
G = nx.Graph()
for node_0, node_1, edge_dict in graph.edges(data=True):
G.add_node(node_0, graph_object='node', **graph.nodes[node_0])
G.add_node(node_1, graph_object='node', **graph.nodes[node_1])
if (segment_id := edge_dict.get('segment')) is not None:
edge_node = segment_id
else:
edge_node = (node_0, node_1)
G.add_node(edge_node, graph_object='edge', edge=(node_0, node_1), **edge_dict)
G.add_edge(node_0, edge_node, graph_object='node')
G.add_edge(edge_node, node_1, graph_object='node')
return G
def collapse_centreline_graph(graph: nx.Graph) -> nx.Graph:
#==========================================================
G = nx.Graph()
seen_edges = set()
for node_or_edge, node_dict in graph.nodes(data=True):
new_dict = node_dict.copy()
graph_object = new_dict.pop('graph_object', None)
if graph_object == 'edge':
edge_nodes = new_dict.pop('edge')
if edge_nodes in seen_edges:
log.warning('Edge ignored as it is already in the route graph', type='conn', node=node_or_edge)
else:
G.add_edge(*edge_nodes, **new_dict)
seen_edges.add(edge_nodes)
elif graph_object == 'node':
G.add_node(node_or_edge, **new_dict)
else:
log.warning('Expanded graph node ignored as it has no `graph type', type='conn', node=node_or_edge)
return G
#===============================================================================
@dataclass
class NetworkNode:
full_id: str
intermediate: bool = False
map_feature: Optional[Feature] = None
feature_id: str = field(init=False)
ftu_id: Optional[str] = field(init=False)
properties: dict[str, Any] = field(default_factory=dict, init=False)
def __post_init__(self):
self.feature_id = self.full_id.rsplit('/', 1)[-1]
self.ftu_id = self.full_id.rsplit('/', 2)[-2] if '/' in self.full_id else None
def __eq__(self, other):
return self.feature_id == other.feature_id
def __hash__(self):
return hash(self.feature_id)
@property
def centre(self) -> Optional[BezierPoint]:
return self.properties.get('centre')
@centre.setter
def centre(self, value: BezierPoint):
self.properties['centre'] = value
@property
def geometry(self):
return self.properties.get('geometry')
@property
def radii(self) -> Optional[tuple[float, float]]:
return self.properties.get('radii')
def set_properties_from_feature(self, feature: Feature):
#=======================================================
self.map_feature = feature
self.properties.update(feature.properties)
self.properties['geometry'] = feature.geometry
if feature.geometry is not None:
centre = feature.geometry.centroid
self.properties['centre'] = BezierPoint(centre.x, centre.y)
radius = max(math.sqrt(feature.geometry.area/math.pi), MIN_EDGE_JOIN_RADIUS)
self.properties['radii'] = (0.999*radius, 1.001*radius)
else:
log.warning('Centreline node has no geometry', type='conn', node=feature.id)
#===============================================================================
class Network(object):
def __init__(self, flatmap: 'FlatMap', network: dict, properties_store: Optional['PropertiesStore']=None):
self.__flatmap = flatmap
self.__id = network.get('id')
self.__type = network.get('type', 'nerve')
self.__log = typing.cast(structlog.BoundLogger, log.bind(type='network', network=self.__id))
self.__centreline_models: dict[str, str] = {} #! Centreline id --> models
self.__centreline_nodes: dict[str, list[NetworkNode]] = defaultdict(list) #! Centreline id --> [Network nodes]
self.__nodes_by_ftu: dict[str, list[NetworkNode]] = defaultdict(list) #! FTU id id --> {Network nodes}
self.__containers_by_centreline = {} #! Centreline id --> set of features that centreline is contained in
self.__models_to_id: dict[str, set[str]] = defaultdict(set) #! Ontological term --> centrelines
self.__feature_ids: set[str] = set()
self.__full_ids: set[str] = set() #! A ``full id`` is a slash-separated list of feature ids
self.__missing_identifiers: set[AnatomicalNode] = set()
self.__end_feature_ids = set()
self.__container_feature_ids = set()
# The following are assigned once we have feature geometry
self.__centreline_graph = nx.MultiGraph() #! Can have multiple paths between nodes which will be contained in different features
self.__containers_by_segment: dict[str, set[str]] = defaultdict(set) #! Segment id --> set of features that segment is contained in
self.__centrelines_by_containing_feature = defaultdict(set) #! Feature id --> set of centrelines that are contained in feature
self.__expanded_centreline_graph: Optional[nx.Graph] = None #! Expanded version of centreline graph
self.__segment_edge_by_segment: dict[str, tuple[str, str, str]] = {} #! Segment id --> segment edge
self.__segment_ids_by_centreline: dict[str, list[str]] = defaultdict(list) #! Centreline id --> segment ids of the centreline
# Track how nodes are associated with centrelines
end_nodes_to_centrelines = defaultdict(list)
intermediate_nodes_to_centrelines = defaultdict(list)
# Check for nerve cuffs in ``contained-in`` lists
if properties_store is not None:
for centreline in network.get('centrelines', []):
if (centreline_id := centreline.get('id')) is not None:
centreline_models = centreline.get('models')
if len(containers := set(centreline.get('contained-in', []))) > 0:
contained_in = []
for feature_id in containers:
if properties_store.properties(feature_id).get('models') is None:
# Container features are of no use if they don't model anything...
self.__log.error('Contained-in feature of centreline does not have an anatomical term',
feature=feature_id, centreline=centreline_id)
elif (models := properties_store.nerve_models_by_id.get(feature_id)) is None:
contained_in.append(feature_id)
elif centreline_models is None:
self.__log.warning('Contained-in feature used as nerve model of its centreline',
feature=feature_id, centreline=centreline_id)
centreline['models'] = models
elif models == centreline_models:
self.__log.warning('Contained-in feature also models nerve of its centreline',
feature=feature_id, centreline=centreline_id)
elif models != centreline_models:
self.__log.error('Contained-in feature models a different nerve than its centreline',
feature=feature_id, centreline=centreline_id)
centreline['contained-in'] = contained_in
# Collect centreline and end node identifiers, checking validity
centrelines_by_end_feature = defaultdict(set)
for centreline in network.get('centrelines', []):
centreline_id = centreline.get('id')
if centreline_id is None:
self.__log.error('Centreline in network does not have an id')
elif centreline_id in self.__centreline_nodes:
self.__log.error('Centreline in network has a duplicate id', centreline=centreline_id)
else:
self.__add_feature(centreline_id)
if (centreline_models := centreline.get('models')) is not None:
self.__models_to_id[centreline_models].add(centreline_id)
# If we have ``properties_store`` without ``centreline_models`` annotation for the centreline then set it
if properties_store is not None:
properties_store.set_property(centreline_id, 'centreline', True)
if (models := properties_store.get_property(centreline_id, 'models')) is None:
properties_store.set_property(centreline_id, 'models', centreline_models)
elif centreline_models != models:
self.__log.error('Centreline models both two entities', entities=[centreline_models, models],
centreline=centreline_id)
if (nerve_id := properties_store.nerve_ids_by_model.get(centreline_models)) is not None:
# Assign nerve cuff id to centreline
properties_store.set_property(centreline_id, 'nerve', nerve_id)
elif properties_store is not None:
properties_store.set_property(centreline_id, 'centreline', True)
if (centreline_models := properties_store.get_property(centreline_id, 'models')) is not None:
# No ``models`` are directly specified for the centreline so assign what we've found
centreline['models'] = centreline_models
elif (centreline_label := centreline.get('label')) is not None:
properties_store.set_property(centreline_id, 'label', centreline_label)
if centreline_models is not None:
self.__models_to_id[centreline_models].add(centreline_id)
self.__centreline_models[centreline_id] = centreline_models
# Check connected nodes
connected_nodes = centreline.get('connects', [])
if len(connected_nodes) < 2:
self.__log.error('Centreline in network has too few nodes', centreline=centreline_id)
else:
self.__add_feature(connected_nodes[0])
self.__add_feature(connected_nodes[-1])
centrelines_by_end_feature[connected_nodes[0]].add(centreline_id)
centrelines_by_end_feature[connected_nodes[-1]].add(centreline_id)
for node_id in connected_nodes:
network_node = NetworkNode(node_id)
self.__centreline_nodes[centreline_id].append(network_node)
if (ftu_id := network_node.ftu_id) is not None:
if network_node not in self.__nodes_by_ftu[ftu_id]:
self.__nodes_by_ftu[ftu_id].append(network_node)
if properties_store is not None:
if (properties_store.get_property(node_id, 'models') is None
and properties_store.get_property(node_id, 'label') is None):
properties_store.set_property(node_id, 'node', True)
self.__containers_by_centreline[centreline_id] = set(centreline.get('contained-in', []))
end_nodes_to_centrelines[connected_nodes[0]].append(centreline_id)
end_nodes_to_centrelines[connected_nodes[-1]].append(centreline_id)
for node_id in connected_nodes[1:-1]:
intermediate_nodes_to_centrelines[node_id].append(centreline_id)
# Check for multiple branches and crossings
for node, centrelines in intermediate_nodes_to_centrelines.items():
if len(centrelines) > 1:
self.__log.error('Node is intermediate node for more than several centrelines',
node=node, centrelines=centrelines)
if len(centrelines := end_nodes_to_centrelines.get(node, [])) > 1:
self.__log.error('Intermediate node branches to several centrelines',
node=node, centrelines=centrelines)
# Separate out branch nodes that make up the segmented centreline graph from intermediate nodes
for centreline_id, nodes in self.__centreline_nodes.items():
for node in nodes:
node.intermediate = (node.full_id not in self.__full_ids)
if node.ftu_id is None and node.feature_id in self.__nodes_by_ftu:
if node not in self.__nodes_by_ftu[node.feature_id]:
self.__nodes_by_ftu[node.feature_id].append(node)
# Create feature_id |--> centreline_id mapping and check containing features are not
# also end features
for centreline_id, containing_features in self.__containers_by_centreline.items():
for feature_id in containing_features:
if (centrelines := centrelines_by_end_feature.get(feature_id)) is None:
self.__centrelines_by_containing_feature[feature_id].add(centreline_id)
else:
self.__log.warning('Container feature of centrelineis also an end node for other centrelines',
feature=feature_id, centreline=centreline_id, centrelines=centrelines)
@property
def id(self):
return self.__id
def __add_feature(self, full_id):
#=================================
self.__full_ids.add(full_id)
for id in full_id.split('/'):
self.__feature_ids.add(id)
def check_features_on_map(self):
#===============================
# Check that the network's features are on the map
for id in sorted(self.__feature_ids):
if not self.__flatmap.has_feature(id):
self.__log.warning('Network feature cannot be found on the flatmap', feature=id)
def has_feature(self, feature):
#==============================
# Is the ``feature`` included in this network?
return (feature.id in self.__feature_ids
or feature.get_property('tile-layer') == PATHWAYS_TILE_LAYER)
def __map_feature(self, feature_id):
#===================================
if feature_id not in self.__missing_identifiers:
if (feature := self.__flatmap.get_feature(feature_id)) is not None:
return feature
self.__log.error('Cannot find network feature', feature=feature_id)
self.__missing_identifiers.add(feature_id)
return None
def create_geometry(self):
#=========================
def add_centreline_graph_node(network_node):
feature_id = network_node.feature_id
if feature_id not in self.__centreline_graph:
self.__centreline_graph.add_node(feature_id, network_node=network_node, **network_node.properties)
# Direction of a path at the node boundary, going towards the centre (radians)
self.__centreline_graph.nodes[feature_id]['edge-direction'] = {}
# Angle of the radial line from the node's centre to a path's intersection with the boundary (radians)
self.__centreline_graph.nodes[feature_id]['edge-node-angle'] = {}
def truncate_segments_at_start(segments, network_node):
# This assumes node centre is close to segments[0].start
node_centre = network_node.centre
radii = network_node.radii
if segments[0].start.distanceFrom(node_centre) > radii[0]:
return segments
n = 0
while n < len(segments) and segments[n].end.distanceFrom(node_centre) < radii[0]:
n += 1
if n >= len(segments):
return segments
bz = segments[n]
u = 0.0
v = 1.0
while True:
t = (u + v)/2.0
point = bz.pointAtTime(t)
if point.distanceFrom(node_centre) > radii[1]:
v = t
elif point.distanceFrom(node_centre) < radii[0]:
u = t
else:
break
# Drop 0 -- t at front of bezier
split = bz.splitAtTime(t)
segments[n] = split[1]
return segments[n:]
def truncate_segments_at_end(segments, network_node):
# This assumes node centre is close to segments[-1].end
node_centre = network_node.centre
radii = network_node.radii
if segments[-1].end.distanceFrom(node_centre) > radii[0]:
return segments
n = len(segments) - 1
while n >= 0 and segments[n].start.distanceFrom(node_centre) < radii[0]:
n -= 1
if n < 0:
return segments
bz = segments[n]
u = 0.0
v = 1.0
while True:
t = (u + v)/2.0
point = bz.pointAtTime(t)
if point.distanceFrom(node_centre) > radii[1]:
u = t
elif point.distanceFrom(node_centre) < radii[0]:
v = t
else:
break
# Drop t -- 1 at end of bezier
split = bz.splitAtTime(t)
segments[n] = split[0]
return segments[:n+1]
def set_segment_geometry(node_id_0, node_1_id, edge_dict):
segments = edge_dict.pop('bezier-path').asSegments()
segment_id = edge_dict['segment']
network_nodes = edge_dict['network-nodes']
start_node = edge_dict.pop('path-start-node')
end_node = edge_dict.pop('path-end-node')
(start_node_id, end_node_id) = (start_node.feature_id, end_node.feature_id)
edge_dict['start-node'] = start_node_id
edge_dict['end-node'] = end_node_id
# Truncate the path at branch nodes
if self.__centreline_graph.degree(node_id_0) >= 2: # type: ignore
if start_node_id == node_id_0:
# This assumes network_nodes[0] centre is close to segments[0].start
segments = truncate_segments_at_start(segments, network_nodes[0])
else:
# This assumes network_nodes[-1] centre is close to segments[-1].end
segments = truncate_segments_at_end(segments, network_nodes[-1])
if self.__centreline_graph.degree(node_1_id) >= 2: # type: ignore
if start_node_id == node_id_0:
# This assumes network_nodes[0] centre is close to segments[-1].end
segments = truncate_segments_at_end(segments, network_nodes[0])
else:
# This assumes network_nodes[-1] centre is close to segments[0].start
segments = truncate_segments_at_start(segments, network_nodes[-1])
# We've now have the possibly truncated path of the centreline segment
# so save it along with geometric information about its end points
edge_dict['bezier-segments'] = segments
bz_path = BezierPath.fromSegments(segments)
edge_dict['geometry'] = bezier_to_linestring(bz_path)
# Direction of a path at the node boundary, going towards the centre (radians)
self.__centreline_graph.nodes[start_node_id]['edge-direction'][segment_id] = segments[0].startAngle + math.pi
# Angle of the radial line from the node's centre to a path's intersection with the boundary (radians)
self.__centreline_graph.nodes[start_node_id]['edge-node-angle'][end_node_id] = (
(segments[0].pointAtTime(0.0) - start_node.centre).angle)
# Direction of a path at the node boundary, going towards the centre (radians)
self.__centreline_graph.nodes[end_node_id]['edge-direction'][segment_id] = segments[-1].endAngle
# Angle of the radial line from the node's centre to a path's intersection with the boundary (radians)
self.__centreline_graph.nodes[end_node_id]['edge-node-angle'][start_node_id] = (
# why segments[0].pointAtTime(0.0) ??
(segments[-1].pointAtTime(1.0) - end_node.centre).angle)
def time_scale(scale, T, x):
return (scale(x) - T)/(1.0 - T)
# Initialise
self.__containers_by_segment = defaultdict(set) #! Segment id --> set of features that segment is contained in
self.__segment_edge_by_segment = {}
self.__segment_ids_by_centreline = defaultdict(list)
segment_edge_ids_by_centreline = defaultdict(list)
for centreline_id, network_nodes in self.__centreline_nodes.items():
if (centreline_feature := self.__map_feature(centreline_id)) is None:
self.__log.warning('Centreline ignored, not on map', centreline=centreline_id)
continue
if (self.__map_feature(network_nodes[0].feature_id) is None
or self.__map_feature(network_nodes[-1].feature_id) is None):
self.__log.error('Centreline ignored, end nodes are not on map', centreline=centreline_id)
centreline_feature.set_property('exclude', not settings.get('authoring', False))
continue
# Set centreline type to that of the network
if self.__type is not None:
centreline_feature.set_property('type', self.__type)
# Set network node properties
valid_nodes = []
for network_node in network_nodes:
feature = self.__map_feature(network_node.feature_id)
if feature is not None:
valid_nodes.append(network_node)
network_node.set_properties_from_feature(feature)
network_nodes = valid_nodes
self.__centreline_nodes[centreline_id] = valid_nodes
bz_path = BezierPath.fromSegments(centreline_feature.get_property('bezier-segments'))
# A node's centre may not be where a centreline into the node finishes
# so check both ends of the Bezier path to decide the curve's direction
node_0_centre = network_nodes[0].centre
node_1_centre = network_nodes[-1].centre
min_distance = None
coords = (0, 0)
for n, centre in enumerate([node_0_centre, node_1_centre]):
for m, t in enumerate([0.0, 1.0]):
distance = centre.distanceFrom(bz_path.pointAtTime(t)) # type: ignore
if min_distance is None or distance < min_distance:
min_distance = distance
coords = (n, m)
path_reversed = (coords[0] != coords[1])
if path_reversed:
centreline_feature.geometry = centreline_feature.geometry.reverse()
# Construct the segmented centreline graph
seg_no = 0
start_index = 0
while start_index < len(network_nodes) - 1:
seg_no += 1
start_node = network_nodes[start_index]
end_index = start_index + 1
# Loop must terminate as network_nodes[-1] is a map feature from above
end_node = network_nodes[end_index]
while end_node.map_feature is None or end_node.intermediate:
end_node.intermediate = True # Nodes without a feature become intermediate nodes
end_index += 1
end_node = network_nodes[end_index]
if start_index > 0 or end_index < (len(network_nodes) - 1):
segment_id = f'{centreline_id}/{seg_no}'
else:
segment_id = centreline_id
# Initialise the graph's node data before creating an edge between them
add_centreline_graph_node(start_node)
add_centreline_graph_node(end_node)
# Add an edge to the segmented centreline graph
edge_feature_ids = (start_node.feature_id, end_node.feature_id)
key = self.__centreline_graph.add_edge(*edge_feature_ids, id=edge_feature_ids,
centreline=centreline_id, segment=segment_id)
path_start_node = end_node if path_reversed else start_node
path_end_node = start_node if path_reversed else end_node
# Split the current Bezier path at the segment boundary and return the remainder
(cl_path, bz_path) = split_bezier_path_at_point(bz_path, path_end_node.centre) # type: ignore
# Save properties for later
edge_id = (*edge_feature_ids, key)
edge_dict = self.__centreline_graph.edges[edge_id]
edge_dict['bezier-path'] = cl_path
edge_dict['reversed'] = path_reversed
edge_dict['path-start-node'] = path_start_node
edge_dict['path-end-node'] = path_end_node
edge_dict['network-nodes'] = network_nodes[start_index:end_index+1]
segment_edge_ids_by_centreline[centreline_id].append(edge_id)
self.__segment_edge_by_segment[segment_id] = edge_id
self.__segment_ids_by_centreline[centreline_id].append(segment_id)
start_index = end_index
# Set the ``degree`` property now that we have the complete graph
for feature_id, node_dict in self.__centreline_graph.nodes(data=True):
node_dict['degree'] = self.__centreline_graph.degree(feature_id)
for node_0, node_1, edge_dict in self.__centreline_graph.edges(data=True):
set_segment_geometry(node_0, node_1, edge_dict)
# Process intermediate nodes
for node_0, node_1, edge_dict in self.__centreline_graph.edges(data=True):
bz_segments = edge_dict.pop('bezier-segments')
bz_path = BezierPath.fromSegments(bz_segments)
edge_id = edge_dict['centreline'] # Used in error messages below
path_reversed = edge_dict['reversed']
segment_id = edge_dict['segment']
# Set intermediate node centres to their closest point on the centreline's path
last_t = 0.0 if not path_reversed else 1.0
for network_node in edge_dict['network-nodes'][1:-1]:
t = closest_time_distance(bz_path, network_node.centre)[0]
if (not path_reversed and t <= last_t
or path_reversed and t >= last_t):
self.__log.error('Centreline nodes are out of sequence...', centreline=segment_id)
else:
network_node.centre = bz_path.pointAtTime(t)
last_t = t
# Find where the centreline's segments cross intermediate nodes
intermediate_geometry = {}
intersection_times = {}
for seg_num, bz_segment in enumerate(bz_segments):
line = bezier_to_linestring(bz_segment)
time_points = []
for network_node in edge_dict['network-nodes'][1:-1]:
if not network_node.properties.get('node', False):
node_id = network_node.feature_id
intermediate_geometry[node_id] = network_node.geometry
intersection_points = []
if network_node.geometry.intersects(line):
intersection = network_node.geometry.boundary.intersection(line)
if isinstance(intersection, shapely.geometry.Point):
intersection_points.append((closest_time_distance(bz_segment, coords_to_point((intersection.x, intersection.y)))[0], ))
else:
intersecting_points = intersection.geoms
if len(intersecting_points) > 2:
self.__log.warning('Intermediate node has multiple intersections with centreline',
node=node_id, centreline=segment_id)
else:
intersection_points.append(sorted((closest_time_distance(bz_segment, coords_to_point((pt.x, pt.y)))[0]
for pt in intersecting_points)))
if len(intersection_points) == 0:
self.__log.warning("Intermediate node doesn't intersect centreline",
node=node_id, centreline=segment_id)
else:
time_points.extend(((pts, node_id) for pts in intersection_points))
intersection_times[seg_num] = sorted(time_points)
# Break the centreline segment into components, consisting of Bezier segments and intermediate
# nodes. This is what is used for rendering paths along the centreline segment
path_components = []
last_intersection = None
for seg_num, bz_segment in enumerate(bz_segments):
prev_intersection = None
scale = partial(time_scale, lambda x: x, 0.0)
node_intersections = intersection_times[seg_num]
intersection_num = 0
while intersection_num < len(node_intersections):
times, node_id = node_intersections[intersection_num]
if len(times) == 0:
continue
node_geometry = intermediate_geometry[node_id]
time_0 = scale(times[0])
if len(times) == 1:
# path touches feature
if last_intersection is not None:
assert node_id == last_intersection[1]
# check times[0] < 0.5 ??
bz_parts = bz_segment.splitAtTime(time_0)
path_components.append(IntermediateNode(node_geometry, last_intersection[0].startAngle, bz_parts[0].endAngle))
bz_segment = bz_parts[1]
scale = partial(time_scale, scale, time_0)
last_intersection = None
elif (intersection_num + 1) == len(node_intersections):
# check times[0] > 0.5 ??
bz_parts = bz_segment.splitAtTime(time_0)
path_components.append(bz_parts[0])
last_intersection = (bz_parts[1], node_id)
else:
self.__log.error('Node only intersects once with centreline', node=node_id, centreline=edge_id)
else:
# path crosses feature
if prev_intersection is not None and prev_intersection[0] >= times[0]:
self.__log.error('Intermediate nodes overlap on centreline', nodes=[prev_intersection[1], node_id], centreline=edge_id)
else:
bz_parts = bz_segment.splitAtTime(time_0)
path_components.append(bz_parts[0])
bz_segment = bz_parts[1]
scale = partial(time_scale, scale, time_0)
time_1 = scale(times[1])
bz_parts = bz_segment.splitAtTime(time_1)
path_components.append(IntermediateNode(node_geometry, bz_parts[0].startAngle, bz_parts[0].endAngle))
bz_segment = bz_parts[1]
scale = partial(time_scale, scale, time_1)
prev_intersection = (times[1], node_id)
intersection_num += 1
if last_intersection is None:
path_components.append(bz_segment)
if last_intersection is not None:
self.__log.error('Last intermediate node on centreline only intersects once', node=last_intersection[1], centreline=edge_id)
edge_dict['path-components'] = path_components
# Map container features to their centreline segments
for centreline_id, feature_ids in self.__containers_by_centreline.items():
if len(segment_edge_ids := segment_edge_ids_by_centreline[centreline_id]) == 1:
segment_id = self.__centreline_graph.edges[segment_edge_ids[0]]['segment']
# assert centreline_id == segment_id
self.__containers_by_segment[segment_id] = feature_ids
else:
for feature_id in feature_ids:
# Find segment for the container node
feature = self.__map_feature(feature_id)
if feature is not None:
node_geometry = feature.geometry
longest_match = 0
longest_segment_edge = None
for segment_edge_id in segment_edge_ids:
segment_line_geometry = self.__centreline_graph.edges[segment_edge_id]['geometry']
intersection = node_geometry.intersection(segment_line_geometry.buffer(20))
if not intersection.is_empty:
if intersection.length > longest_match:
longest_segment_edge = segment_edge_id
if longest_segment_edge is not None:
segment_id = self.__centreline_graph.edges[longest_segment_edge]['segment']
self.__containers_by_segment[segment_id].add(feature_id)
self.__expanded_centreline_graph = expand_centreline_graph(self.__centreline_graph)
def route_graph_from_path(self, path: 'Path') -> Optional[nx.Graph]:
#===================================================================
return self.__route_graph_from_connectivity(path)
def layout(self, route_graphs: dict[str, nx.Graph]) -> dict[int, RoutedPath]:
#============================================================================
path_router = PathRouter()
for path_id, route_graph in route_graphs.items():
path_router.add_path(path_id, route_graph)
# Layout the paths and return the resulting routes
return path_router.layout()
def __segment_properties_from_ids(self, centreline_ids: Iterable[str]) -> dict[str, Any]:
#========================================================================================
properties = {}
nerve_ids = set()
segment_ids = set()
for centreline_id in centreline_ids:
if ((feature := self.__map_feature(centreline_id)) is not None
and (nerve_id := feature.get_property('nerve')) is not None):
# Save the id of the centreline's nerve cuff
nerve_ids.add(nerve_id)
segment_ids.update(self.__segment_ids_by_centreline[centreline_id])
properties['subgraph'] = self.__centreline_graph.edge_subgraph([self.__segment_edge_by_segment[segment_id]
for segment_id in segment_ids]).copy()
properties['subgraph'].graph['segment-ids'] = segment_ids
properties['nerve-ids'] = nerve_ids
properties['type'] = 'segment'
return properties
def __feature_properties_from_node(self, connectivity_node: AnatomicalNode) -> dict[str, Any]:
#=============================================================================================
# # Allow to use the identified alias
if (matched:=self.__flatmap.features_for_anatomical_node(connectivity_node, warn=True)) is not None:
if connectivity_node.name != matched[0].name:
connectivity_node = matched[0]
# Find the features and centrelines that corresponds to a connectivity node
properties: dict[str, Any] = {
'node': connectivity_node,
'name': connectivity_node.name,
'type': None,
'contains': set(),
'used': set()
}
# Can we directly identify the centreline from nodes anatomical base term?
if (centreline_ids := self.__models_to_id.get(connectivity_node[0])) is not None:
if len(connectivity_node[1]) > 0:
self.__log.error('Node has centreline inside layers', name=connectivity_node.full_name)
properties.update(self.__segment_properties_from_ids(centreline_ids))
elif matched is not None:
properties['name'] = matched[0].name
features = set(f for f in matched[1] if f.id is not None and not f.get_property('unrouted', False))
if len(features):
properties['type'] = 'feature'
properties['features'] = features
elif connectivity_node not in self.__missing_identifiers:
properties['warning'] = {
'msg': 'Cannot find feature for connectivity node',
'node': connectivity_node,
'name': connectivity_node.full_name
}
self.__missing_identifiers.add(connectivity_node)
return properties
def __closest_feature_id_to_point(self, point, node_feature_ids) -> Optional[str]:
#=================================================================================
# Find feature id of feature that is closest to ``point``.
closest_feature_id: Optional[str] = None
closest_distance = -1
for node_id in node_feature_ids:
node_centre = self.__centreline_graph.nodes[node_id]['geometry'].centroid
distance = point.distance(node_centre)
if closest_feature_id is None or distance < closest_distance:
closest_distance = distance
closest_feature_id = node_id
return closest_feature_id
def __closest_segment_node_to_point(self, point, segment_id) -> tuple[Optional[str], float]:
#===========================================================================================
# Find segment's node that is closest to ``point``.
closest_node = None
closest_distance = -1
for node_id in self.__segment_edge_by_segment[segment_id][0:2]:
node_centre = self.__centreline_graph.nodes[node_id]['geometry'].centroid
distance = point.distance(node_centre)
if closest_node is None or distance < closest_distance:
closest_distance = distance
closest_node = node_id
return (closest_node, closest_distance)
def __route_graph_from_connectivity(self, path: 'Path', debug=False) -> Optional[nx.Graph]:
#==========================================================================================
connectivity_graph = path.connectivity
# Map connectivity nodes to map features and centrelines, storing the result
# in the connectivity graph
for node, node_dict in connectivity_graph.nodes(data=True):
node_dict.update(self.__feature_properties_from_node(node))
if (warning := node_dict.pop('warning', None)) is not None:
if (msg := warning.pop('msg', None)) is not None:
self.__log.warning(msg, **warning)
def bypass_missing_node(ms_node):
if len(neighbours:=list(connectivity_graph.neighbors(ms_node))) > 1:
predecessors, successors = [], []
for neighbour in neighbours:
if neighbour == connectivity_graph.edges[(ms_node, neighbour)]['predecessor']:
predecessors += [neighbour]
elif neighbour == connectivity_graph.edges[(ms_node, neighbour)]['successor']:
successors += [neighbour]
if len(predecessors) == 0 or len(successors) == 0:
predecessors =neighbours
successors = neighbours
for e in [edge for edge in itertools.product(predecessors,successors) if (edge[0]!=edge[1])]:
ms_nodes = list(set([ms_node] + connectivity_graph.edges[(e[0], ms_node)].get('missing_nodes', []) + \
connectivity_graph.edges[(ms_node, e[1])].get('missing_nodes', [])))
connectivity_graph.add_edges_from(
[e],
completeness = False,
missing_nodes = ms_nodes,
predecessor = e[0],
successor = e[1],
)
connectivity_graph.remove_nodes_from([ms_node])
# Removing missing nodes (in FC and AC)
if settings.get('NPO', False):
missing_nodes = [c for c in connectivity_graph.nodes if c in self.__missing_identifiers]
for ms_node in missing_nodes:
bypass_missing_node(ms_node)
# Merging duplicate nodes due to aliasing
if settings.get('NPO', False):
group_nodes = {}
for node, node_dict in connectivity_graph.nodes(data=True):
if (att_node:=node_dict['node']) not in group_nodes:
group_nodes[att_node] = []
group_nodes[att_node] += [node]
for g_node, ref_nodes in group_nodes.items():
if len(ref_nodes) > 1:
if g_node in ref_nodes:
ref_nodes.remove(g_node)
else:
g_node = ref_nodes[0]
ref_nodes = ref_nodes[1:]
for ref_node in ref_nodes:
connectivity_graph = nx.contracted_nodes(connectivity_graph, g_node, ref_node, self_loops=False)
if path.trace:
for node, node_dict in connectivity_graph.nodes(data=True):
node_data = {}
if node_dict['type'] == 'feature':
node_data['features'] = {f.id for f in node_dict['features']}
elif node_dict['type'] == 'segment':
node_data['segments'] = node_dict['subgraph'].graph['segment-ids']
log.info('Connectivity for node', type='trace', path=path.id, node=node, data=node_data)
log.info('Connectivity edges', type='trace', path=path.id, edges=connectivity_graph.edges)
# Go through all the nodes that map to a feature and flag those which enclose
# an immediate neighbour so that we don't draw a path to the containing node
# but instead will pass through it.
feature_nodes = set()
seen_pairs = set()
for node, node_dict in connectivity_graph.nodes(data=True):
if node_dict['type'] == 'feature':
feature_nodes.add(node)
for neighbour in connectivity_graph.neighbors(node):
if (node, neighbour) in seen_pairs:
continue
seen_pairs.add((node, neighbour))
seen_pairs.add((neighbour, node))
neighbour_dict = connectivity_graph.nodes[neighbour]
if neighbour_dict['type'] == 'feature':
feature_nodes.add(neighbour)
if node_dict['name'] == neighbour_dict['name']:
self.__log.error('Adjacent connectivity nodes are identical!', path=path.id, nodes=node_dict["name"])
elif neighbour_dict['name'].startswith(node_dict['name']):
# node contains neighbour
node_dict['contains'].add(neighbour)
neighbour_dict['contained-by'] = node
elif node_dict['name'].startswith(neighbour_dict['name']):
# neighbour contains node
neighbour_dict['contains'].add(node)
node_dict['contained-by'] = neighbour
for node in feature_nodes:
if len(connectivity_graph.nodes[node]['contains']) == 0:
while (container := connectivity_graph.nodes[node].get('contained-by')) is not None:
connectivity_graph.nodes[container]['exclude'] = True
node = container
# Create the route graph for the connectivity path
route_graph = nx.MultiGraph()
# Nerves and nodes used by the path
path_nerve_ids = set()
path_node_ids = set()
# Helper functions
def add_route__with_edge_dict(node_0, node_1, edge_dict):
route_graph.add_node(node_0, **self.__centreline_graph.nodes[node_0])
route_graph.add_node(node_1, **self.__centreline_graph.nodes[node_1])
route_graph.add_edge(node_0, node_1, **edge_dict)
path_node_ids.update(node.feature_id for node in edge_dict['network-nodes'])
def add_route_edges_from_graph(G, used_nodes):
node_feature_ids = set()
for node_0, node_1, edge_dict in G.edges(data=True):
add_route__with_edge_dict(node_0, node_1, edge_dict)
node_feature_ids.update({node_0, node_1})
for node in used_nodes:
node_dict = connectivity_graph.nodes[node]
node_dict['used'] = node_feature_ids
# Add directly identified centreline segments to the route, noting path nodes and
# nerve cuff identifiers
no_sub_segment_nodes = []
node_with_segments = {}
for node, node_dict in connectivity_graph.nodes(data=True):
node_type = node_dict['type']
if node_type == 'segment':
path_nerve_ids.update(node_dict['nerve-ids'])
segment_graph = node_dict['subgraph']
if segment_graph.number_of_edges() > 1:
# Get set of neighbouring features
neighbouring_ids = set()
for neighbour in connectivity_graph.neighbors(node):
neighbour_dict = connectivity_graph.nodes[neighbour]
if neighbour_dict['type'] == 'feature':
neighbouring_ids.update(f.id for f in neighbour_dict['features'])
elif neighbour_dict['type'] == 'segment':
neighbouring_ids.update(neighbour_dict['subgraph'].nodes)
segment_graph = graph_utils.get_connected_subgraph(segment_graph, neighbouring_ids)
if segment_graph.number_of_edges():
# Add segment edges used by our path to the route
add_route_edges_from_graph(segment_graph, {node})
node_with_segments[node] = segment_graph
else:
self.__log.warning('Cannot find any sub-segments of centreline', path=path.id, entity=node_dict['name'])
node_dict['type'] = 'no-segment'
no_sub_segment_nodes += [node] # store node with undecided sub-segments
# Handles unidentified segments related to centerline type nodes
# Looks for the closest feature to the connected node, then determines the segments
new_direct_edges = set()
tmp_edge_dicts = {}
for node in no_sub_segment_nodes:
segment_graph = connectivity_graph.nodes[node]['subgraph']
if segment_graph.number_of_edges() > 1:
neighbouring_ids = set()
updated_neighbouring_ids = set()
closest_feature_dict = {} # .geometry.centroid.distance sometime is inconsistent
for neighbour in connectivity_graph.neighbors(node):
neighbour_dict = connectivity_graph.nodes[neighbour]
if len(neighbour_dict.get('contains', set())) > 0:
neighbour_dict = connectivity_graph.nodes[next(iter(neighbour_dict['contains']))]
edge_dict = connectivity_graph.edges[(node, neighbour)]
if neighbour_dict['type'] == 'feature':
# if the node is a no-segment and neighbour is a terminal, connect to all neighbour features
# else connect to one closest no_segment point and neighbour feature.
features = (
neighbour_dict['features']
if len(neighbour_dict['features']) <= 2
else sorted(neighbour_dict['features'], key=lambda f: f.id)[:1]
)
for feature in features:
neighbouring_ids.update([feature.id])
closest_feature_id = None
for n, s in itertools.product([feature.id], segment_graph.nodes):
if (edge_dicts := self.__centreline_graph.get_edge_data(n, s)) is not None:
closest_feature_dict[n] = s
tmp_edge_dicts[(n, s)] = edge_dicts[0]
break
if feature.id not in closest_feature_dict:
closest_feature_id = self.__closest_feature_id_to_point(feature.geometry.centroid, segment_graph.nodes)
closest_feature_dict[feature.id] = closest_feature_id
tmp_edge_dicts[(feature.id, closest_feature_id)] = edge_dict
elif neighbour_dict['type'] in ['segment', 'no-segment']: # should check this limitation
candidates= {}
for n, s in itertools.product(neighbour_dict['subgraph'].nodes, segment_graph.nodes):
if (nf:=self.__map_feature(n)) is not None and (sf:=self.__map_feature(s)) is not None:
candidates[(n,s)] = nf.geometry.centroid.distance(sf.geometry.centroid)
tmp_edge_dicts[(n,s)] = edge_dict
if len(candidates) > 0:
selected_c = min(candidates, key=candidates.get) # type: ignore
neighbouring_ids.update([selected_c[0]])
closest_feature_dict[selected_c[0]] = selected_c[1]
for n_id in neighbouring_ids:
if n_id in segment_graph:
updated_neighbouring_ids.update([n_id])
else:
if self.__map_feature(n_id) is not None:
if (closest_feature_id:=closest_feature_dict.get(n_id)) is not None:
updated_neighbouring_ids.update([closest_feature_id])
new_direct_edges.update([(n_id, closest_feature_id)])
segment_graph = graph_utils.get_connected_subgraph(segment_graph, updated_neighbouring_ids)
if segment_graph.number_of_edges():
# Add segment edges used by our path to the route
add_route_edges_from_graph(segment_graph, {node})