Skip to content
2 changes: 1 addition & 1 deletion .github/CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
identity and expression, level of experience, education, socioeconomic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.

Expand Down
25 changes: 19 additions & 6 deletions src/linkml_map/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ class Session:
_target_schemaview: Optional[SchemaView] = None

def set_transformer_specification(
self, specification: Optional[Union[TransformationSpecification, dict, str, Path]] = None
self,
specification: Optional[
Union[TransformationSpecification, dict, str, Path]
] = None,
):
if isinstance(specification, Path):
specification = str(specification)
Expand All @@ -55,15 +58,19 @@ def set_transformer_specification(
)
normalizer.expand_all = True
specification = normalizer.normalize(specification)
self.transformer_specification = TransformationSpecification(**specification)
self.transformer_specification = TransformationSpecification(
**specification
)
elif isinstance(specification, str):
if "\n" in specification:
obj = yaml.safe_load(specification)
else:
obj = yaml.safe_load(open(specification))
self.set_transformer_specification(obj)

def set_source_schema(self, schema: Union[str, Path, dict, SchemaView, SchemaDefinition]):
def set_source_schema(
self, schema: Union[str, Path, dict, SchemaView, SchemaDefinition]
):
"""
Sets the schema from a path or SchemaView object.
"""
Expand Down Expand Up @@ -118,8 +125,12 @@ def set_object_transformer(
def target_schema(self) -> SchemaDefinition:
if self._target_schema is None:
if not self.schema_mapper:
self.schema_mapper = SchemaMapper(source_schemaview=self.source_schemaview)
self._target_schema = self.schema_mapper.derive_schema(self.transformer_specification)
self.schema_mapper = SchemaMapper(
source_schemaview=self.source_schemaview
)
self._target_schema = self.schema_mapper.derive_schema(
self.transformer_specification
)
return self._target_schema

@property
Expand All @@ -140,7 +151,9 @@ def reverse_transform(self, obj: dict, **kwargs) -> dict:
inv_spec = self.invert()
reverse_transformer = ObjectTransformer()
reverse_transformer.specification = inv_spec
reverse_transformer.source_schemaview = SchemaView(yaml_dumper.dumps(self.target_schema))
reverse_transformer.source_schemaview = SchemaView(
yaml_dumper.dumps(self.target_schema)
)
return reverse_transformer.map_object(obj, **kwargs)

def invert(self, in_place=False) -> TransformationSpecification:
Expand Down
170 changes: 116 additions & 54 deletions src/linkml_map/transformer/object_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def __init__(
if bindings:
self.bindings.update(bindings)

def get_ctxt_obj_and_dict(self, source_obj: OBJECT_TYPE = None) -> Tuple[DynObj, OBJECT_TYPE]:
def get_ctxt_obj_and_dict(
self, source_obj: OBJECT_TYPE = None
) -> Tuple[DynObj, OBJECT_TYPE]:
"""
Transform a source object into a typed context object and dictionary, and cache results.

Expand All @@ -75,7 +77,9 @@ def get_ctxt_obj_and_dict(self, source_obj: OBJECT_TYPE = None) -> Tuple[DynObj,

ctxt_obj = self.object_transformer.object_index.bless(source_obj_dyn)
ctxt_dict = {
k: getattr(ctxt_obj, k) for k in ctxt_obj._attributes() if not k.startswith("_")
k: getattr(ctxt_obj, k)
for k in ctxt_obj._attributes()
if not k.startswith("_")
}
else:
do = dynamic_object(source_obj, self.sv, self.source_type)
Expand Down Expand Up @@ -106,7 +110,9 @@ def __getitem__(self, name: Any) -> Any:

def __setitem__(self, name: Any, value: Any):
del name, value
raise RuntimeError(f"__setitem__ not allowed on class {self.__class__.__name__}")
raise RuntimeError(
f"__setitem__ not allowed on class {self.__class__.__name__}"
)


@dataclass
Expand All @@ -129,14 +135,24 @@ def index(self, source_obj: Any, target: str = None):
if isinstance(source_obj, dict):
if target is None:
[target] = [
c.name for c in self.source_schemaview.all_classes().values() if c.tree_root
c.name
for c in self.source_schemaview.all_classes().values()
if c.tree_root
]
if target is None:
raise ValueError(f"target must be passed if source_obj is dict: {source_obj}")
source_obj_typed = dynamic_object(source_obj, self.source_schemaview, target)
self.object_index = ObjectIndex(source_obj_typed, schemaview=self.source_schemaview)
raise ValueError(
f"target must be passed if source_obj is dict: {source_obj}"
)
source_obj_typed = dynamic_object(
source_obj, self.source_schemaview, target
)
self.object_index = ObjectIndex(
source_obj_typed, schemaview=self.source_schemaview
)
else:
self.object_index = ObjectIndex(source_obj, schemaview=self.source_schemaview)
self.object_index = ObjectIndex(
source_obj, schemaview=self.source_schemaview
)

def map_object(
self,
Expand All @@ -161,12 +177,16 @@ def map_object(
if len(source_types) == 1:
source_type = source_types[0]
elif len(source_types) > 1:
raise ValueError("No source type specified and multiple root classes found")
raise ValueError(
"No source type specified and multiple root classes found"
)
elif len(source_types) == 0:
if len(sv.all_classes()) == 1:
source_type = list(sv.all_classes().keys())[0]
else:
raise ValueError("No source type specified and no root classes found")
raise ValueError(
"No source type specified and no root classes found"
)

if source_type in sv.all_types():
if target_type:
Expand All @@ -181,10 +201,17 @@ def map_object(
elif target_type == "curie":
return self.compress_uri(source_obj)
return source_obj
if source_type in sv.all_enums():
# TODO: enum derivations
return self.transform_enum(source_obj, source_type, source_obj)
# return str(source_obj)

# Do enumeration transform if source_type has enumeration name(s)
source_type_enums = yaml.safe_load(source_type)
if not isinstance(source_type_enums, list):
source_type_enums = [source_type_enums]
source_type_enums = [
enum for enum in source_type_enums if enum in sv.all_enums()
]
if len(source_type_enums) > 0:
return self.transform_enum(source_obj, source_type_enums, source_obj)

source_obj_typed = None
if isinstance(source_obj, (BaseModel, YAMLRoot)):
# ensure dict
Expand All @@ -201,7 +228,9 @@ def map_object(
v = None
source_class_slot = None
if slot_derivation.unit_conversion:
v = self._perform_unit_conversion(slot_derivation, source_obj, sv, source_type)
v = self._perform_unit_conversion(
slot_derivation, source_obj, sv, source_type
)
elif slot_derivation.expr:
if bindings is None:
bindings = Bindings(
Expand All @@ -217,26 +246,34 @@ def map_object(
v = eval_expr_with_mapping(slot_derivation.expr, bindings)
except Exception:
if not self.unrestricted_eval:
raise RuntimeError(f"Expression not in safe subset: {slot_derivation.expr}")
raise RuntimeError(
f"Expression not in safe subset: {slot_derivation.expr}"
)
ctxt_obj, _ = bindings.get_ctxt_obj_and_dict()
aeval = Interpreter(usersyms={"src": ctxt_obj, "target": None})
aeval(slot_derivation.expr)
v = aeval.symtable["target"]
elif slot_derivation.populated_from:
v = source_obj.get(slot_derivation.populated_from, None)
source_class_slot = sv.induced_slot(slot_derivation.populated_from, source_type)
source_class_slot = sv.induced_slot(
slot_derivation.populated_from, source_type
)
logger.debug(
f"Pop slot {slot_derivation.name} => {v} using {slot_derivation.populated_from} // {source_obj}"
)
elif slot_derivation.sources:
vmap = {s: source_obj.get(s, None) for s in slot_derivation.sources}
vmap = {k: v for k, v in vmap.items() if v is not None}
if len(vmap.keys()) > 1:
raise ValueError(f"Multiple sources for {slot_derivation.name}: {vmap}")
raise ValueError(
f"Multiple sources for {slot_derivation.name}: {vmap}"
)
elif len(vmap.keys()) == 1:
v = list(vmap.values())[0]
source_class_slot_name = list(vmap.keys())[0]
source_class_slot = sv.induced_slot(source_class_slot_name, source_type)
source_class_slot = sv.induced_slot(
source_class_slot_name, source_type
)
else:
v = None
source_class_slot = None
Expand All @@ -253,14 +290,19 @@ def map_object(
source_class_slot_range = source_class_slot.range
if source_class_slot.multivalued:
if isinstance(v, list):
v = [self.map_object(v1, source_class_slot_range, target_range) for v1 in v]
v = [
self.map_object(v1, source_class_slot_range, target_range)
for v1 in v
]
elif isinstance(v, dict):
v = {
k1: self.map_object(v1, source_class_slot_range, target_range)
k1: self.map_object(
v1, source_class_slot_range, target_range
)
for k1, v1 in v.items()
}
else:
v = [v]
v = [self.map_object(v, source_class_slot_range, target_range)]
else:
v = self.map_object(v, source_class_slot_range, target_range)
if (
Expand All @@ -269,9 +311,9 @@ def map_object(
and not isinstance(v, list)
):
v = self._singlevalued_to_multivalued(v, slot_derivation)
if self._is_coerce_to_singlevalued(slot_derivation, class_deriv) and isinstance(
v, list
):
if self._is_coerce_to_singlevalued(
slot_derivation, class_deriv
) and isinstance(v, list):
v = self._multivalued_to_singlevalued(v, slot_derivation)
v = self._coerce_datatype(v, target_range)
if slot_derivation.dictionary_key and isinstance(v, list):
Expand All @@ -281,7 +323,8 @@ def map_object(
del v1[slot_derivation.dictionary_key]
elif (
slot_derivation.cast_collection_as
and slot_derivation.cast_collection_as == CollectionType.MultiValuedList
and slot_derivation.cast_collection_as
== CollectionType.MultiValuedList
and isinstance(v, dict)
):
# CompactDict to List
Expand All @@ -297,7 +340,11 @@ def map_object(
return tgt_attrs

def _perform_unit_conversion(
self, slot_derivation: SlotDerivation, source_obj: Any, sv: SchemaView, source_type: str
self,
slot_derivation: SlotDerivation,
source_obj: Any,
sv: SchemaView,
source_type: str,
) -> Union[float, Dict]:
uc = slot_derivation.unit_conversion
curr_v = source_obj.get(slot_derivation.populated_from, None)
Expand All @@ -309,7 +356,8 @@ def _perform_unit_conversion(
from_unit = curr_v.get(uc.source_unit_slot, None)
if from_unit is None:
raise ValueError(
f"Could not determine unit from {curr_v}" f" using {uc.source_unit_slot}"
f"Could not determine unit from {curr_v}"
f" using {uc.source_unit_slot}"
)
magnitude = curr_v.get(uc.source_magnitude_slot, None)
if magnitude is None:
Expand All @@ -332,7 +380,9 @@ def _perform_unit_conversion(
elif slot.unit.descriptive_name:
from_unit = slot.unit.descriptive_name
else:
raise NotImplementedError(f"Cannot determine unit system for {slot.unit}")
raise NotImplementedError(
f"Cannot determine unit system for {slot.unit}"
)
magnitude = curr_v
if not from_unit:
raise ValueError(f"Could not determine from_unit for {slot_derivation}")
Expand All @@ -352,7 +402,9 @@ def _perform_unit_conversion(
v = {uc.target_magnitude_slot: v, uc.target_unit_slot: to_unit}
return v

def _multivalued_to_singlevalued(self, vs: List[Any], slot_derivation: SlotDerivation) -> Any:
def _multivalued_to_singlevalued(
self, vs: List[Any], slot_derivation: SlotDerivation
) -> Any:
if slot_derivation.stringification:
stringification = slot_derivation.stringification
delimiter = stringification.delimiter
Expand All @@ -366,15 +418,19 @@ def _multivalued_to_singlevalued(self, vs: List[Any], slot_derivation: SlotDeriv
else:
raise ValueError(f"Unknown syntax: {stringification.syntax}")
else:
raise ValueError(f"Cannot convert multivalued to single valued: {vs}; no delimiter")
raise ValueError(
f"Cannot convert multivalued to single valued: {vs}; no delimiter"
)
if len(vs) > 1:
raise ValueError(f"Cannot coerce multiple values {vs}")
if len(vs) == 0:
return None
else:
return vs[0]

def _singlevalued_to_multivalued(self, v: Any, slot_derivation: SlotDerivation) -> List[Any]:
def _singlevalued_to_multivalued(
self, v: Any, slot_derivation: SlotDerivation
) -> List[Any]:
stringification = slot_derivation.stringification
if stringification:
delimiter = stringification.delimiter
Expand All @@ -391,7 +447,9 @@ def _singlevalued_to_multivalued(self, v: Any, slot_derivation: SlotDerivation)
else:
raise ValueError(f"Unknown syntax: {syntax}")
else:
raise ValueError(f"Cannot convert single valued to multivalued: {v}; no delimiter")
raise ValueError(
f"Cannot convert single valued to multivalued: {v}; no delimiter"
)
return vs
return [v]

Expand Down Expand Up @@ -424,24 +482,28 @@ def transform_object(
tr_obj_dict = self.map_object(source_obj, source_type_name)
return target_class(**tr_obj_dict)

def transform_enum(self, source_value: str, enum_name: str, source_obj: Any) -> Optional[str]:
enum_deriv = self._get_enum_derivation(enum_name)
if enum_deriv.expr:
try:
if enum_deriv.expr:
v = eval_expr(enum_deriv.expr, **source_obj, NULL=None)
except Exception:
aeval = Interpreter(usersyms={"src": source_obj, "target": None})
aeval(enum_deriv.expr)
v = aeval.symtable["target"]
if v is not None:
return v
for pv_deriv in enum_deriv.permissible_value_derivations.values():
if source_value == pv_deriv.populated_from:
return pv_deriv.name
if source_value in pv_deriv.sources:
return pv_deriv.name
if enum_deriv.mirror_source:
return str(source_value)
else:
return None
def transform_enum(
    self, source_value: str, enum_name: Union[str, List[str]], source_obj: Any
) -> Optional[str]:
    """
    Transform a source enum value into a target permissible value.

    Candidate enum derivations are tried in the order given and the first
    one that produces a result wins. For each derivation, in order:

    1. if it has an ``expr``, the expression is evaluated and a non-None
       result is returned;
    2. its permissible value derivations are scanned for one whose
       ``populated_from`` equals ``source_value`` or whose ``sources``
       contains it, and that derivation's name is returned;
    3. if ``mirror_source`` is set, ``str(source_value)`` is returned.

    Note that a derivation with ``mirror_source`` set therefore stops the
    search: later candidate enums are never consulted.

    :param source_value: the value to transform
    :param enum_name: a single enum name, or a list of candidate enum names
    :param source_obj: object made available to the derivation expression
    :return: the transformed value, or None if no derivation matched
    """
    enum_names = [enum_name] if isinstance(enum_name, str) else enum_name
    for cur_enum in enum_names:
        enum_deriv = self._get_enum_derivation(cur_enum)
        if enum_deriv.expr:
            try:
                # Unpacking fails with TypeError when source_obj is not a
                # mapping (or already has a NULL key); that case is handled
                # by the fallback below, as in the original code.
                v = eval_expr(enum_deriv.expr, **source_obj, NULL=None)
            except Exception:
                # NOTE(review): deliberately broad best-effort fallback kept
                # from the original. The expression is executed by the
                # Interpreter (presumably asteval - confirm) and is expected
                # to assign to ``target``; only use with trusted specs.
                aeval = Interpreter(usersyms={"src": source_obj, "target": None})
                aeval(enum_deriv.expr)
                v = aeval.symtable["target"]
            if v is not None:
                return v
        for pv_deriv in enum_deriv.permissible_value_derivations.values():
            if source_value == pv_deriv.populated_from:
                return pv_deriv.name
            # ``sources`` may be unset (None); treat that as "no sources"
            # instead of raising TypeError on the membership test.
            if source_value in (pv_deriv.sources or ()):
                return pv_deriv.name
        if enum_deriv.mirror_source:
            return str(source_value)
    return None
Loading