Skip to content
This repository was archived by the owner on Aug 27, 2024. It is now read-only.

Commit bd1ba6f

Browse files
author
Daniel Incicau
committed
Module execution: clone repository, checkout commit, execute run.sh
1 parent 924f86c commit bd1ba6f

File tree

11 files changed

+143
-32
lines changed

11 files changed

+143
-32
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ graph:
1313
snakemake --dag | dot -Tpng > workflow_dag.png
1414
clean:
1515
rm -f benchmark.pkl Snakefile
16-
rm -rf ./in ./out ./log ./data/D1 ./data/D2 workflow_dag.png output_dag.png
16+
rm -rf ./in ./out ./log ./data/D1 ./data/D2 workflow_dag.png output_dag.png ./.snakemake

Pipfile

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ linkml-runtime = ">=1.7.0"
1111
rdflib = "==7.0.0"
1212
pyyaml = "~=6.0.1"
1313
omni-schema = {git = "https://github.com/omnibenchmark/omni-schema.git", editable = true, ref = "version/0.0.2"}
14+
gitpython = "*"
1415

1516
[dev-packages]
1617

data/Benchmark_001.yaml

+5-5
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ stages:
5555
- id: P2
5656
software_environment: "R"
5757
parameters:
58-
- values: ["-a 0", "-c 0"]
59-
- values: ["-a 1", "-c 0.1"]
58+
- values: ["-a 0", "-b 0"]
59+
- values: ["-a 1", "-b 0.1"]
6060
repository:
6161
url: https://github.com/omnibenchmark-example/process.git
6262
commit: 24579a8
@@ -107,17 +107,17 @@ stages:
107107
- id: m1
108108
software_environment: "python"
109109
repository:
110-
url: git@github.com:omnibenchmark-example/metric.git
110+
url: https://github.com/omnibenchmark-example/metric.git
111111
commit: ba781d7
112112
- id: m2
113113
software_environment: "python"
114114
repository:
115-
url: git@github.com:omnibenchmark-example/metric.git
115+
url: https://github.com/omnibenchmark-example/metric.git
116116
commit: ba781d7
117117
- id: m3
118118
software_environment: "python"
119119
repository:
120-
url: git@github.com:omnibenchmark-example/metric.git
120+
url: https://github.com/omnibenchmark-example/metric.git
121121
commit: ba781d7
122122
inputs:
123123
- entries: [

main.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def main(benchmark_file):
6363

6464
if __name__ == "__main__":
6565
parser = argparse.ArgumentParser(description='Test OmniWorkflow converter.')
66-
parser.add_argument('--benchmark_file', default='data/Benchmark_002.yaml',
66+
parser.add_argument('--benchmark_file', default='data/Benchmark_001.yaml',
6767
type=str, help='Location of the benchmark file')
6868

6969
args = parser.parse_args()

src/converter/linkml_converter.py

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ def __init__(self, benchmark_file):
77
super().__init__(benchmark_file)
88
self.benchmark = load_yaml(benchmark_file)
99

10+
def get_benchmark_name(self):
11+
return self.benchmark.name if self.benchmark.name else self.benchmark.id
12+
1013
def get_benchmark_definition(self):
1114
return self.benchmark
1215

src/model/benchmark.py

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ def __init__(self, converter, output_folder='out'):
1010

1111
self.execution_paths = None
1212

13+
def get_benchmark_name(self):
14+
return self.converter.get_benchmark_name()
15+
1316
def get_definition(self):
1417
return self.converter.get_benchmark_definition()
1518

src/model/dag_operations.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ def expend_stage_nodes(converter, stage, output_folder):
2626
for inputs in inputs_for_stage:
2727
required_input_stages = set(converter.get_inputs_stage(inputs).values()) if inputs else None
2828
most_recent_input_stage = sorted(list(required_input_stages), key=converter.stage_order)[-1] if inputs else None
29-
inputs = converter.get_stage_explicit_inputs(inputs).values() if inputs else None
30-
inputs = [x.replace('{input}', '{pre}') for x in inputs] if inputs else None
29+
inputs = converter.get_stage_explicit_inputs(inputs) if inputs else None
30+
inputs = {k: v.replace('{input}', '{pre}') for k, v in inputs.items()} if inputs else None
3131
node = BenchmarkNode(converter, stage, module, param, inputs, outputs, param_id,
3232
after=most_recent_input_stage)
3333
nodes.append(node)

src/model/node.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os.path
22

3+
from omni_schema.datamodel.omni_schema import Repository
4+
35

46
class BenchmarkNode:
57
def __init__(self, converter,
@@ -22,14 +24,27 @@ def __init__(self, converter,
2224
def get_id(self):
2325
return BenchmarkNode.to_id(self.stage_id, self.module_id, self.param_id, self.after)
2426

27+
def get_benchmark_name(self):
28+
return self.converter.get_benchmark_name()
29+
2530
def get_definition(self):
2631
return self.converter.get_benchmark_definition()
2732

2833
def get_definition_file(self):
2934
return self.converter.benchmark_file
3035

3136
def get_inputs(self):
32-
return self.inputs if self.inputs else []
37+
return self.inputs.values() if self.inputs else []
38+
39+
def get_inputs_dict(self):
40+
return self.inputs if self.inputs else {}
41+
42+
def get_explicit_inputs(self):
43+
explicit_inputs = [self.converter.get_stage_explicit_inputs(i) for i in self.converter.get_stage_implicit_inputs(self.stage)]
44+
return explicit_inputs
45+
46+
def get_benchmark_name(self):
47+
return self.converter.get_benchmark_name()
3348

3449
def get_input_paths(self):
3550
input_paths = []
@@ -53,6 +68,9 @@ def get_output_paths(self):
5368
def get_parameters(self):
5469
return self.parameters
5570

71+
def get_repository(self):
72+
return self.converter.get_module_repository(module=self.module)
73+
5674
def is_initial(self):
5775
return self.converter.is_initial(self.stage)
5876

src/workflow/snakemake/format/formatter.py

+13-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import re
22
from itertools import takewhile
33
from pathlib import Path
4-
from typing import List, Set, Tuple, Union, NamedTuple
4+
from typing import List, Set, Tuple, Union, NamedTuple, Dict, Any
55

66
from src.model import BenchmarkNode, Benchmark
77

@@ -25,7 +25,7 @@ def format_output_templates_to_be_expanded(node: BenchmarkNode) -> List[str]:
2525
return outputs
2626

2727

28-
def format_input_templates_to_be_expanded(benchmark: Benchmark, wildcards: Wildcards) -> List[str]:
28+
def format_input_templates_to_be_expanded(benchmark: Benchmark, wildcards: Wildcards, return_as_dict=False) -> dict[str, str] | list[str]:
2929
"""Formats benchmark inputs that will be expanded according to Snakemake's engine"""
3030

3131
pre = wildcards.pre
@@ -43,15 +43,17 @@ def format_input_templates_to_be_expanded(benchmark: Benchmark, wildcards: Wildc
4343
node_hash = hash(BenchmarkNode.to_id(stage_id, module_id, param_id, after_stage_id))
4444
matching_node = next((node for node in nodes if hash(node) == node_hash), None)
4545
if matching_node:
46-
node_inputs = matching_node.get_inputs()
46+
node_inputs = matching_node.get_inputs_dict()
4747

4848
inputs = _match_inputs(node_inputs, pre_stages, pre, dataset)
4949

5050
# print(f'Inputs: {stage_id} {module_id} {param_id}: {inputs}')
51-
return inputs
52-
51+
if return_as_dict:
52+
return inputs
53+
else:
54+
return inputs.values()
5355
else:
54-
return []
56+
return {} if return_as_dict else []
5557

5658

5759
def _extract_stages_from_path(path: str, known_stage_ids: Set[str]) -> List[Union[str, tuple]]:
@@ -113,17 +115,17 @@ def _match_input_prefix(input: str, pre: str) -> str:
113115
return formatted_input
114116

115117

116-
def _match_inputs(inputs: List[str], stages: List[Tuple[str]], pre: str, dataset: str) -> List[str]:
118+
def _match_inputs(inputs: dict[str, str], stages: List[Tuple[str]], pre: str, dataset: str) -> dict[str, str]:
117119
all_matched = True
118120

119-
formatted_inputs = []
120-
for input in inputs:
121+
formatted_inputs = {}
122+
for key, input in inputs.items():
121123
formatted_input = _match_input_module(input, stages, dataset)
122124
if not formatted_input:
123125
all_matched = False
124126
break
125127
else:
126128
formatted_input = _match_input_prefix(formatted_input, pre)
127-
formatted_inputs.append(formatted_input)
129+
formatted_inputs[key] = formatted_input
128130

129-
return formatted_inputs if all_matched else []
131+
return formatted_inputs if all_matched else {}

src/workflow/snakemake/rules/rule_node.smk

+15
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ def _create_initial_node(node):
1919
module_id = node.module_id
2020
param_id = node.param_id
2121

22+
repository = node.get_repository()
23+
repository_url = repository.url if repository else None
24+
commit_hash = repository.commit if repository else None
25+
2226
rule:
2327
name: f"{{stage}}_{{module}}_{{param}}".format(stage=stage_id,module=module_id,param=param_id)
2428
wildcard_constraints:
@@ -29,6 +33,8 @@ def _create_initial_node(node):
2933
output:
3034
formatter.format_output_templates_to_be_expanded(node),
3135
params:
36+
repository_url = repository_url,
37+
commit_hash = commit_hash,
3238
parameters = node.get_parameters(),
3339
script: os.path.join(os.path.dirname(os.path.realpath(scripts.__file__)), 'run_module.py')
3440

@@ -44,6 +50,12 @@ def _create_intermediate_node(benchmark, node):
4450
if any(['{params}' in o for o in outputs]):
4551
post += '/' + param_id
4652

53+
repository = node.get_repository()
54+
repository_url = repository.url if repository else None
55+
commit_hash = repository.commit if repository else None
56+
57+
inputs_map = lambda wildcards: formatter.format_input_templates_to_be_expanded(benchmark, wildcards, return_as_dict=True)
58+
4759
rule:
4860
name: f"{{stage}}_{{module}}_{{param}}".format(stage=stage_id,module=module_id,param=param_id)
4961
wildcard_constraints:
@@ -55,6 +67,9 @@ def _create_intermediate_node(benchmark, node):
5567
output:
5668
formatter.format_output_templates_to_be_expanded(node)
5769
params:
70+
inputs_map = inputs_map,
71+
repository_url = repository_url,
72+
commit_hash = commit_hash,
5873
parameters = node.get_parameters()
5974
script: os.path.join(os.path.dirname(os.path.realpath(scripts.__file__)), 'run_module.py')
6075

src/workflow/snakemake/scripts/run_module.py

+80-11
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
##
55
## Started 22 Feb 2024
66
## Izaskun Mallona
7-
8-
import sys
7+
import hashlib
8+
import subprocess
99
import os
1010
from typing import List
1111

12+
from git import Repo
1213
from snakemake.script import Snakemake
1314

1415

@@ -22,7 +23,61 @@ def mock_execution(inputs: List[str], output: str, snakemake: Snakemake):
2223
print(' params are', snakemake.params)
2324

2425

25-
def dump_parameters_to_file(output_dir: str, parameters: str):
26+
def execution(module_dir: str, module_name: str, output_dir: str, dataset: str,
27+
inputs_map: dict[str, str], parameters: List[str]):
28+
29+
run_sh = os.path.join(module_dir, 'run.sh')
30+
if not os.path.exists(run_sh):
31+
raise RuntimeError(f'{module_name} run.sh script does not exist')
32+
33+
# Constructing the command list
34+
command = [run_sh, output_dir, dataset]
35+
36+
# Adding input files with their respective keys
37+
if inputs_map:
38+
for k, v in inputs_map.items():
39+
command.extend([f"--{k}", v])
40+
41+
# Adding extra parameters
42+
if parameters:
43+
command.extend(parameters)
44+
45+
try:
46+
# Execute the shell script
47+
result = subprocess.run(command, check=True, capture_output=True, text=True)
48+
return result.stdout
49+
50+
except subprocess.CalledProcessError as e:
51+
raise RuntimeError(f'Error executing {run_sh}') from e
52+
53+
54+
# Create a unique folder name based on the repository URL and commit hash
55+
def generate_unique_repo_folder_name(repo_url, commit_hash):
56+
unique_string = f"{repo_url}@{commit_hash}"
57+
folder_name = hashlib.md5(unique_string.encode()).hexdigest()
58+
59+
return folder_name
60+
61+
62+
def clone_module(output_dir: str, repository_url: str, commit_hash: str):
63+
module_name = generate_unique_repo_folder_name(repository_url, commit_hash)
64+
module_dir = os.path.join(output_dir, module_name)
65+
66+
if not os.path.exists(module_dir):
67+
repo = Repo.clone_from(repository_url, module_dir)
68+
repo.git.checkout(commit_hash)
69+
else:
70+
repo = Repo(module_dir)
71+
72+
if repo.head.commit.hexsha[:7] != commit_hash:
73+
raise RuntimeError(f'WARNING: {commit_hash} does not match {repo.head.commit.hexsha[:7]}')
74+
75+
return module_dir
76+
77+
78+
def dump_parameters_to_file(output_dir: str, parameters: List[str]):
79+
os.makedirs(output_dir, exist_ok=True)
80+
2681
if parameters is not None:
2782
params_file = os.path.join(output_dir, 'parameters.txt')
2883
with open(params_file, 'w') as params_file:
@@ -35,17 +90,31 @@ def dump_parameters_to_file(output_dir: str, parameters: str):
3590

3691
try:
3792
snakemake: Snakemake = snakemake
38-
parameters = dict(snakemake.params)['parameters']
39-
output_dir = os.path.dirname(snakemake.output[0])
40-
os.makedirs(output_dir, exist_ok=True)
93+
params = dict(snakemake.params)
94+
parameters = params['parameters']
95+
repository_url = params['repository_url']
96+
commit_hash = params['commit_hash']
97+
inputs_map = params.get('inputs_map')
4198

99+
# Create parameters file for outputs
100+
output_dir = os.path.dirname(snakemake.output[0])
42101
dump_parameters_to_file(output_dir, parameters)
43102

44-
for out in snakemake.output:
45-
with open(out, 'w') as sys.stdout:
46-
mock_execution(inputs=snakemake.input,
47-
output=out,
48-
snakemake=snakemake)
103+
# Clone github repository
104+
repositories_dir = os.path.join(".snakemake", "repos")
105+
module_dir = clone_module(repositories_dir, repository_url, commit_hash)
106+
107+
# Execute module code
108+
module_name = snakemake.rule
109+
110+
# TODO Fix logic of inferring output dirname
111+
output_dir = os.path.commonpath(snakemake.output)
112+
if os.path.splitext(output_dir)[1] != '':
113+
output_dir = os.path.dirname(output_dir)
114+
115+
dataset = snakemake.wildcards.dataset
116+
execution(module_dir, module_name=module_name, output_dir=output_dir, dataset=dataset,
117+
inputs_map=inputs_map, parameters=parameters)
49118

50119
except NameError:
51120
raise RuntimeError("This script must be run from within a Snakemake workflow.")

0 commit comments

Comments
 (0)