-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstrata_solver.py
452 lines (367 loc) · 19 KB
/
strata_solver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
'''
This program generates a set of plausible stratigraphies with uncertainties, for a given drillhole lithology log.
It uses map data for distance and topology constraints, and several free parameters describing the solution complexity (level of deformation) constraints.
Author: Vitaliy Ogarko, [email protected]
The University of Western Australia
'''
import numpy as np
from dataclasses import dataclass, field
from typing import List
import networkx as nx
from strata_solution import StrataSolution
#==================================================================================================================
@dataclass
class StrataSolverParameters:
'''
The strata solver parameters holder.
'''
# Unit contact topology constraints.
add_topology_constraints: bool = False
# Jumps over units in topology graph to allow skipping units: i.e., one jump would allow contact A->C for the graph A->B->C.
max_num_strata_jumps: int = 0
# 'Age alignment' constraints: the maximum number of times the age direction can flip.
max_num_age_flips: int = 0
# 'Returning to the same unit' constraints.
max_num_returns_per_unit: int = 0
# The number of unit contacts inside the same litholgy sequence.
max_num_unit_contacts_inside_litho: int = 0
# Use the single closest unit for the top (first) lithology.
single_top_unit: bool = False
#==================================================================================================================
@dataclass
class StrataTableElement:
'''
The element of a strata table.
'''
path_exists: bool = False
lithos: List[str] = field(default_factory=list)
#==================================================================================================================
def generate_strata_table(drillsample_data, strata_data, single_top_unit):
'''
Generates the stratigraphic table, and unit names list.
'''
num_rows = drillsample_data.get_num_rows()
num_units = strata_data.get_num_units()
unit_names = strata_data.get_unit_names()
print("num_rows (before) = ", num_rows)
print("num_units = ", num_units)
strata_table = np.ndarray(shape=[num_rows, num_units], dtype=object)
# Initialize with default objects.
strata_table.flat = [StrataTableElement() for _ in strata_table.flat]
new_row_index = 0
# Missing lithologies.
missing_lithos = set()
for row in drillsample_data.rows[:]:
any_litho_found = False
# Loop over lithos in the current drillsample row.
for litho in row.lithos[:]:
litho_found = False
if (new_row_index == 0 and single_top_unit):
# Use only the closest unit for the top lithology.
if (litho in strata_data.litho2dist):
# Sorted distance list for this lithology.
dist_list = strata_data.litho2dist[litho]
closest_unit_distance = dist_list[0][0]
closest_unit = dist_list[0][1]
print('Closest top unit info (litho, unit, distance):', [litho, closest_unit, closest_unit_distance])
for unit_name in strata_data.unit2litho:
if (unit_name == closest_unit):
litho_found = True
unit_index = unit_names.index(unit_name)
strata_table[new_row_index, unit_index].path_exists = True
strata_table[new_row_index, unit_index].lithos.append(litho)
else:
for unit_name in strata_data.unit2litho:
if (litho in strata_data.unit2litho[unit_name]):
litho_found = True
unit_index = unit_names.index(unit_name)
strata_table[new_row_index, unit_index].path_exists = True
strata_table[new_row_index, unit_index].lithos.append(litho)
if (not litho_found):
# Drillhole lithology not found in units data.
print("WARNING: Drillhole lithologies not found in units data: ", litho)
# Remove this lithology and its score from drillsample data for the current row.
del row.scores[row.lithos.index(litho)]
row.lithos.remove(litho)
# Update the missing lithos list.
missing_lithos.add(litho)
else:
any_litho_found = True
if (not any_litho_found):
# Treat this as "no data".
drillsample_data.rows.remove(row)
else:
new_row_index += 1
num_rows = drillsample_data.get_num_rows()
print("num_rows (after) = ", num_rows)
if (num_rows == 0):
print('No data left!!!')
return np.ndarray(shape=[num_rows, num_units], dtype=object), set()
# Remove rows due to missing lithologies.
strata_table = strata_table[0:num_rows, :]
return strata_table, missing_lithos
#==================================================================================================================
'''
A class for storing the stratigraphic route.
'''
class StrataRoute:
__slots__ = 'to_remove', 'path', 'current_thickness', 'unit_visited', 'num_unit_contacts_inside_litho', 'num_age_flips', 'is_age_aligned', 'num_contacts'
def __init__(self, num_units):
# Flag for removal.
self.to_remove = False
# Containts the number of times each unit was visited.
self.unit_visited = np.zeros((num_units), dtype=int)
# The number of unit contacts inside the same lithology sequence.
self.num_unit_contacts_inside_litho = 0
# The current number of age direction flips.
self.num_age_flips = 0
# Flag for the age direction of the last unit contact.
self.is_age_aligned = False
# The number of unit contacts.
self.num_contacts = 0
def add_first_position(self, strat, thickness_change):
'''
Adds the first position to the route.
'''
# Unit index for every drillhole data.
self.path = [strat]
# The thickness of the last strata unit.
self.current_thickness = thickness_change
# Mark this unit as 'visited'.
self.unit_visited[strat] += 1
def get_strata_sequence(self):
'''
Returns a sequence of strata units in the route excluding consequtive dublicates.
'''
return tuple([v for i, v in enumerate(self.path) if i == 0 or v != self.path[i - 1]])
#==================================================================================================================
def flatten(S):
'''
Flattens the multilevel list of lists.
For example, it will convert [[[1,1],2,2],3,3] to [1,1,2,2,3,3].
'''
if S == []:
return S
if isinstance(S[0], list):
return flatten(S[0]) + flatten(S[1:])
return S[:1] + flatten(S[1:])
#==================================================================================================================
def apply_max_num_returns_constraint(route, strata_list, max_num_returns):
'''
Apply the "maximum number of returns to a unit" constraint:
remove from the input unit list the units where the route cannot return anymore.
'''
# Apply the "max number of returns" constraint.
for strat in strata_list[:]:
if (route.unit_visited[strat] - 1 >= max_num_returns):
# Reached the maximum numer of local returns (to this unit).
strata_list.remove(strat)
#==================================================================================================================
def apply_topology_constraints(graph, unit_names, strat0, strata_list, max_num_jumps):
'''
Apply unit topology (connectivity) constraints:
remove from the input unit list the units not connected to a given one (strat0).
'''
if (unit_names[strat0] in graph.nodes()):
for strat in strata_list[:]:
if (not nx.has_path(graph, unit_names[strat0], unit_names[strat])):
# Units are not connected (via any path). Skip this unit.
strata_list.remove(strat)
else:
path_length = len(nx.shortest_path(graph, unit_names[strat0], unit_names[strat]))
if (path_length - 2 > max_num_jumps):
# The number of jumps to connect these units exceeded the limit. Skip this unit.
strata_list.remove(strat)
#==================================================================================================================
def _same_lithos(lithos1, lithos2, alternative_rock_names):
'''
Test if two litho lists are the same considering their alternative rock names.
'''
def _build_litho_group(lithos, alternative_rock_names):
group = set()
for litho in lithos:
if (litho in alternative_rock_names):
# Adding group id.
group.add(alternative_rock_names[litho])
else:
# Addding litho name - it has no alternative names.
group.add(litho)
return group
group1 = _build_litho_group(lithos1, alternative_rock_names)
group2 = _build_litho_group(lithos2, alternative_rock_names)
return (group1 == group2)
#==================================================================================================================
def print_info(row, drillsample_data, num_routes):
'''
Print progress info.
'''
if (num_routes >= 0):
print(row, drillsample_data.rows[row].depth_from, drillsample_data.rows[row].depth_to, drillsample_data.rows[row].lithos, num_routes)
else:
print(row, drillsample_data.rows[row].depth_from, drillsample_data.rows[row].depth_to, drillsample_data.rows[row].lithos, end = "\r")
#==================================================================================================================
def generate_strat_routes(spar, strata_data, drillsample_data, thickness_data, map_graph, alternative_rock_names):
'''
Generating stratigraphic routes.
'''
print('Starting strata solver with:', spar)
if (len(drillsample_data.rows) == 0):
# Empty drillsample data - return.
return StrataSolution([], [], [], drillsample_data.get_depth_data(), dict())
# Generating the table of possible strata paths.
strata_table, missing_lithos = generate_strata_table(drillsample_data, strata_data, spar.single_top_unit)
# Unit index to unit name mapping.
unit_names = strata_data.get_unit_names()
num_rows = strata_table.shape[0]
num_units = strata_table.shape[1]
# Local flags are based on the function input data.
add_thickness_constraints = (len(thickness_data) > 0)
if (spar.add_topology_constraints):
# Sanity check: check that strata units exist in the graph.
for unit_name in unit_names:
if unit_name not in map_graph.nodes():
print("WARNING: Not found graph unit: ", unit_name)
# Extract strata thikcness to lists (faster data structures).
min_strata_thickness = [d.mean - d.range for d in thickness_data]
max_strata_thickness = [d.mean + d.range for d in thickness_data]
all_routes = []
all_routes_number = []
# Set the initial routes.
row = 0
thickness_change = drillsample_data.get_thickness_change(row)
for strat in range(num_units):
if (strata_table[row, strat].path_exists):
new_route = StrataRoute(num_units)
new_route.add_first_position(strat, thickness_change)
# Adding new route into the list.
all_routes.append(new_route)
print("Starting routes:")
print([r.path for r in all_routes])
row_max = num_rows
print("row_max = ", row_max)
# Print the starting info.
print_info(0, drillsample_data, len(all_routes))
# Going through the strata table and generating the routes.
for row in range(1, row_max):
# Print the info.
print_info(row, drillsample_data, -1)
# The drillhole lithos.
# Note: we deliberately consider the full list of drillsample lithos instead of lithos on the route.
# Because considering the route lithos may lead to exponential growth of number of routes due to frequent unit change.
current_lithos = drillsample_data.rows[row].lithos
previous_lithos = drillsample_data.rows[row - 1].lithos
thickness_change = drillsample_data.get_thickness_change(row)
new_routes = []
# Allowed strata units for a unit change.
strata_allowed = [strat for strat in range(num_units) if (strata_table[row, strat].path_exists)]
same_lithos = False
# Avoid using alternative names here as this led to missing a contact (when changing from Quartzite to Sandstone in SA collarID=265020).
#if (_same_lithos(current_lithos, previous_lithos, alternative_rock_names)):
if (set(current_lithos) == set(previous_lithos)):
same_lithos = True
# Iterate over all routes.
for route in all_routes:
# The current strata index.
strat0 = route.path[-1]
current_thickness = route.current_thickness
#--------------------------------------------------------------------
# Check if we can go down in other stratas (and create new routes).
#--------------------------------------------------------------------
can_change = True
# Add 'unit contacts inside the same litho' constraints.
if (same_lithos):
# Constrain the maximum number of unit contacts inside a litho sequence.
if (route.num_unit_contacts_inside_litho >= spar.max_num_unit_contacts_inside_litho):
can_change = False
else:
# Reset.
route.num_unit_contacts_inside_litho = 0
# Apply unit thickness constraints.
if (add_thickness_constraints):
# Ignore thickness for the top unit
if (len(route.path) > 1):
can_change = can_change and (current_thickness >= min_strata_thickness[strat0])
if (can_change):
# Strata units excluding the current one.
strata_list = [s for s in strata_allowed if (s != strat0)]
# Applying the "maximum number of returns to a unit" constraint.
apply_max_num_returns_constraint(route, strata_list, spar.max_num_returns_per_unit)
# Apply unit topology constraints.
if (spar.add_topology_constraints):
apply_topology_constraints(map_graph, unit_names, strat0, strata_list, spar.max_num_strata_jumps)
if (len(strata_list) != 0):
# Copy the route to create references to it below.
old_path = route.path.copy()
# Looking to which strata unit we can change.
for strat in strata_list:
# Making the new route.
new_route = StrataRoute(num_units)
# Disabled age flips as it is incompatible with strata jumps (Can make a second graph with reversed age directions to allow this).
# if (spar.add_topology_constraints):
# # Processing age flips.
# e = (unit_names[strat0], unit_names[strat])
# new_route.is_age_aligned = map_graph.has_edge(*e)
# if (route.num_contacts > 0):
# if (new_route.is_age_aligned != route.is_age_aligned):
# new_route.num_age_flips = route.num_age_flips + 1
# else:
# new_route.num_age_flips = route.num_age_flips
#
# # Apply the age alignment constraints.
# if (new_route.num_age_flips > spar.max_num_age_flips):
# # Skip this route.
# continue
# New path contains the reference to the old path, and the new route position.
# Note: we are not copying the full old path, but only store a reference to it to save memory.
new_route.path = [old_path, strat]
new_route.current_thickness = thickness_change
np.copyto(new_route.unit_visited, route.unit_visited)
# Count this unit as visited.
new_route.unit_visited[strat] += 1
# Processing the unit contact inside the same lithology sequence.
if (same_lithos):
new_route.num_unit_contacts_inside_litho = route.num_unit_contacts_inside_litho + 1
else:
new_route.num_unit_contacts_inside_litho = 0
new_route.num_contacts = route.num_contacts + 1
# Adding new route into the list.
new_routes.append(new_route)
#-----------------------------------------------------------------
# Check if we can go down the same srata unit (if cannot -- remove the current route).
#-----------------------------------------------------------------
can_stay = True
if (add_thickness_constraints):
# Apply unit thickness constraints.
can_stay = can_stay and (current_thickness < max_strata_thickness[strat0])
path_exists = strata_table[row, strat0].path_exists
can_stay = can_stay and path_exists
# Processing the route.
if (can_stay):
# Adding new route position.
route.path.append(strat0)
route.current_thickness += thickness_change
else:
# Did not reach the end of a drillhole, and cannot go down the same unit.
# Mark the route for removal.
route.to_remove = True
# Remove the routes marked for removal.
all_routes = [route for route in all_routes if not route.to_remove]
# Addig new routes.
all_routes.extend(new_routes)
# Update the number of routes.
num_routes = len(all_routes)
all_routes_number.append(num_routes)
# Print the info.
print_info(row, drillsample_data, num_routes)
if (num_routes == 0):
break
#------------------------------------------------------------------
# Adding the final number of routes.
all_routes_number.append(len(all_routes))
# Flatten the multilevel list of lists: convert [[[1,1],2,2],3,3] to [1,1,2,2,3,3].
for route in all_routes:
route.path = flatten(route.path)
# Create the solution object.
depth_data = drillsample_data.get_depth_data()
solution = StrataSolution(all_routes, all_routes_number, unit_names, depth_data, strata_data.unit2dist)
return solution