-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmetta_writer.py
191 lines (159 loc) · 7.16 KB
/
metta_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
from biocypher import BioCypher
import pathlib
import os
from biocypher._logger import logger
import networkx as nx
class MeTTaWriter:
    """Write BioCypher-described nodes and edges out as MeTTa expressions.

    On construction the writer loads the BioCypher schema/ontology and emits
    ``type_defs.metta`` (type hierarchy plus data constructors) into the
    output directory. ``write_nodes`` / ``write_edges`` then serialise
    records into ``nodes.metta`` / ``edges.metta``.
    """

    def __init__(self, schema_config, biocypher_config, output_dir):
        """Set up the BioCypher instance and write the type definitions.

        :param schema_config: path to the BioCypher schema configuration
        :param biocypher_config: path to the BioCypher configuration
        :param output_dir: directory that receives all generated files;
            created (including missing parents) when it does not exist
        """
        self.schema_config = schema_config
        self.biocypher_config = biocypher_config
        self.output_path = pathlib.Path(output_dir)

        if not self.output_path.exists():
            logger.info(f"Directory {output_dir} doesn't exist. Creating it...")
            # parents=True so that a nested output_dir does not fail.
            self.output_path.mkdir(parents=True, exist_ok=True)

        self.bcy = BioCypher(
            schema_config_path=schema_config, biocypher_config_path=biocypher_config
        )

        self.ontology = self.bcy._get_ontology()
        self.create_type_hierarchy()

        # Property keys listed here are skipped by write_property,
        # e.g. ["license", "version", "source"].
        self.excluded_properties = []

    def create_type_hierarchy(self):
        """Write ``type_defs.metta``: one type line per ontology node, then
        the node/edge data constructors derived from the schema.
        """
        G = self.ontology._nx_graph
        file_path = f"{self.output_path}/type_defs.metta"

        # Explicit encoding keeps the output independent of the platform locale.
        with open(file_path, "w", encoding="utf-8") as f:
            for node in G.nodes:
                if "mixin" in node:
                    continue

                # The last node yielded by get_parent is used as the ancestor.
                ancestor = list(self.get_parent(G, node))[-1]
                node = self.convert_input_labels(node)
                ancestor = self.convert_input_labels(ancestor)

                if ancestor == node:
                    # No distinct ancestor found: declare a base type.
                    f.write(f"(: {node.upper()} Type)\n")
                else:
                    f.write(f"(<: {node.upper()} {ancestor.upper()})\n")

            self.create_data_constructors(f)

        logger.info("Type hierarchy created successfully.")

    @staticmethod
    def _first_if_list(value):
        """Return the first element of *value* if it is a list, else *value*."""
        return value[0] if isinstance(value, list) else value

    def create_data_constructors(self, file):
        """Write a data-constructor line for every schema entry to *file*.

        Also (re)builds ``self.edge_node_types``, mapping each lower-cased
        edge label to its lower-cased ``source`` / ``target`` node types;
        ``write_edge`` relies on that mapping.

        :param file: writable text file object
        """
        schema = self.bcy._get_ontology_mapping()._extend_schema()
        self.edge_node_types = {}

        for k, v in schema.items():
            representation = v["represented_as"]

            if representation == "edge":
                edge_type = self.convert_input_labels(k)

                # TODO fix this in the schema config: when a field is given
                # as a list only its first entry is used. Each field is
                # unwrapped on its own so a mix of lists and plain strings
                # is handled correctly.
                label = self.convert_input_labels(
                    self._first_if_list(v["input_label"])
                )
                source_type = self.convert_input_labels(
                    self._first_if_list(v["source"])
                )
                target_type = self.convert_input_labels(
                    self._first_if_list(v["target"])
                )

                # (: label (-> SOURCE TARGET EDGE_TYPE)) -- both opening
                # parentheses must be closed.
                file.write(
                    f"(: {label.lower()} "
                    f"(-> {source_type.upper()} {target_type.upper()} "
                    f"{edge_type.upper()}))\n"
                )

                self.edge_node_types[label.lower()] = {
                    "source": source_type.lower(),
                    "target": target_type.lower(),
                }

            elif representation == "node":
                labels = v["input_label"]
                if not isinstance(labels, list):
                    labels = [labels]

                node_type = self.convert_input_labels(k)
                for raw_label in labels:
                    node_label = self.convert_input_labels(raw_label)
                    # (: label (-> $x NODE_TYPE))
                    file.write(
                        f"(: {node_label.lower()} (-> $x {node_type.upper()}))\n"
                    )

    def _resolve_output_file(self, filename, path_prefix, create_dir):
        """Return the path of *filename* under the output directory.

        With a *path_prefix* the file lives in that sub-directory, which is
        created (with parents) when *create_dir* is true.
        """
        if path_prefix is None:
            return f"{self.output_path}/{filename}"

        target_dir = f"{self.output_path}/{path_prefix}"
        if create_dir:
            pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True)
        return f"{target_dir}/{filename}"

    @staticmethod
    def _write_records(file_path, records, render):
        """Write ``render(record)`` lines for every record to *file_path*."""
        with open(file_path, "w", encoding="utf-8") as f:
            for record in records:
                f.writelines(line + "\n" for line in render(record))
            # A single trailing blank line terminates the file.
            f.write("\n")

    def write_nodes(self, nodes, path_prefix=None, create_dir=True):
        """Serialise *nodes* into ``nodes.metta``.

        :param nodes: iterable of ``(id, label, properties)`` tuples
        :param path_prefix: optional sub-directory of the output directory
        :param create_dir: create the sub-directory if it is missing
        """
        file_path = self._resolve_output_file("nodes.metta", path_prefix, create_dir)
        self._write_records(file_path, nodes, self.write_node)
        logger.info("Finished writing out nodes")

    def write_edges(self, edges, path_prefix=None, create_dir=True):
        """Serialise *edges* into ``edges.metta``.

        :param edges: iterable of
            ``(_, source_id, target_id, label, properties)`` tuples
        :param path_prefix: optional sub-directory of the output directory
        :param create_dir: create the sub-directory if it is missing
        """
        file_path = self._resolve_output_file("edges.metta", path_prefix, create_dir)
        self._write_records(file_path, edges, self.write_edge)

    def write_node(self, node):
        """Return the MeTTa lines for one ``(id, label, properties)`` node.

        For a dotted label only the segment after the first dot is used.
        """
        node_id, label, properties = node
        if "." in label:
            label = label.split(".")[1]
        def_out = f"({self.convert_input_labels(label)} {node_id})"
        return self.write_property(def_out, properties)

    def write_edge(self, edge):
        """Return the MeTTa lines for one edge tuple.

        The edge label is normalised the same way as the keys of
        ``self.edge_node_types`` (spaces replaced, lower-cased) so that
        labels containing spaces resolve instead of raising ``KeyError``.

        :raises KeyError: if the label has no entry in ``edge_node_types``
        """
        _, source_id, target_id, label, properties = edge
        label = self.convert_input_labels(label).lower()
        endpoints = self.edge_node_types[label]
        source_type = endpoints["source"]
        target_type = endpoints["target"]
        def_out = f"({label} ({source_type} {source_id}) ({target_type} {target_id}))"
        return self.write_property(def_out, properties)

    def write_property(self, def_out, property):
        """Return *def_out* followed by one ``has-property`` line per property.

        Properties whose key is in ``self.excluded_properties`` or whose
        value is ``None`` are skipped. Strings are wrapped in double quotes;
        lists become a parenthesised, space-separated tuple.

        :param def_out: the atom the properties belong to
        :param property: mapping of property name to value
        :return: list of MeTTa expression strings
        """
        out_str = [def_out]
        for k, v in property.items():
            if k in self.excluded_properties or v is None:
                continue

            if isinstance(v, list):
                items = " ".join(
                    f'"{e}"' if isinstance(e, str) else f"{e}" for e in v
                )
                rendered = f"({items})"
            elif isinstance(v, str):
                rendered = f'"{v}"'
            else:
                rendered = f"{v}"

            out_str.append(f"(has-property {def_out} {k} {rendered})")
        return out_str

    def convert_input_labels(self, label, replace_char="_"):
        """Replace every space in *label* with *replace_char*.

        :param label: input label of a node or edge
        :param replace_char: the character to replace spaces with
        :return: the label without spaces
        """
        return label.replace(" ", replace_char)

    def get_parent(self, G, node):
        """Return a depth-limited DFS pre-order iterator starting at *node*.

        ``create_type_hierarchy`` uses the last yielded node as the node's
        ancestor.

        NOTE(review): this was described as returning the immediate parent,
        but ``depth_limit=2`` may also yield nodes two edges away -- confirm
        against the ontology graph's edge direction and the intended result.
        """
        return nx.dfs_preorder_nodes(G, node, depth_limit=2)

    def show_ontology_structure(self):
        """Delegate to ``BioCypher.show_ontology_structure``."""
        self.bcy.show_ontology_structure()

    def summary(self):
        """Delegate to ``BioCypher.summary``."""
        self.bcy.summary()