-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcondense_crate.py
More file actions
830 lines (679 loc) · 28.6 KB
/
condense_crate.py
File metadata and controls
830 lines (679 loc) · 28.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
#!/usr/bin/env python3
"""
condense_crate.py -- RO-Crate Graph Condensation for Interpretation
Reads an RO-Crate's ro-crate-metadata.json and produces a condensed version
that collapses repetitive provenance chains. Datasets that went through the
same software pipeline are grouped into DatasetGroup summary nodes, with one
representative kept for full provenance tracing.
Similarity is determined by structure and software, not by data content.
Usage:
python condense_crate.py <crate_dir> [--threshold 5] [--output condensed-metadata.json]
"""
import json
import argparse
from pathlib import Path
from collections import defaultdict
from datetime import date
# ---------------------------------------------------------------------------
# Type detection helpers (operate on raw JSON dicts)
# ---------------------------------------------------------------------------
EVI_TYPES = {
"Dataset", "Software", "MLModel", "Computation", "Annotation",
"Experiment", "ROCrate", "CreativeWork", "Schema",
}
def _extract_short_types(node: dict) -> list[str]:
"""Extract all short EVI type names from a node's @type field."""
raw = node.get("@type", [])
if isinstance(raw, str):
raw = [raw]
shorts = []
for t in raw:
short = t.split("#")[-1] if "#" in t else t.split(":")[-1] if ":" in t else t
if short in EVI_TYPES:
shorts.append(short)
return shorts
def get_evi_type(node: dict) -> str | None:
"""Extract the primary EVI type from a node's @type field."""
shorts = _extract_short_types(node)
if not shorts:
return None
# Prefer more specific types over generic ones
for preferred in ("ROCrate", "Computation", "Software", "MLModel",
"Experiment", "Annotation", "Schema"):
if preferred in shorts:
return preferred
return shorts[0]
def is_dataset(node: dict) -> bool:
types = _extract_short_types(node)
return "Dataset" in types and "ROCrate" not in types
def is_computation(node: dict) -> bool:
return get_evi_type(node) == "Computation"
def is_software(node: dict) -> bool:
return get_evi_type(node) == "Software"
def is_rocrate_root(node: dict) -> bool:
return "ROCrate" in _extract_short_types(node)
# ---------------------------------------------------------------------------
# Reference helpers
# ---------------------------------------------------------------------------
def get_id_list(node: dict, *fields) -> list[str]:
"""Extract a list of @id strings from one or more reference fields."""
ids = []
for field in fields:
val = node.get(field, [])
if val is None:
continue
if isinstance(val, dict):
val = [val]
if isinstance(val, list):
for item in val:
if isinstance(item, dict) and "@id" in item:
ids.append(item["@id"])
elif isinstance(item, str):
ids.append(item)
return ids
def get_generatedby_ids(dataset: dict) -> list[str]:
"""Get the @ids of computations that generated this dataset."""
return get_id_list(dataset, "generatedBy", "prov:wasGeneratedBy")
def make_id_ref(entity_id: str) -> dict:
"""Create a {"@id": ...} reference."""
return {"@id": entity_id}
# ---------------------------------------------------------------------------
# Provenance Signature
# ---------------------------------------------------------------------------
def compute_provenance_signature(
dataset_id: str,
index: dict[str, dict],
cache: dict[str, tuple],
) -> tuple:
"""
Compute a hashable signature for a dataset's provenance structure.
Two datasets with the same signature went through identical software
pipelines, regardless of which specific data/computation instances
were involved.
The signature is a recursive tuple:
(format, schema_ids, ((software_ids, (input_signatures, ...)), ...))
Leaf nodes (no generatedBy) produce: (format, schema_ids, None)
schema_ids is a tuple of evi:Schema @id references (empty tuple if none).
Two datasets with no schema both produce () and thus match.
"""
if dataset_id in cache:
return cache[dataset_id]
dataset = index.get(dataset_id)
if dataset is None:
sig = ("unknown", (), None)
cache[dataset_id] = sig
return sig
fmt = dataset.get("format", "unknown")
schema_ids = tuple(sorted(get_id_list(dataset, "evi:Schema")))
gen_comp_ids = get_generatedby_ids(dataset)
if not gen_comp_ids:
sig = (fmt, schema_ids, None)
else:
comp_sigs = []
for comp_id in sorted(gen_comp_ids):
comp = index.get(comp_id)
if comp is None:
comp_sigs.append(((), ()))
continue
# Software @ids define structural identity
sw_ids = tuple(sorted(get_id_list(comp, "usedSoftware")))
# Recursively signature the input datasets
input_dataset_ids = get_id_list(comp, "usedDataset")
input_sigs = tuple(sorted(
compute_provenance_signature(ds_id, index, cache)
for ds_id in input_dataset_ids
))
comp_sigs.append((sw_ids, input_sigs))
sig = (fmt, schema_ids, tuple(sorted(comp_sigs)))
cache[dataset_id] = sig
return sig
# ---------------------------------------------------------------------------
# Output-first backward traversal with inline condensation
# ---------------------------------------------------------------------------
def traverse_and_condense(
index: dict[str, dict],
threshold: int,
max_member_ids: int = 0,
) -> tuple[set[str], set[str], list[dict], dict[str, list[tuple[list[str], str]]]]:
"""
Traverse backward from the primary crate's outputs, keeping everything
reachable and collapsing large groups of sibling datasets that share the
same provenance signature (same software pipeline).
Returns:
keep_ids: set of entity @ids to keep in the condensed graph
remove_ids: set of entity @ids to remove (collapsed group members)
group_nodes: list of DatasetGroup summary nodes to add
comp_updates: dict mapping computation @id -> list of (member_ids, group_id)
"""
# Find root crate node
root_id = None
for nid, node in index.items():
if is_rocrate_root(node):
root_id = nid
break
if root_id is None:
return set(index.keys()), set(), [], {}
root = index[root_id]
# Separate EVI:outputs from hasPart for condensation
output_ids = get_id_list(root, "EVI:outputs")
part_ids = get_id_list(root, "hasPart", "EVI:outputs")
# Always keep structural nodes
keep_ids: set[str] = {root_id, "ro-crate-metadata.json"}
# Track collapsed datasets
collapsed_ids: set[str] = set()
group_nodes: list[dict] = []
comp_updates: dict[str, list[tuple[list[str], str]]] = defaultdict(list)
sig_cache: dict[str, tuple] = {}
# Pre-pass: condense repetitive top-level outputs
output_dataset_ids = [
oid for oid in output_ids
if oid in index and is_dataset(index[oid])
]
if len(output_dataset_ids) > threshold:
sig_to_ids: dict[tuple, list[str]] = defaultdict(list)
for ds_id in output_dataset_ids:
sig = compute_provenance_signature(ds_id, index, sig_cache)
sig_to_ids[sig].append(ds_id)
for sig, member_ids in sig_to_ids.items():
if len(member_ids) > threshold:
representative_id = sorted(member_ids)[0]
non_rep_ids = [mid for mid in member_ids
if mid != representative_id]
collapsed_ids.update(non_rep_ids)
for mid in non_rep_ids:
_collect_exclusive_backward(
mid, representative_id, index, collapsed_ids,
)
group = {
"consuming_comp_id": root_id,
"signature": sig,
"member_ids": member_ids,
"representative_id": representative_id,
}
group_node = create_dataset_group_node(group, index, max_member_ids)
group_nodes.append(group_node)
comp_updates[root_id].append(
(member_ids, group_node["@id"])
)
# Backward traversal
visited: set[str] = set()
stack = list(part_ids)
while stack:
current_id = stack.pop()
if current_id in visited or current_id in collapsed_ids:
continue
visited.add(current_id)
keep_ids.add(current_id)
node = index.get(current_id)
if node is None:
continue
evi_type = get_evi_type(node)
if is_dataset(node):
# Follow generatedBy backward
for comp_id in get_generatedby_ids(node):
stack.append(comp_id)
# Follow schema references
for schema_id in get_id_list(node, "evi:Schema"):
stack.append(schema_id)
elif evi_type == "Computation":
# Keep software
for sw_id in get_id_list(node, "usedSoftware"):
keep_ids.add(sw_id)
stack.append(sw_id)
# Keep ML models
for ml_id in get_id_list(node, "usedMLModel"):
keep_ids.add(ml_id)
stack.append(ml_id)
# Check input datasets for condensation opportunity
input_ids = get_id_list(node, "usedDataset")
if len(input_ids) > threshold:
# Group inputs by provenance signature
sig_to_ids: dict[tuple, list[str]] = defaultdict(list)
for ds_id in input_ids:
sig = compute_provenance_signature(ds_id, index, sig_cache)
sig_to_ids[sig].append(ds_id)
for sig, member_ids in sig_to_ids.items():
if len(member_ids) > threshold:
# Collapse this group
representative_id = sorted(member_ids)[0]
non_rep_ids = [mid for mid in member_ids
if mid != representative_id]
collapsed_ids.update(non_rep_ids)
# Also collapse backward chains of non-representatives
for mid in non_rep_ids:
_collect_exclusive_backward(
mid, representative_id, index, collapsed_ids,
)
group = {
"consuming_comp_id": current_id,
"signature": sig,
"member_ids": member_ids,
"representative_id": representative_id,
}
group_node = create_dataset_group_node(group, index, max_member_ids)
group_nodes.append(group_node)
comp_updates[current_id].append(
(member_ids, group_node["@id"])
)
# Continue tracing through representative only
stack.append(representative_id)
# Non-grouped inputs: trace normally
else:
for ds_id in member_ids:
stack.append(ds_id)
else:
# Few inputs, trace all normally
for ds_id in input_ids:
stack.append(ds_id)
# Also trace generated outputs (forward refs from computation)
for out_id in get_id_list(node, "generated"):
stack.append(out_id)
elif evi_type == "Experiment":
for ref_id in get_id_list(node, "usedSample", "usedInstrument",
"usedTreatment", "usedStain"):
stack.append(ref_id)
elif is_software(node):
pass # leaf, nothing to trace
# Also keep any software nodes in the index (they're shared)
for nid, n in index.items():
if is_software(n):
keep_ids.add(nid)
# Remove collapsed ids from keep_ids
keep_ids -= collapsed_ids
return keep_ids, collapsed_ids, group_nodes, comp_updates
def _collect_exclusive_backward(
dataset_id: str,
representative_id: str,
index: dict[str, dict],
collapsed: set[str],
) -> None:
"""
Collect backward chain entities of a non-representative dataset that are
NOT shared with the representative's chain. Add them to collapsed set.
We only collapse entities that are exclusively part of this non-representative
path (computations and their intermediate datasets). Shared entities like
software are never collapsed.
"""
rep_chain = _collect_backward_chain(representative_id, index)
stack = [dataset_id]
visited = set()
while stack:
cid = stack.pop()
if cid in visited:
continue
visited.add(cid)
# Don't collapse if it's in the representative's chain
if cid in rep_chain:
continue
node = index.get(cid)
if node is None:
continue
# Don't collapse software (shared across pipelines)
if is_software(node):
continue
collapsed.add(cid)
if is_dataset(node):
for comp_id in get_generatedby_ids(node):
stack.append(comp_id)
if is_computation(node):
for ref_id in get_id_list(node, "usedDataset"):
stack.append(ref_id)
def _collect_backward_chain(dataset_id: str, index: dict[str, dict]) -> set[str]:
"""Collect all entity @ids in the backward provenance chain."""
visited = set()
stack = [dataset_id]
while stack:
cid = stack.pop()
if cid in visited:
continue
visited.add(cid)
node = index.get(cid)
if node is None:
continue
if is_dataset(node):
for comp_id in get_generatedby_ids(node):
stack.append(comp_id)
if is_computation(node):
for ref_id in get_id_list(node, "usedDataset", "usedSoftware",
"usedMLModel"):
stack.append(ref_id)
return visited
# ---------------------------------------------------------------------------
# Build condensed graph
# ---------------------------------------------------------------------------
def create_dataset_group_node(
group: dict,
index: dict[str, dict],
max_member_ids: int = 0,
) -> dict:
"""Create a DatasetGroup summary node for a group of similar datasets."""
representative = index[group["representative_id"]]
member_ids = group["member_ids"]
count = len(member_ids)
sig = group["signature"]
# Extract common software from the signature
# Signature shape: (format, schema_ids, comp_sigs_or_None)
common_sw_ids = []
if sig[2]: # has computation signatures
for comp_sig in sig[2]:
sw_ids, _ = comp_sig
common_sw_ids.extend(sw_ids)
common_sw_ids = sorted(set(common_sw_ids))
fmt = sig[0]
# Generate a group @id based on the consuming entity
consuming_node = index.get(group["consuming_comp_id"], {})
if is_rocrate_root(consuming_node):
crate_name = consuming_node.get("name", "unknown").lower().replace(" ", "-")
group_id = f"ark:group/{crate_name}-{fmt.replace('/', '_').lstrip('.')}-outputs"
else:
comp_name = consuming_node.get("name", "unknown").lower().replace(" ", "-")
group_id = f"ark:group/{comp_name}-{fmt.replace('/', '_').lstrip('.')}-inputs"
# Build software name list for description
sw_names = []
for sw_id in common_sw_ids:
sw = index.get(sw_id, {})
sw_names.append(sw.get("name", sw_id))
description = (
f"{count} {fmt} files with identical provenance structure."
)
if sw_names:
description += f" All processed by {', '.join(sw_names)}."
# Extract common schema from the signature
schema_ids = list(sig[1]) if sig[1] else []
node = {
"@id": group_id,
"@type": ["prov:Entity", "https://w3id.org/EVI#DatasetGroup"],
"name": f"{representative.get('name', fmt + ' files')} (and {count - 1} similar)",
"description": description,
"format": fmt,
"evi:memberCount": count,
"evi:representativeDataset": make_id_ref(group["representative_id"]),
"evi:commonFormat": fmt,
"evi:commonSoftware": [make_id_ref(sw_id) for sw_id in common_sw_ids],
"evi:provenanceSignature": str(sig),
"evi:memberIds": _truncate_member_ids(sorted(member_ids), max_member_ids),
}
if schema_ids:
node["evi:commonSchema"] = [make_id_ref(sid) for sid in schema_ids]
return node
def _truncate_member_ids(ids: list[str], max_ids: int) -> list[str]:
"""Truncate member ID list if max_ids > 0, appending a summary entry."""
if max_ids <= 0 or len(ids) <= max_ids:
return ids
excluded = len(ids) - max_ids
return ids[:max_ids] + [f"... and {excluded} more (total: {len(ids)})"]
def condense_graph(
graph: list[dict],
threshold: int,
max_member_ids: int = 0,
) -> tuple[list[dict], dict]:
"""
Condense an RO-Crate @graph by collapsing repetitive provenance.
Returns (condensed_graph, stats).
"""
# Build index
index: dict[str, dict] = {}
for node in graph:
node_id = node.get("@id")
if node_id:
index[node_id] = node
original_count = len(graph)
# Traverse backward from outputs, condensing along the way
keep_ids, collapsed_ids, group_nodes, comp_updates = \
traverse_and_condense(index, threshold, max_member_ids)
if not group_nodes:
stats = {
"condensed": False,
"originalEntityCount": original_count,
"condensedEntityCount": original_count,
"datasetGroupCount": 0,
"note": "No repetitive provenance found above threshold.",
}
return graph, stats
# Build new graph: keep only entities in keep_ids
new_graph = []
for node in graph:
node_id = node.get("@id")
if node_id not in keep_ids:
continue
# Update consuming computation references
if node_id in comp_updates:
node = dict(node) # shallow copy
for member_ids, group_id in comp_updates[node_id]:
member_set = set(member_ids)
if "usedDataset" in node and node["usedDataset"]:
kept = [ref for ref in node["usedDataset"]
if ref.get("@id") not in member_set]
kept.append(make_id_ref(group_id))
node["usedDataset"] = kept
if "prov:used" in node and node["prov:used"]:
kept = [ref for ref in node["prov:used"]
if ref.get("@id") not in member_set]
kept.append(make_id_ref(group_id))
node["prov:used"] = kept
# Add condensation metadata to root
if is_rocrate_root(node):
node = dict(node)
condensed_count = len(keep_ids) + len(group_nodes)
node["evi:condensed"] = True
node["evi:condensationThreshold"] = threshold
node["evi:condensationDate"] = str(date.today())
node["evi:originalEntityCount"] = original_count
node["evi:condensedEntityCount"] = condensed_count
node["evi:datasetGroupCount"] = len(group_nodes)
total_collapsed = len(collapsed_ids)
node["evi:condensationNote"] = (
f"Condensed from {original_count} entities to "
f"{condensed_count}. "
f"{len(group_nodes)} dataset group(s) created by collapsing "
f"{total_collapsed} datasets with identical provenance signatures "
f"(same software chain). Full member lists preserved in evi:memberIds."
)
# Update hasPart to remove collapsed and add groups
if "hasPart" in node and node["hasPart"]:
kept = [ref for ref in node["hasPart"]
if ref.get("@id") not in collapsed_ids]
for gn in group_nodes:
kept.append(make_id_ref(gn["@id"]))
node["hasPart"] = kept
# Update EVI:outputs for top-level output condensation
if node_id in comp_updates:
for member_ids, group_id in comp_updates[node_id]:
member_set = set(member_ids)
if "EVI:outputs" in node and node["EVI:outputs"]:
kept = [ref for ref in node["EVI:outputs"]
if ref.get("@id") not in member_set]
kept.append(make_id_ref(group_id))
node["EVI:outputs"] = kept
new_graph.append(node)
# Add group nodes
new_graph.extend(group_nodes)
# Clean up dangling references: remove any {"@id": X} where X is not
# in the final graph
final_ids = {n.get("@id") for n in new_graph if "@id" in n}
ref_fields = ("usedDataset", "usedSoftware", "usedMLModel", "generated",
"hasPart", "prov:used", "generatedBy", "prov:wasGeneratedBy",
"derivedFrom", "prov:wasDerivedFrom", "usedByComputation",
"isPartOf", "EVI:outputs")
for i, node in enumerate(new_graph):
modified = False
node_copy = None
for field in ref_fields:
val = node.get(field)
if val is None:
continue
if isinstance(val, dict) and "@id" in val:
if val["@id"] not in final_ids:
if not modified:
node_copy = dict(node)
modified = True
node_copy[field] = []
elif isinstance(val, list):
cleaned = [ref for ref in val
if not (isinstance(ref, dict) and "@id" in ref
and ref["@id"] not in final_ids)]
if len(cleaned) != len(val):
if not modified:
node_copy = dict(node)
modified = True
node_copy[field] = cleaned
if modified:
new_graph[i] = node_copy
condensed_count = len(new_graph)
groups_info = []
for gn in group_nodes:
groups_info.append({
"memberCount": gn["evi:memberCount"],
"format": gn["format"],
"groupId": gn["@id"],
})
stats = {
"condensed": True,
"originalEntityCount": original_count,
"condensedEntityCount": condensed_count,
"datasetGroupCount": len(group_nodes),
"entitiesRemoved": len(collapsed_ids),
"groups": groups_info,
}
return new_graph, stats
# ---------------------------------------------------------------------------
# Multi-crate merge
# ---------------------------------------------------------------------------
def merge_additional_crates(
primary_graph: list[dict],
additional_paths: list[Path],
) -> list[dict]:
"""
Merge entities from additional RO-Crates into the primary graph.
Rules:
- The primary crate's root node (ROCrate type) is kept as-is.
- Each additional crate contributes its non-root, non-descriptor entities.
- Primary wins on any @id collision (additional entities are skipped if
the @id already exists in the primary graph).
- The ``ro-crate-metadata.json`` file descriptor from each additional
crate is always skipped.
- contentUrl values with file:/// prefix are resolved to absolute paths
relative to the sub-crate directory.
"""
# Build a fast lookup of @ids already present in the primary graph
primary_ids: set[str] = {node["@id"] for node in primary_graph if "@id" in node}
merged = list(primary_graph)
for crate_path in additional_paths:
metadata_path = crate_path / "ro-crate-metadata.json"
if not metadata_path.exists():
print(f"Warning: {metadata_path} not found, skipping")
continue
with open(metadata_path) as f:
crate = json.load(f)
additional_graph = crate.get("@graph", [])
added = 0
for node in additional_graph:
node_id = node.get("@id")
if not node_id:
continue
# Skip the file descriptor
if node_id == "ro-crate-metadata.json":
continue
# Skip the crate root from the additional crate
if is_rocrate_root(node):
continue
# Skip if already present in primary (primary wins)
if node_id in primary_ids:
continue
# Resolve contentUrl to absolute path
content_url = node.get("contentUrl")
if content_url and isinstance(content_url, str):
if content_url.startswith("file:///"):
relative = content_url[len("file:///"):]
absolute = str(crate_path / relative)
node = dict(node)
node["contentUrl"] = absolute
merged.append(node)
primary_ids.add(node_id)
added += 1
print(f" Merged {added} entities from {metadata_path}")
return merged
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Condense an RO-Crate by collapsing repetitive provenance chains.",
)
parser.add_argument(
"crate_dir",
type=Path,
help="Path to the primary RO-Crate directory containing ro-crate-metadata.json",
)
parser.add_argument(
"--additional-crates",
nargs="+",
type=Path,
metavar="CRATE_DIR",
default=[],
help="Additional RO-Crate directories whose entities are merged into the "
"primary graph before condensation. The primary crate's root stays as "
"the root; additional roots are skipped.",
)
parser.add_argument(
"--threshold",
type=int,
default=5,
help="Minimum group size to trigger condensation (default: 5)",
)
parser.add_argument(
"--output",
type=str,
default="condensed-metadata.json",
help="Output filename (default: condensed-metadata.json)",
)
parser.add_argument(
"--max-member-ids",
type=int,
default=0,
help="Limit the number of member IDs listed in each DatasetGroup node. "
"0 means unlimited (default: 0). When set, the list is truncated "
"and a summary entry is appended.",
)
args = parser.parse_args()
crate_dir = args.crate_dir.resolve()
metadata_path = crate_dir / "ro-crate-metadata.json"
if not metadata_path.exists():
print(f"Error: {metadata_path} not found")
return 1
with open(metadata_path) as f:
crate = json.load(f)
context = crate.get("@context", {})
graph = crate.get("@graph", [])
print(f"Loaded {len(graph)} entities from {metadata_path}")
# Merge additional crates if provided
if args.additional_crates:
resolved = [p.resolve() for p in args.additional_crates]
print(f"Merging {len(resolved)} additional crate(s)...")
graph = merge_additional_crates(graph, resolved)
print(f"Combined graph: {len(graph)} entities")
condensed_graph, stats = condense_graph(graph, args.threshold, args.max_member_ids)
if not stats["condensed"]:
print(f"Nothing to condense (no groups above threshold of {args.threshold})")
return 0
# Write condensed crate
output_path = crate_dir / args.output
condensed_crate = {
"@context": context,
"@graph": condensed_graph,
}
with open(output_path, "w") as f:
json.dump(condensed_crate, f, indent=2)
# Report
print(f"\nCondensation complete:")
print(f" Original entities: {stats['originalEntityCount']}")
print(f" Condensed entities: {stats['condensedEntityCount']}")
print(f" Entities removed: {stats['entitiesRemoved']}")
print(f" Dataset groups: {stats['datasetGroupCount']}")
for g in stats["groups"]:
print(f" - {g['memberCount']} {g['format']} files → 1 representative"
f" (group: {g['groupId']})")
print(f"\nWritten to {output_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())