From cb832e027291737518b295482eae9ccb10ce5278 Mon Sep 17 00:00:00 2001 From: Daniel Incicau Date: Fri, 19 Jul 2024 10:47:54 +0200 Subject: [PATCH 1/2] Remove yaml converter --- src/converter/__init__.py | 3 +- src/converter/converter.py | 123 +++++++++++++++---- src/converter/linkml_converter.py | 133 -------------------- src/converter/yaml_converter.py | 193 ------------------------------ 4 files changed, 98 insertions(+), 354 deletions(-) delete mode 100644 src/converter/linkml_converter.py delete mode 100644 src/converter/yaml_converter.py diff --git a/src/converter/__init__.py b/src/converter/__init__.py index 1148eb0..6081e14 100644 --- a/src/converter/__init__.py +++ b/src/converter/__init__.py @@ -1,2 +1 @@ -from .linkml_converter import * -from .yaml_converter import * +from .converter import * diff --git a/src/converter/converter.py b/src/converter/converter.py index 030fcde..737dfb8 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -1,68 +1,139 @@ import os +from src.utils.helpers import merge_dict_list, load_yaml -class ConverterTrait: + +class LinkMLConverter: def __init__(self, benchmark_file): self.stage_order_map = None self.benchmark_file = os.path.abspath(benchmark_file) + self.benchmark = load_yaml(benchmark_file) - def get_stage_id(self, stage): - raise NotImplementedError("Method not implemented yet") - - def get_module_id(self, module): - raise NotImplementedError("Method not implemented yet") + def get_benchmark_name(self): + return self.benchmark.name if self.benchmark.name else self.benchmark.id def get_benchmark_definition(self): - raise NotImplementedError("Method not implemented yet") + return self.benchmark - def get_benchmark_definition_file(self): - raise NotImplementedError("Method not implemented yet") + def get_stage_id(self, stage): + return stage.id + + def get_module_id(self, module): + return module.id def get_benchmark_stages(self): - raise NotImplementedError("Method not implemented yet") + return dict([(x.id, x) for x in self.benchmark.stages]) def get_benchmark_stage(self, stage_id): - raise NotImplementedError("Method not implemented yet") + stages = self.get_benchmark_stages().values() + return next(stage for stage in stages if stage.id == stage_id) def get_modules_by_stage(self, stage): - raise NotImplementedError("Method not implemented yet") + return dict([(x.id, x) for x in stage.modules]) def get_stage_implicit_inputs(self, stage): - raise NotImplementedError("Method not implemented yet") + if isinstance(stage, str): + stage = self.get_benchmark_stages()[stage] - def get_inputs_stage(self, implicit_inputs): - raise NotImplementedError("Method not implemented yet") + return [input.entries for input in stage.inputs] - def get_stage_explicit_inputs(self, stage): - raise NotImplementedError("Method not implemented yet") + def get_inputs_stage(self, implicit_inputs): + stages_map = {key: None for key in implicit_inputs} + if implicit_inputs is not None: + all_stages = self.get_benchmark_stages() + all_stages_outputs = [] + for stage_id in all_stages: + outputs = self.get_stage_outputs(stage=stage_id) + outputs = {key: stage_id for key, value in outputs.items()} + all_stages_outputs.append(outputs) + + all_stages_outputs = merge_dict_list(all_stages_outputs) + for in_deliverable in implicit_inputs: + # beware stage needs to be substituted + curr_output = all_stages_outputs[in_deliverable] + + stages_map[in_deliverable] = curr_output + + return stages_map + + def get_stage_explicit_inputs(self, implicit_inputs): + explicit = {key: None for key in implicit_inputs} + if implicit_inputs is not None: + all_stages = self.get_benchmark_stages() + all_stages_outputs = [] + for stage_id in all_stages: + outputs = self.get_stage_outputs(stage=stage_id) + outputs = { + key: value.format( + input="{input}", + stage=stage_id, + module="{module}", + params="{params}", + dataset="{dataset}", + ) + for key, value in outputs.items() + } + all_stages_outputs.append(outputs) + + all_stages_outputs = merge_dict_list(all_stages_outputs) + for in_deliverable in implicit_inputs: + # beware stage needs to be substituted + curr_output = all_stages_outputs[in_deliverable] + + explicit[in_deliverable] = curr_output + + return explicit def get_stage_outputs(self, stage): - raise NotImplementedError("Method not implemented yet") + if isinstance(stage, str): + stage = self.get_benchmark_stages()[stage] + + return dict([(output.id, output.path) for output in stage.outputs]) def get_module_excludes(self, module): - raise NotImplementedError("Method not implemented yet") + if isinstance(module, str): + module = self.get_benchmark_modules()[module] + + return module.exclude def get_module_parameters(self, module): - raise NotImplementedError("Method not implemented yet") + params = None + if module.parameters is not None: + params = [x.values for x in module.parameters] + + return params def get_module_repository(self, module): - raise NotImplementedError("Method not implemented yet") + return module.repository def is_initial(self, stage): - raise NotImplementedError("Method not implemented yet") + if stage.inputs is None or len(stage.inputs) == 0: + return True + else: + return False def get_after(self, stage): - raise NotImplementedError("Method not implemented yet") + return stage.after def get_stage_ids(self): - raise NotImplementedError("Method not implemented yet") + return [x.id for x in self.benchmark.stages] def get_module_ids(self): - raise NotImplementedError("Method not implemented yet") + module_ids = [] + for stage in self.benchmark.stages: + for module in stage.modules: + module_ids.append(module.id) + + return module_ids def get_output_ids(self): - raise NotImplementedError("Method not implemented yet") + output_ids = [] + for stage in self.benchmark.stages: + for output in stage.outputs: + output_ids.append(output.id) + + return output_ids def get_initial_datasets(self): stages = self.get_benchmark_stages() diff --git a/src/converter/linkml_converter.py b/src/converter/linkml_converter.py deleted file mode 100644 index 0e593ca..0000000 --- a/src/converter/linkml_converter.py +++ /dev/null @@ -1,133 +0,0 @@ -from src.converter.converter import ConverterTrait -from src.utils.helpers import merge_dict_list, load_yaml - - -class LinkMLConverter(ConverterTrait): - def __init__(self, benchmark_file): - super().__init__(benchmark_file) - self.benchmark = load_yaml(benchmark_file) - - def get_benchmark_name(self): - return self.benchmark.name if self.benchmark.name else self.benchmark.id - - def get_benchmark_definition(self): - return self.benchmark - - def get_stage_id(self, stage): - return stage.id - - def get_module_id(self, module): - return module.id - - def get_benchmark_stages(self): - return dict([(x.id, x) for x in self.benchmark.stages]) - - def get_benchmark_stage(self, stage_id): - stages = self.get_benchmark_stages().values() - return next(stage for stage in stages if stage.id == stage_id) - - def get_modules_by_stage(self, stage): - return dict([(x.id, x) for x in stage.modules]) - - def get_stage_implicit_inputs(self, stage): - if isinstance(stage, str): - stage = self.get_benchmark_stages()[stage] - - return [input.entries for input in stage.inputs] - - def get_inputs_stage(self, implicit_inputs): - stages_map = {key: None for key in implicit_inputs} - if implicit_inputs is not None: - all_stages = self.get_benchmark_stages() - all_stages_outputs = [] - for stage_id in all_stages: - outputs = self.get_stage_outputs(stage=stage_id) - outputs = {key: stage_id for key, value in outputs.items()} - all_stages_outputs.append(outputs) - - all_stages_outputs = merge_dict_list(all_stages_outputs) - for in_deliverable in implicit_inputs: - # beware stage needs to be substituted - curr_output = all_stages_outputs[in_deliverable] - - stages_map[in_deliverable] = curr_output - - return stages_map - - def get_stage_explicit_inputs(self, implicit_inputs): - explicit = {key: None for key in implicit_inputs} - if implicit_inputs is not None: - all_stages = self.get_benchmark_stages() - all_stages_outputs = [] - for stage_id in all_stages: - outputs = self.get_stage_outputs(stage=stage_id) - outputs = { - key: value.format( - input="{input}", - stage=stage_id, - module="{module}", - params="{params}", - dataset="{dataset}", - ) - for key, value in outputs.items() - } - all_stages_outputs.append(outputs) - - all_stages_outputs = merge_dict_list(all_stages_outputs) - for in_deliverable in implicit_inputs: - # beware stage needs to be substituted - curr_output = all_stages_outputs[in_deliverable] - - explicit[in_deliverable] = curr_output - - return explicit - - def get_stage_outputs(self, stage): - if isinstance(stage, str): - stage = self.get_benchmark_stages()[stage] - - return dict([(output.id, output.path) for output in stage.outputs]) - - def get_module_excludes(self, module): - if isinstance(module, str): - module = self.get_benchmark_modules()[module] - - return module.exclude - - def get_module_parameters(self, module): - params = None - if module.parameters is not None: - params = [x.values for x in module.parameters] - - return params - - def get_module_repository(self, module): - return module.repository - - def is_initial(self, stage): - if stage.inputs is None or len(stage.inputs) == 0: - return True - else: - return False - - def get_after(self, stage): - return stage.after - - def get_stage_ids(self): - return [x.id for x in self.benchmark.stages] - - def get_module_ids(self): - module_ids = [] - for stage in self.benchmark.stages: - for module in stage.modules: - module_ids.append(module.id) - - return module_ids - - def get_output_ids(self): - output_ids = [] - for stage in self.benchmark.stages: - for output in stage.outputs: - output_ids.append(output.id) - - return output_ids diff --git a/src/converter/yaml_converter.py b/src/converter/yaml_converter.py deleted file mode 100644 index 9892628..0000000 --- a/src/converter/yaml_converter.py +++ /dev/null @@ -1,193 +0,0 @@ -import os -import os.path as op - -from src.utils.helpers import merge_dict_list -from src.converter.converter import ConverterTrait - - -class YamlConverter(ConverterTrait): - def __init__(self, config): - super().__init__() - self.config = config - - def get_benchmark_definition(self): - return self.config - - def get_stage_id(self, stage): - return stage["id"] - - def get_module_id(self, module): - return module["id"] - - def get_benchmark_stages(self): - return dict([(x["id"], x) for x in self.config["steps"]]) - - def get_benchmark_stage(self, stage_id): - stages = self.get_benchmark_stages() - return [stage for stage in stages if stage["id"] == stage_id] - - def get_modules_by_stage(self, stage): - return dict([(x["id"], x) for x in stage["members"]]) - - def get_modules(self): - m = [] - for stage_name, stage in self.get_benchmark_stages(): - m.append([x["id"] for x in stage["members"]]) - - return sum(m, []) - - def get_module_parameters(self, module): - params = None - if "parameters" in module.keys(): - params = [x["values"] for x in module["parameters"]] - - return params - - def get_module_repository(self, module): - return module["repo"] - - def get_module_excludes(self, module): - excludes = None - if "exclude" in module.keys(): - excludes = module["exclude"] - - return excludes - - def get_stage_implicit_inputs(self, stage): - if isinstance(stage, str): - stage = self.get_benchmark_stages()[stage] - - if "initial" in stage.keys() and stage["initial"]: - return None - - return [input["entries"] for input in stage["inputs"]] - - def get_stage_outputs(self, stage): - if isinstance(stage, str): - stage = self.get_benchmark_stages()[stage] - - return dict([(output["id"], output["path"]) for output in stage["outputs"]]) - - def get_stage_explicit_inputs(self, implicit_inputs): - explicit = {key: None for key in implicit_inputs} - if implicit_inputs is not None: - all_stages = self.get_benchmark_stages() - all_stages_outputs = [ - self.get_stage_outputs(stage=stage_id) for stage_id in all_stages - ] - all_stages_outputs = merge_dict_list(all_stages_outputs) - - for in_deliverable in implicit_inputs: - # beware stage needs to be substituted - curr_output = all_stages_outputs[in_deliverable] - - explicit[in_deliverable] = curr_output - - return explicit - - def get_stage_explicit_input_dirnames(self, stage): - explicit = self.get_stage_explicit_inputs(stage) - de = explicit - if explicit is not None: - for i in range(len(explicit)): - for in_deliverable in explicit[i].keys(): - de[i][in_deliverable] = op.dirname(explicit[i][in_deliverable]) - - return de - - def is_initial(self, stage): - if "initial" in stage.keys() and stage["initial"]: - return True - else: - return False - - def get_after(self, stage): - if "after" in stage.keys(): - return stage["after"] - else: - return None - - def get_initial_dataset_paths(self, dataset): - filled = [] - for stage in self.config["steps"].keys(): - if ( - "initial" in self.config["steps"][stage].keys() - and self.config["steps"][stage]["initial"] - ): - outs = list(self.get_stage_outputs(stage).values()) - for i in range(len(outs)): - filled.append( - [ - outs[i].format( - stage=stage, mod=dataset, params="default", id=dataset - ) - ] - ) - - return sum(filled, []) - - ## playground ------------- - - # # dirty, fix - # def write_module_flag_for_dirty_module_wildcards(module): - # stage = get_initial_stage_name() - # ## creates an empty file - # open(op.join('out', stage, f"{module}/{module}.flag".format(module = module)), 'a') - - def tokenize_parameters(self): - print("todo") - - def count_path_depth(self, path): - return path.count(os.sep) - - ## if a module (stage) gets inputs from different modules, i.e. counts from 'processed' after 'raw' - ## and 'meta' from raw, then we have to nest outputs after the longest (deepest) folder - - ## that is, raw/processed/here, and not to raw/here - def get_deepest_input_dirname(self, stage): - ii = self.get_stage_implicit_inputs(stage) - deepest_inputs = [] - if ii is not None: - deepest_input = "." - deepest_input_depth = 0 - for input_dict in ii: - for item in input_dict.keys(): - curr_depth = self.count_path_depth(input_dict[item]) - if curr_depth > deepest_input_depth: - deepest_input_depth = curr_depth - deepest_input = op.dirname(input_dict[item]) - deepest_inputs.append(deepest_input) - - return deepest_inputs - - def get_deepest_input_dirname_for_input_dict(self, input_dict_list): - deepest_input = "." - deepest_input_depth = 0 - for input_dict in input_dict_list: - for item in input_dict.keys(): - curr_depth = self.count_path_depth(input_dict[item]) - if curr_depth > deepest_input_depth: - deepest_input_depth = curr_depth - deepest_input = op.dirname(input_dict[item]) - - return deepest_input - - # ## with substituted module/stage/ids - # def fill_explicit_outputs(stage, module): - # i = get_stage_explicit_outputs(stage) - # idir = get_deepest_input_dirname(stage) - - # oe = get_stage_outputs(stage) - # excludes = get_module_excludes(stage = stage, module = module) - # return('todo') - - def nest_deliverable_path(self, parent, path): - return op.join(parent, path) - - ## using the input identifiers, excludes and parameters and not 'after' clauses - def traverse_yaml(self): - lookup = "" - for stage_name, stage in self.get_benchmark_stages(): - for module in self.get_modules_by_stage(stage): - ii = self.get_stage_implicit_inputs(stage) - - return "todo" From 16d5a64d1d47d3fa46d4c28cfcaf6959ae4c9c0c Mon Sep 17 00:00:00 2001 From: Daniel Incicau Date: Sun, 21 Jul 2024 13:45:07 +0200 Subject: [PATCH 2/2] Change interface for model.Benchmark such that it takes directly the benchmark_yaml path --- main.py | 6 ++---- src/model/benchmark.py | 21 ++++++++++++++++----- src/workflow/snakemake/rules/utils.smk | 17 +++++------------ 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/main.py b/main.py index c459269..f955806 100644 --- a/main.py +++ b/main.py @@ -15,10 +15,8 @@ def main(benchmark_file): - converter = LinkMLConverter(benchmark_file) - validator = Validator() - converter = validator.validate(converter) - benchmark = Benchmark(converter) + benchmark = Benchmark(benchmark_file) + converter = benchmark.get_converter() print(benchmark.get_definition()) stages = converter.get_benchmark_stages() diff --git a/src/model/benchmark.py b/src/model/benchmark.py index ee8f09a..57f10a6 100644 --- a/src/model/benchmark.py +++ b/src/model/benchmark.py @@ -1,15 +1,26 @@ +from pathlib import Path + import src.model.dag_operations as dag +from src.converter import LinkMLConverter from src.utils.helpers import * +from src.validation import Validator class Benchmark: - def __init__(self, converter, output_folder="out"): + def __init__(self, benchmark_yaml: Path, out_dir: str = "out"): + converter = LinkMLConverter(benchmark_yaml) + validator = Validator() + converter = validator.validate(converter) + self.converter = converter - self.output_folder = output_folder - self.G = dag.build_dag_from_definition(converter, self.output_folder) + self.out_dir = out_dir + self.G = dag.build_dag_from_definition(converter, self.out_dir) self.execution_paths = None + def get_converter(self): + return self.converter + def get_benchmark_name(self): return self.converter.get_benchmark_name() @@ -42,10 +53,10 @@ def get_output_paths(self): execution_paths = self.get_execution_paths() output_paths = [ - format_name(output, self.output_folder) + format_name(output, self.out_dir) for path in execution_paths for output in self._construct_output_paths( - prefix=self.output_folder, nodes=path + prefix=self.out_dir, nodes=path ) ] diff --git a/src/workflow/snakemake/rules/utils.smk b/src/workflow/snakemake/rules/utils.smk index 55bab9c..388a546 100644 --- a/src/workflow/snakemake/rules/utils.smk +++ b/src/workflow/snakemake/rules/utils.smk @@ -1,17 +1,10 @@ -from src.converter import LinkMLConverter +from pathlib import Path from src.model import Benchmark -from src.validation import Validator +def load(benchmark_yaml: Path): + return Benchmark(benchmark_yaml) -def load(benchmark_file_path): - converter = LinkMLConverter(benchmark_file_path) - validator = Validator() - converter = validator.validate(converter) - benchmark = Benchmark(converter) - return benchmark - - -def load_node(benchmark_file_path, node_id): - benchmark = load(benchmark_file_path) +def load_node(benchmark_yaml: Path, node_id: str): + benchmark = load(benchmark_yaml) return benchmark.get_node_by_id(node_id) \ No newline at end of file