diff --git a/Snakefile b/Snakefile index 683ae92f..140e0f71 100644 --- a/Snakefile +++ b/Snakefile @@ -3,8 +3,8 @@ from spras import runner import shutil import yaml from spras.dataset import Dataset -from spras.util import process_config from spras.analysis import ml, summary, graphspace, cytoscape +import spras.config as _config # Snakemake updated the behavior in the 6.5.0 release https://github.com/snakemake/snakemake/pull/1037 # and using the wrong separator prevents Snakemake from matching filenames to the rules that can produce them @@ -13,23 +13,27 @@ SEP = '/' wildcard_constraints: params="params-\w+" -config, datasets, out_dir, algorithm_params, algorithm_directed, pca_params, hac_params = process_config(config) +# Elsewhere we import this as config, but in the Snakefile, the variable config is already populated +# with the parsed config.yaml. This is done by Snakemake, which magically pipes config into this file +# without declaration! +_config.init_global(config) -# TODO consider the best way to pass global configuration information to the run functions -SINGULARITY = "singularity" in config and config["singularity"] -if SINGULARITY: - print('Running Singularity containers') -else: - print('Running Docker containers') +out_dir = _config.config.out_dir +algorithm_params = _config.config.algorithm_params +algorithm_directed = _config.config.algorithm_directed +pca_params = _config.config.pca_params +hac_params = _config.config.hac_params + +FRAMEWORK = _config.config.container_framework +print(f"Running {FRAMEWORK} containers") # Return the dataset dictionary from the config file given the label -def get_dataset(datasets, label): - return datasets[label] +def get_dataset(_datasets, label): + return _datasets[label] algorithms = list(algorithm_params) algorithms_with_params = [f'{algorithm}-params-{params_hash}' for algorithm, param_combos in algorithm_params.items() for params_hash in param_combos.keys()] - -dataset_labels = list(datasets.keys()) +dataset_labels = list(_config.config.datasets.keys()) # Get the parameter dictionary for the specified # algorithm and parameter combination hash @@ -46,7 +50,7 @@ def write_parameter_log(algorithm, param_label, logfile): # Log the dataset contents specified in the config file in a yaml file def write_dataset_log(dataset, logfile): - dataset_contents = get_dataset(datasets,dataset) + dataset_contents = get_dataset(_config.config.datasets,dataset) # safe_dump gives RepresenterError for an OrderedDict # config file has to convert the dataset from OrderedDict to dict to avoid this @@ -57,23 +61,22 @@ def write_dataset_log(dataset, logfile): def make_final_input(wildcards): final_input = [] - # TODO analysis could be parsed in the parse_config() function. - if config["analysis"]["summary"]["include"]: + if _config.config.analysis_include_summary: # add summary output file for each pathway # TODO: reuse in the future once we make summary work for mixed graphs. See https://github.com/Reed-CompBio/spras/issues/128 # final_input.extend(expand('{out_dir}{sep}{dataset}-{algorithm_params}{sep}summary.txt',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) # add table summarizing all pathways for each dataset final_input.extend(expand('{out_dir}{sep}{dataset}-pathway-summary.txt',out_dir=out_dir,sep=SEP,dataset=dataset_labels)) - if config["analysis"]["graphspace"]["include"]: + if _config.config.analysis_include_graphspace: # add graph and style JSON files. 
final_input.extend(expand('{out_dir}{sep}{dataset}-{algorithm_params}{sep}gs.json',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) final_input.extend(expand('{out_dir}{sep}{dataset}-{algorithm_params}{sep}gsstyle.json',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) - - if config["analysis"]["cytoscape"]["include"]: + + if _config.config.analysis_include_cytoscape: final_input.extend(expand('{out_dir}{sep}{dataset}-cytoscape.cys',out_dir=out_dir,sep=SEP,dataset=dataset_labels)) - if config["analysis"]["ml"]["include"]: + if _config.config.analysis_include_ml: final_input.extend(expand('{out_dir}{sep}{dataset}-pca.png',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) final_input.extend(expand('{out_dir}{sep}{dataset}-pca-variance.txt',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) final_input.extend(expand('{out_dir}{sep}{dataset}-hac-vertical.png',out_dir=out_dir,sep=SEP,dataset=dataset_labels,algorithm_params=algorithms_with_params)) @@ -117,7 +120,7 @@ rule log_datasets: # Return all files used in the dataset # Input preparation needs to be rerun if these files are modified def get_dataset_dependencies(wildcards): - dataset = datasets[wildcards.dataset] + dataset = _config.config.datasets[wildcards.dataset] all_files = dataset["node_files"] + dataset["edge_files"] + dataset["other_files"] # Add the relative file path all_files = [dataset["data_dir"] + SEP + data_file for data_file in all_files] @@ -131,7 +134,7 @@ rule merge_input: output: dataset_file = SEP.join([out_dir, '{dataset}-merged.pickle']) run: # Pass the dataset to PRRunner where the files will be merged and written to disk (i.e. pickled) - dataset_dict = get_dataset(datasets, wildcards.dataset) + dataset_dict = get_dataset(_config.config.datasets, wildcards.dataset) runner.merge_input(dataset_dict, output.dataset_file) # The checkpoint is like a rule but can be used in dynamic workflows @@ -207,9 +210,7 @@ rule reconstruct: # Remove the default placeholder parameter added for algorithms that have no parameters if 'spras_placeholder' in params: params.pop('spras_placeholder') - # TODO consider the best way to pass global configuration information to the run functions - # This approach requires that all run functions support a singularity option - params['singularity'] = SINGULARITY + params['container_framework'] = FRAMEWORK runner.run(wildcards.algorithm, params) # Original pathway reconstruction output to universal output @@ -246,7 +247,7 @@ rule viz_cytoscape: output: session = SEP.join([out_dir, '{dataset}-cytoscape.cys']) run: - cytoscape.run_cytoscape(input.pathways, output.session, SINGULARITY) + cytoscape.run_cytoscape(input.pathways, output.session, FRAMEWORK) # Write a single summary table for all pathways for each dataset diff --git a/config/config.yaml b/config/config.yaml index def7372b..b85c599b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,139 +1,148 @@ - # Global workflow control - - # The length of the hash used to identify a parameter combination - hash_length: 7 - # If true, use Singularity instead of Docker - # Singularity support is only available on Unix - singularity: false - - # This list of algorithms should be generated by a script which checks the filesystem for installs. - # It shouldn't be changed by mere mortals. 
(alternatively, we could add a path to executable for each algorithm - # in the list to reduce the number of assumptions of the program at the cost of making the config a little more involved) - # Each algorithm has an 'include' parameter. By toggling 'include' to true/false the user can change - # which algorithms are run in a given experiment. - # - # algorithm-specific parameters are embedded in lists so that users can specify multiple. If multiple - # parameters are specified then the algorithm will be run as many times as needed to cover all parameter - # combinations. For instance if we have the following: - # - name: "myAlg" - # params: - # include: true - # directed: true - # a: [1,2] - # b: [0.5,0.75] - # - # then myAlg will be run on (a=1,b=0.5),(a=1,b=0.75),(a=2,b=0.5), and (a=2,b=0,75). Pretty neat, but be - # careful: too many parameters might make your runs take a long time. - - algorithms: - - name: "pathlinker" - params: - include: true - run1: - k: range(100,201,100) - - - name: "omicsintegrator1" - params: - include: true - run1: - r: [5] - b: [5, 6] - w: np.linspace(0,5,2) - g: [3] - d: [10] - - - name: "omicsintegrator2" - params: - include: true - run1: - b: [4] - g: [0] - run2: - b: [2] - g: [3] - - - name: "meo" - params: - include: true - run1: - max_path_length: [3] - local_search: ["Yes"] - rand_restarts: [10] - - - name: "mincostflow" - params: - include: true - run1: - flow: [1] # The flow must be an int - capacity: [1] - - - name: "allpairs" - params: - include: true - - - name: "domino" - params: - include: true - run1: - slice_threshold: [0.3] - module_threshold: [0.05] - - - # Here we specify which pathways to run and other file location information. - # DataLoader.py can currently only load a single dataset - # Assume that if a dataset label does not change, the lists of associated input files do not change - datasets: - - - label: data0 - node_files: ["node-prizes.txt", "sources.txt", "targets.txt"] - # DataLoader.py can currently only load a single edge file, which is the primary network - edge_files: ["network.txt"] - # Placeholder - other_files: [] - # Relative path from the spras directory - data_dir: "input" - - - label: data1 - # Reuse some of the same sources file as 'data0' but different network and targets - node_files: ["node-prizes.txt", "sources.txt", "alternative-targets.txt"] - edge_files: ["alternative-network.txt"] - other_files: [] - # Relative path from the spras directory - data_dir: "input" - - # If we want to reconstruct then we should set run to true. - # TODO: if include is true above but run is false here, algs are not run. - # is this the behavior we want? - reconstruction_settings: - - #set where everything is saved - locations: - - #place the save path here - # TODO move to global - reconstruction_dir: "output" - - run: true - - analysis: - # Create one summary per pathway file and a single summary table for all pathways for each dataset - summary: - include: true - # Create output files for each pathway that can be visualized with GraphSpace - graphspace: - include: true - # Create Cytoscape session file with all pathway graphs for each dataset - cytoscape: - include: true - # Machine learning analysis (e.g. 
clustering) of the pathway output files for each dataset - ml: - include: true - # specify how many principal components to calculate - components: 2 - # boolean to show the labels on the pca graph - labels: true - # 'ward', 'complete', 'average', 'single' - # if linkage: ward, must use metric: euclidean - linkage: 'ward' - # 'euclidean', 'manhattan', 'cosine' - metric: 'euclidean' +# Global workflow control + +# The length of the hash used to identify a parameter combination +hash_length: 7 + +# Specify the container framework. Current supported versions include 'docker' and +# 'singularity'. If container_framework is not specified, SPRAS will default to docker. +container_framework: docker + +# Allow the user to configure which container registry containers should be pulled from +# Note that this assumes container names are consistent across registries, and that the +# registry being passed doesn't require authentication for pull actions +container_registry: + base_url: docker.io + # The owner or project of the registry + # For example, "reedcompbio" if the image is available as docker.io/reedcompbio/allpairs + owner: reedcompbio + +# This list of algorithms should be generated by a script which checks the filesystem for installs. +# It shouldn't be changed by mere mortals. (alternatively, we could add a path to executable for each algorithm +# in the list to reduce the number of assumptions of the program at the cost of making the config a little more involved) +# Each algorithm has an 'include' parameter. By toggling 'include' to true/false the user can change +# which algorithms are run in a given experiment. +# +# algorithm-specific parameters are embedded in lists so that users can specify multiple. If multiple +# parameters are specified then the algorithm will be run as many times as needed to cover all parameter +# combinations. For instance if we have the following: +# - name: "myAlg" +# params: +# include: true +# a: [1,2] +# b: [0.5,0.75] +# +# then myAlg will be run on (a=1,b=0.5),(a=1,b=0.75),(a=2,b=0.5), and (a=2,b=0,75). Pretty neat, but be +# careful: too many parameters might make your runs take a long time. + +algorithms: + - name: "pathlinker" + params: + include: false + run1: + k: range(100,201,100) + + - name: "omicsintegrator1" + params: + include: true + run1: + r: [5] + b: [5, 6] + w: np.linspace(0,5,2) + g: [3] + d: [10] + + - name: "omicsintegrator2" + params: + include: true + run1: + b: [4] + g: [0] + run2: + b: [2] + g: [3] + + - name: "meo" + params: + include: true + run1: + max_path_length: [3] + local_search: ["Yes"] + rand_restarts: [10] + + - name: "mincostflow" + params: + include: true + run1: + flow: [1] # The flow must be an int + capacity: [1] + + - name: "allpairs" + params: + include: true + + - name: "domino" + params: + include: true + run1: + slice_threshold: [0.3] + module_threshold: [0.05] + + +# Here we specify which pathways to run and other file location information. 
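Before the dataset settings continue, a brief sketch (not part of the patch) of how the two new top-level container keys above are consumed: SPRAS joins base_url and owner into a prefix and appends each wrapper's image suffix, so switching registries only means editing these two values. The ghcr.io registry below is a hypothetical example.

import yaml

snippet = """
container_framework: singularity
container_registry:
  base_url: ghcr.io        # hypothetical alternate registry
  owner: reedcompbio
"""
cfg = yaml.safe_load(snippet)
prefix = cfg["container_registry"]["base_url"] + "/" + cfg["container_registry"]["owner"]
print(prefix + "/allpairs")  # ghcr.io/reedcompbio/allpairs, pulled by Singularity as docker://...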
+# DataLoader.py can currently only load a single dataset +# Assume that if a dataset label does not change, the lists of associated input files do not change +datasets: + - + label: data0 + node_files: ["node-prizes.txt", "sources.txt", "targets.txt"] + # DataLoader.py can currently only load a single edge file, which is the primary network + edge_files: ["network.txt"] + # Placeholder + other_files: [] + # Relative path from the spras directory + data_dir: "input" + - + label: data1 + # Reuse some of the same sources file as 'data0' but different network and targets + node_files: ["node-prizes.txt", "sources.txt", "alternative-targets.txt"] + edge_files: ["alternative-network.txt"] + other_files: [] + # Relative path from the spras directory + data_dir: "input" + +# If we want to reconstruct then we should set run to true. +# TODO: if include is true above but run is false here, algs are not run. +# is this the behavior we want? +reconstruction_settings: + + #set where everything is saved + locations: + + #place the save path here + # TODO move to global + reconstruction_dir: "output" + + run: true + +analysis: + # Create one summary per pathway file and a single summary table for all pathways for each dataset + summary: + include: true + # Create output files for each pathway that can be visualized with GraphSpace + graphspace: + include: true + # Create Cytoscape session file with all pathway graphs for each dataset + cytoscape: + include: true + # Machine learning analysis (e.g. clustering) of the pathway output files for each dataset + ml: + include: true + # specify how many principal components to calculate + components: 2 + # boolean to show the labels on the pca graph + labels: true + # 'ward', 'complete', 'average', 'single' + # if linkage: ward, must use metric: euclidean + linkage: 'ward' + # 'euclidean', 'manhattan', 'cosine' + metric: 'euclidean' diff --git a/spras/allpairs.py b/spras/allpairs.py index 3de9029d..42f1041d 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -3,12 +3,12 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import ( convert_directed_to_undirected, reinsert_direction_col_undirected, ) from spras.prm import PRM -from spras.util import prepare_volume, run_container __all__ = ['AllPairs'] @@ -59,12 +59,12 @@ def generate_inputs(data, filename_map): header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(nodetypes=None, network=None, output_file=None, singularity=False): + def run(nodetypes=None, network=None, output_file=None, container_framework="docker"): """ Run All Pairs Shortest Paths with Docker @param nodetypes: input node types with sources and targets (required) @param network: input network file (required) - @param singularity: if True, run using the Singularity container instead of the Docker container + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) @param output_file: path to the output pathway file (required) """ if not nodetypes or not network or not output_file: @@ -94,9 +94,11 @@ def run(nodetypes=None, network=None, output_file=None, singularity=False): print('Running All Pairs Shortest Paths with arguments: {}'.format(' '.join(command)), flush=True) - container_framework = 'singularity' if singularity else 'docker' - out = run_container(container_framework, - 'reedcompbio/allpairs', + container_suffix = "allpairs" + + out = run_container( + container_framework, + container_suffix, 
command, volumes, work_dir) diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py index f39fff6d..beab1de8 100644 --- a/spras/analysis/cytoscape.py +++ b/spras/analysis/cytoscape.py @@ -2,15 +2,15 @@ from shutil import rmtree from typing import List, Union -from spras.util import prepare_volume, run_container +from spras.containers import prepare_volume, run_container -def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, singularity: bool = False) -> None: +def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, container_framework="docker") -> None: """ Create a Cytoscape session file with visualizations of each of the provided pathways @param pathways: a list of pathways to visualize @param output_file: the output Cytoscape session file - @param singularity: whether to run in a Singularity container + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ work_dir = '/spras' @@ -48,10 +48,9 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, singul print('Running Cytoscape with arguments: {}'.format(' '.join(command)), flush=True) - # TODO consider making this a string in the config file instead of a Boolean - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "py4cytoscape:v2" out = run_container(container_framework, - 'reedcompbio/py4cytoscape:v2', + container_suffix, command, volumes, work_dir, diff --git a/spras/config.py b/spras/config.py new file mode 100644 index 00000000..100b2779 --- /dev/null +++ b/spras/config.py @@ -0,0 +1,211 @@ +""" +This config file is being used as a singleton. Because python creates a single instance +of modules when they're imported, we rely on the Snakefile instantiating the module. +In particular, when the Snakefile calls init_config, it will reassign config +to take the value of the actual config provided by Snakemake. After that point, any +module that imports this module can access a config option by checking the object's +value. For example + +import spras.config as config +container_framework = config.config.container_framework + +will grab the top level registry configuration option as it appears in the config file +""" + +import copy as copy +import itertools as it +import os + +import numpy as np +import yaml + +from spras.util import hash_params_sha1_base32 + +# The default length of the truncated hash used to identify parameter combinations +DEFAULT_HASH_LENGTH = 7 +DEFAULT_CONTAINER_PREFIX = "docker.io/reedcompbio" + +config = None + +# This will get called in the Snakefile, instantiating the singleton with the raw config +def init_global(config_dict): + global config + config = Config(config_dict) + +def init_from_file(filepath): + global config + + # Handle opening the file and parsing the yaml + filepath = os.path.abspath(filepath) + try: + with open(filepath, 'r') as yaml_file: + config_dict = yaml.safe_load(yaml_file) + except FileNotFoundError: + print(f"Error: The specified config '{filepath}' could not be found.") + return False + except yaml.YAMLError as e: + print(f"Error: Failed to parse config '{filepath}': {e}") + return False + + # And finally, initialize + config = Config(config_dict) + + +class Config: + def __init__(self, raw_config): + # Since process_config winds up modifying the raw_config passed to it as a side effect, + # we'll make a deep copy here to guarantee we don't break anything. 
This preserves the + # config as it's given to the Snakefile by Snakemake + + # Member vars populated by process_config. Set to None before they are populated so that our + # __init__ makes clear exactly what is being configured. + # Directory used for storing output + self.out_dir = None + # Container framework used by PRMs. Valid options are "docker" and "singularity" + self.container_framework = None + # The container prefix (host and organization) to use for images. Default is "docker.io/reedcompbio" + self.container_prefix = DEFAULT_CONTAINER_PREFIX + # A dictionary to store configured datasets against which SPRAS will be run + self.datasets = None + # The hash length SPRAS will use to identify parameter combinations. Default is 7 + self.hash_length = DEFAULT_HASH_LENGTH + # The list of algorithms to run in the workflow. Each is a dict with 'name' as an expected key. + self.algorithms = None + # A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations. + # Only includes algorithms that are set to be run with 'include: true'. + self.algorithm_params = None + # Deprecated. Previously a dict mapping algorithm names to a Boolean tracking whether they used directed graphs. + self.algorithm_directed = None + # A dict with the analysis settings + self.analysis_params = None + # A dict with the ML settings + self.ml_params = None + # A dict with the PCA settings + self.pca_params = None + # A dict with the hierarchical clustering settings + self.hac_params = None + # A Boolean specifying whether to run the summary analysis + self.analysis_include_summary = None + # A Boolean specifying whether to run the GraphSpace analysis + self.analysis_include_graphspace = None + # A Boolean specifying whether to run the Cytoscape analysis + self.analysis_include_cytoscape = None + # A Boolean specifying whether to run the ML analysis + self.analysis_include_ml = None + + _raw_config = copy.deepcopy(raw_config) + self.process_config(_raw_config) + + def process_config(self, raw_config): + if raw_config == {}: + raise ValueError("Config file cannot be empty. Use --configfile to set a config file.") + + # Set up a few top-level config variables + self.out_dir = raw_config["reconstruction_settings"]["locations"]["reconstruction_dir"] + + # We allow the container framework not to be defined in the config. In the case it isn't, default to docker. + # However, if we get a bad value, we raise an exception. + if "container_framework" in raw_config: + container_framework = raw_config["container_framework"].lower() + if container_framework not in ("docker", "singularity"): + msg = "SPRAS was configured to run with an unknown container framework: '" + raw_config["container_framework"] + "'. Accepted values are 'docker' or 'singularity'." 
+ raise ValueError(msg) + self.container_framework = container_framework + else: + self.container_framework = "docker" + + # Grab registry from the config, and if none is provided default to docker + if "container_registry" in raw_config and raw_config["container_registry"]["base_url"] != "" and raw_config["container_registry"]["owner"] != "": + self.container_prefix = raw_config["container_registry"]["base_url"] + "/" + raw_config["container_registry"]["owner"] + + # Parse dataset information + # Datasets is initially a list, where each list entry has a dataset label and lists of input files + # Convert the dataset list into a dict where the label is the key and update the config data structure + # TODO allow labels to be optional and assign default labels + # TODO check for collisions in dataset labels, warn, and make the labels unique + # Need to work more on input file naming to make less strict assumptions + # about the filename structure + # Currently assumes all datasets have a label and the labels are unique + # When Snakemake parses the config file it loads the datasets as OrderedDicts not dicts + # Convert to dicts to simplify the yaml logging + self.datasets = {dataset["label"]: dict(dataset) for dataset in raw_config["datasets"]} + + # Code snipped from Snakefile that may be useful for assigning default labels + # dataset_labels = [dataset.get('label', f'dataset{index}') for index, dataset in enumerate(datasets)] + # Maps from the dataset label to the dataset list index + # dataset_dict = {dataset.get('label', f'dataset{index}'): index for index, dataset in enumerate(datasets)} + + # Override the default parameter hash length if specified in the config file + if "hash_length" in raw_config and raw_config["hash_length"] != "": + self.hash_length = int(raw_config["hash_length"]) + + prior_params_hashes = set() + + # Parse algorithm information + # Each algorithm's parameters are provided as a list of dictionaries + # Defaults are handled in the Python function or class that wraps + # running that algorithm + # Keys in the parameter dictionary are strings + self.algorithm_params = dict() + self.algorithm_directed = dict() + self.algorithms = raw_config["algorithms"] + for alg in self.algorithms: + cur_params = alg["params"] + if "include" in cur_params and cur_params.pop("include"): + # This dict maps from parameter combinations hashes to parameter combination dictionaries + self.algorithm_params[alg["name"]] = dict() + else: + # Do not parse the rest of the parameters for this algorithm if it is not included + continue + + if "directed" in cur_params: + print("UPDATE: we no longer use the directed key in the config file") + cur_params.pop("directed") + + # The algorithm has no named arguments so create a default placeholder + if len(cur_params) == 0: + cur_params["run1"] = {"spras_placeholder": ["no parameters"]} + + # Each set of runs should be 1 level down in the config file + for run_params in cur_params: + all_runs = [] + + # We create the product of all param combinations for each run + param_name_list = [] + if cur_params[run_params]: + for p in cur_params[run_params]: + param_name_list.append(p) + all_runs.append(eval(str(cur_params[run_params][p]))) + run_list_tuples = list(it.product(*all_runs)) + param_name_tuple = tuple(param_name_list) + for r in run_list_tuples: + run_dict = dict(zip(param_name_tuple, r)) + # TODO temporary workaround for yaml.safe_dump in Snakefile write_parameter_log + for param, value in run_dict.copy().items(): + if isinstance(value, np.float64): 
+ run_dict[param] = float(value) + params_hash = hash_params_sha1_base32(run_dict, self.hash_length) + if params_hash in prior_params_hashes: + raise ValueError(f'Parameter hash collision detected. Increase the hash_length in the config file ' + f'(current length {self.hash_length}).') + self.algorithm_params[alg["name"]][params_hash] = run_dict + + self.analysis_params = raw_config["analysis"] if "analysis" in raw_config else {} + self.ml_params = self.analysis_params["ml"] if "ml" in self.analysis_params else {} + + self.pca_params = {} + if "components" in self.ml_params: + self.pca_params["components"] = self.ml_params["components"] + if "labels" in self.ml_params: + self.pca_params["labels"] = self.ml_params["labels"] + + self.hac_params = {} + if "linkage" in self.ml_params: + self.hac_params["linkage"] = self.ml_params["linkage"] + if "metric" in self.ml_params: + self.hac_params["metric"] = self.ml_params ["metric"] + + self.analysis_include_summary = raw_config["analysis"]["summary"]["include"] + self.analysis_include_graphspace = raw_config["analysis"]["graphspace"]["include"] + self.analysis_include_cytoscape = raw_config["analysis"]["cytoscape"]["include"] + self.analysis_include_ml = raw_config["analysis"]["ml"]["include"] diff --git a/spras/containers.py b/spras/containers.py new file mode 100644 index 00000000..bdb18acd --- /dev/null +++ b/spras/containers.py @@ -0,0 +1,234 @@ +import os +import platform +import re +from pathlib import Path, PurePath, PurePosixPath +from typing import Any, Dict, List, Optional, Tuple, Union + +import docker + +import spras.config as config +from spras.util import hash_filename + + +def prepare_path_docker(orig_path: PurePath) -> str: + """ + Prepare an absolute path for mounting as a Docker volume. + Converts Windows file separators to posix separators. + Converts Windows drive letters in absolute paths. + """ + # TODO consider testing PurePath.is_absolute() + prepared_path = orig_path.as_posix() + # Check whether the path matches an absolute Windows path with a drive letter + match = re.match(r'^([A-Z]:)(.*)', prepared_path) + if match: + # The first group is the drive such as C: + drive = match.group(1).lower()[0] + # The second group is the rest of the path such as /Users/me + prepared_path = match.group(2) + prepared_path = '//' + drive + prepared_path + return prepared_path + + +def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Union[str, PurePath]) -> PurePosixPath: + """ + Convert a file_path that is in src_path to be in dest_path instead. + For example, convert /usr/mydir and /usr/mydir/myfile and /tmp to /tmp/myfile + @param src_path: source path that is a parent of file_path + @param dest_path: destination path + @param file_path: filename that is under the source path + @return: a new path with the filename relative to the destination path + """ + rel_path = file_path.relative_to(src_path) + return PurePosixPath(dest_path, rel_path) + + +# TODO consider a better default environment variable +# TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list? +# run_container_singularity assumes a single string +# Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) +# Technically the argument is an image, not a container, but we use container here. 
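Because run_container (defined next) builds the image name from the config singleton, code that calls it outside of Snakemake must initialize the config first. A minimal sketch under that assumption, run from the repository root, using an input file from the example config and a placeholder command:

import spras.config as config
from spras.containers import prepare_volume, run_container

config.init_from_file("config/config.yaml")  # populate the singleton outside of Snakemake

# Map a local file into the container; prepare_volume now uses the configured hash_length
volume, container_path = prepare_volume("input/network.txt", "/spras")

out = run_container(
    framework=config.config.container_framework,  # "docker" or "singularity"
    container_suffix="allpairs",                   # expands to docker.io/reedcompbio/allpairs by default
    command=["cat", container_path],               # placeholder command
    volumes=[volume],
    working_dir="/spras",
)
print(out)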
+def run_container(framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): + """ + Runs a command in the container using Singularity or Docker + @param framework: singularity or docker + @param container_suffix: name of the DockerHub container without the 'docker://' prefix + @param command: command to run in the container + @param volumes: a list of volumes to mount where each item is a (source, destination) tuple + @param working_dir: the working directory in the container + @param environment: environment variables to set in the container + @return: output from Singularity execute or Docker run + """ + normalized_framework = framework.casefold() + + container = config.config.container_prefix + "/" + container_suffix + if normalized_framework == 'docker': + return run_container_docker(container, command, volumes, working_dir, environment) + elif normalized_framework == 'singularity': + return run_container_singularity(container, command, volumes, working_dir, environment) + else: + raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".') + + +# TODO any issue with creating a new client each time inside this function? +def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): + """ + Runs a command in the container using Docker. + Attempts to automatically correct file owner and group for new files created by the container, setting them to the + current owner and group IDs. + Does not modify the owner or group for existing files modified by the container. + @param container: name of the DockerHub container without the 'docker://' prefix + @param command: command to run in the container + @param volumes: a list of volumes to mount where each item is a (source, destination) tuple + @param working_dir: the working directory in the container + @param environment: environment variables to set in the container + @return: output from Docker run + """ + out = None + try: + # Initialize a Docker client using environment variables + client = docker.from_env() + # Track the contents of the local directories that will be bound so that new files added can have their owner + # changed + pre_volume_contents = {} + src_dest_map = {} + for src, dest in volumes: + src_path = Path(src) + # The same source path can be in volumes more than once if there were multiple input or output files + # in the same directory + # Only check each unique source path once and track which of the possible destination paths was used + if src_path not in pre_volume_contents: + # Only list files in the directory, do not walk recursively because it could include + # a massive number of files + pre_volume_contents[src_path] = set(src_path.iterdir()) + src_dest_map[src_path] = dest + + bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] + + out = client.containers.run(container, + command, + stderr=True, + volumes=bind_paths, + working_dir=working_dir, + environment=[environment]).decode('utf-8') + + # TODO does this cleanup need to still run even if there was an error in the above run command? 
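+        # docker-py's containers.run returns the container's combined stdout/stderr logs as bytes
+        # when the call is not detached, which is why the result above is decoded to UTF-8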
+ # On Unix, files written by the above Docker run command will be owned by root and cannot be modified + # outside the container by a non-root user + # Reset the file owner and the group inside the container + try: + # Only available on Unix + uid = os.getuid() + gid = os.getgid() + + all_modified_volume_contents = set() + for src_path in pre_volume_contents.keys(): + # Assumes the Docker run call is the only process that modified the contents + # Only considers files that were added, not files that were modified + post_volume_contents = set(src_path.iterdir()) + modified_volume_contents = post_volume_contents - pre_volume_contents[src_path] + modified_volume_contents = [str(convert_docker_path(src_path, src_dest_map[src_path], p)) for p in + modified_volume_contents] + all_modified_volume_contents.update(modified_volume_contents) + + # This command changes the ownership of output files so we don't + # get a permissions error when snakemake or the user try to touch the files + # Use --recursive because new directories could have been created inside the container + # Do not run the command if no files were modified + if len(all_modified_volume_contents) > 0: + chown_command = ['chown', f'{uid}:{gid}', '--recursive'] + chown_command.extend(all_modified_volume_contents) + chown_command = ' '.join(chown_command) + client.containers.run(container, + chown_command, + stderr=True, + volumes=bind_paths, + working_dir=working_dir, + environment=[environment]).decode('utf-8') + + # Raised on non-Unix systems + except AttributeError: + pass + + # TODO: Not sure whether this is needed or where to close the client + client.close() + + except Exception as err: + print(err) + # Removed the finally block to address bugbear B012 + # "`return` inside `finally` blocks cause exceptions to be silenced" + # finally: + return out + + +def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): + """ + Runs a command in the container using Singularity. + Only available on Linux. + @param container: name of the DockerHub container without the 'docker://' prefix + @param command: command to run in the container + @param volumes: a list of volumes to mount where each item is a (source, destination) tuple + @param working_dir: the working directory in the container + @param environment: environment variable to set in the container + @return: output from Singularity execute + """ + # spython is not compatible with Windows + if platform.system() != 'Linux': + raise NotImplementedError('Singularity support is only available on Linux') + + # See https://stackoverflow.com/questions/3095071/in-python-what-happens-when-you-import-inside-of-a-function + from spython.main import Client + + bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] + + # TODO is try/finally needed for Singularity? 
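+    # The options below keep the run hermetic: --cleanenv starts from a clean environment,
+    # --containall isolates the filesystem, PID, IPC, and environment, and --pwd sets the
+    # working directory inside the container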
+ singularity_options = ['--cleanenv', '--containall', '--pwd', working_dir] + # Singularity does not allow $HOME to be set as a regular environment variable + # Capture it and use the special argument instead + if environment.startswith('HOME='): + home_dir = environment[5:] + singularity_options.extend(['--home', home_dir]) + else: + singularity_options.extend(['--env', environment]) + + # To debug a container add the execute arguments: singularity_options=['--debug'], quiet=False + # Adding 'docker://' to the container indicates this is a Docker image Singularity must convert + return Client.execute('docker://' + container, + command, + options=singularity_options, + bind=bind_paths) + +# Because this is called independently for each file, the same local path can be mounted to multiple volumes +def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PurePath]) -> Tuple[Tuple[PurePath, PurePath], str]: + """ + Makes a file on the local file system accessible within a container by mapping the local (source) path to a new + container (destination) path and renaming the file to be relative to the destination path. + The destination path will be a new path relative to the volume_base that includes a hash identifier derived from the + original filename. + An example mapped filename looks like '/spras/MG4YPNK/oi1-edges.txt'. + @param filename: The file on the local file system to map + @param volume_base: The base directory in the container, which must be an absolute directory + @return: first returned object is a tuple (source path, destination path) and the second returned object is the + updated filename relative to the destination path + """ + base_path = PurePosixPath(volume_base) + if not base_path.is_absolute(): + raise ValueError(f'Volume base must be an absolute path: {volume_base}') + + if isinstance(filename, PurePath): + filename = str(filename) + + filename_hash = hash_filename(filename, config.config.hash_length) + dest = PurePosixPath(base_path, filename_hash) + + abs_filename = Path(filename).resolve() + container_filename = str(PurePosixPath(dest, abs_filename.name)) + if abs_filename.is_dir(): + dest = PurePosixPath(dest, abs_filename.name) + src = abs_filename + else: + parent = abs_filename.parent + if parent.as_posix() == '.': + parent = Path.cwd() + src = parent + + return (src, dest), container_filename diff --git a/spras/domino.py b/spras/domino.py index 53434e3f..f9f9f146 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -3,12 +3,12 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import ( add_constant, reinsert_direction_col_undirected, ) from spras.prm import PRM -from spras.util import prepare_volume, run_container __all__ = ['DOMINO', 'pre_domino_id_transform', 'post_domino_id_transform'] @@ -70,7 +70,7 @@ def generate_inputs(data, filename_map): header=['ID_interactor_A', 'ppi', 'ID_interactor_B']) @staticmethod - def run(network=None, active_genes=None, output_file=None, slice_threshold=None, module_threshold=None, singularity=False): + def run(network=None, active_genes=None, output_file=None, slice_threshold=None, module_threshold=None, container_framework="docker"): """ Run DOMINO with Docker. Let visualization be always true, parallelization be always 1 thread, and use_cache be always false. 
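The DOMINO hunks that follow show the calling convention shared by every wrapper in this patch: the singularity boolean is replaced by a container_framework string that is forwarded to run_container. A hypothetical call with placeholder file paths, assuming the config singleton has been initialized as in the earlier sketch, might look like:

import spras.config as config
from spras.domino import DOMINO

config.init_from_file("config/config.yaml")  # wrappers resolve their image via the config singleton

DOMINO.run(
    network="domino-network.txt",           # placeholder input paths
    active_genes="domino-active-genes.txt",
    output_file="output/domino-raw.txt",
    slice_threshold=0.3,
    module_threshold=0.05,
    container_framework="singularity",      # previously singularity=True
)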
@@ -80,7 +80,7 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None, @param output_file: path to the output pathway file (required) @param slice_threshold: the p-value threshold for considering a slice as relevant (optional) @param module_threshold: the p-value threshold for considering a putative module as final module (optional) - @param singularity: if True, run using the Singularity container instead of the Docker container (optional) + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ if not network or not active_genes or not output_file: @@ -113,9 +113,9 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None, print('Running slicer with arguments: {}'.format(' '.join(slicer_command)), flush=True) - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "domino" slicer_out = run_container(container_framework, - 'reedcompbio/domino', + container_suffix, slicer_command, volumes, work_dir) @@ -141,7 +141,7 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None, print('Running DOMINO with arguments: {}'.format(' '.join(domino_command)), flush=True) domino_out = run_container(container_framework, - 'reedcompbio/domino', + container_suffix, domino_command, volumes, work_dir) diff --git a/spras/meo.py b/spras/meo.py index d9cf2f24..ba962d92 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -2,12 +2,13 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import ( add_directionality_constant, reinsert_direction_col_directed, ) from spras.prm import PRM -from spras.util import add_rank_column, prepare_volume, run_container +from spras.util import add_rank_column __all__ = ['MEO', 'write_properties'] @@ -105,7 +106,7 @@ def generate_inputs(data, filename_map): # TODO document required arguments @staticmethod def run(edges=None, sources=None, targets=None, output_file=None, max_path_length=None, local_search=None, - rand_restarts=None, singularity=False): + rand_restarts=None, container_framework="docker"): """ Run Maximum Edge Orientation in the Docker image with the provided parameters. The properties file is generated from the provided arguments. @@ -114,7 +115,7 @@ def run(edges=None, sources=None, targets=None, output_file=None, max_path_lengt Only the edge output file is retained. All other output files are deleted. 
@param output_file: the name of the output edge file, which will overwrite any existing file with this name - @param singularity: if True, run using the Singularity container instead of the Docker container + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ if edges is None or sources is None or targets is None or output_file is None: raise ValueError('Required Maximum Edge Orientation arguments are missing') @@ -157,10 +158,9 @@ def run(edges=None, sources=None, targets=None, output_file=None, max_path_lengt print('Running Maximum Edge Orientation with arguments: {}'.format(' '.join(command)), flush=True) - # TODO consider making this a string in the config file instead of a Boolean - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "meo" out = run_container(container_framework, - 'reedcompbio/meo', + container_suffix, command, volumes, work_dir) diff --git a/spras/mincostflow.py b/spras/mincostflow.py index 24ee9677..e3c75708 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -2,12 +2,13 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import ( convert_undirected_to_directed, reinsert_direction_col_undirected, ) from spras.prm import PRM -from spras.util import add_rank_column, prepare_volume, run_container +from spras.util import add_rank_column __all__ = ['MinCostFlow'] @@ -60,7 +61,7 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(sources=None, targets=None, edges=None, output_file=None, flow=None, capacity=None, singularity=False): + def run(sources=None, targets=None, edges=None, output_file=None, flow=None, capacity=None, container_framework="docker"): """ Run min cost flow with Docker (or singularity) @param sources: input sources (required) @@ -69,7 +70,7 @@ def run(sources=None, targets=None, edges=None, output_file=None, flow=None, cap @param output_file: output file name (required) @param flow: amount of flow going through the graph (optional) @param capacity: amount of capacity allowed on each edge (optional) - @param singularity: if True, run using the Singularity container instead of the Docker container (optional) + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ # ensures that these parameters are required @@ -113,11 +114,11 @@ def run(sources=None, targets=None, edges=None, output_file=None, flow=None, cap command.extend(['--capacity', str(capacity)]) # choosing to run in docker or singularity container - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "mincostflow" # constructs a docker run call out = run_container(container_framework, - 'reedcompbio/mincostflow', + container_suffix, command, volumes, work_dir) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 6e2c6807..e23ea9f9 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -2,9 +2,10 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import reinsert_direction_col_mixed from spras.prm import PRM -from spras.util import add_rank_column, prepare_volume, run_container +from spras.util import add_rank_column __all__ = ['OmicsIntegrator1', 'write_conf'] @@ -92,7 +93,7 @@ def generate_inputs(data, filename_map): @staticmethod def run(edges=None, prizes=None, dummy_mode=None, 
mu_squared=None, exclude_terms=None, output_file=None, noisy_edges=None, shuffled_prizes=None, random_terminals=None, - seed=None, w=None, b=None, d=None, mu=None, noise=None, g=None, r=None, singularity=False): + seed=None, w=None, b=None, d=None, mu=None, noise=None, g=None, r=None, container_framework="docker"): """ Run Omics Integrator 1 in the Docker image with the provided parameters. Does not support the garnet, cyto30, knockout, cv, or cv-reps arguments. @@ -103,7 +104,7 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms All other output files are deleted. @param output_file: the name of the output sif file for the optimal forest, which will overwrite any existing file with this name - @param singularity: if True, run using the Singularity container instead of the Docker container + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ if edges is None or prizes is None or output_file is None or w is None or b is None or d is None: raise ValueError('Required Omics Integrator 1 arguments are missing') @@ -158,10 +159,9 @@ def run(edges=None, prizes=None, dummy_mode=None, mu_squared=None, exclude_terms print('Running Omics Integrator 1 with arguments: {}'.format(' '.join(command)), flush=True) - # TODO consider making this a string in the config file instead of a Boolean - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "omics-integrator-1:no-conda" # no-conda version is the default out = run_container(container_framework, - 'reedcompbio/omics-integrator-1:no-conda', # no-conda version is the default + container_suffix, # no-conda version is the default command, volumes, work_dir, diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index 5099b8d9..473600a8 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -2,10 +2,11 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.dataset import Dataset from spras.interactome import reinsert_direction_col_undirected from spras.prm import PRM -from spras.util import add_rank_column, prepare_volume, run_container +from spras.util import add_rank_column __all__ = ['OmicsIntegrator2'] @@ -67,12 +68,13 @@ def generate_inputs(data: Dataset, filename_map): # TODO document required arguments @staticmethod def run(edges=None, prizes=None, output_file=None, w=None, b=None, g=None, noise=None, noisy_edges=None, - random_terminals=None, dummy_mode=None, seed=None, singularity=False): + random_terminals=None, dummy_mode=None, seed=None, container_framework="docker"): """ Run Omics Integrator 2 in the Docker image with the provided parameters. Only the .tsv output file is retained and then renamed. All other output files are deleted. 
@param output_file: the name of the output file, which will overwrite any existing file with this name + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ if edges is None or prizes is None or output_file is None: raise ValueError('Required Omics Integrator 2 arguments are missing') @@ -118,9 +120,9 @@ def run(edges=None, prizes=None, output_file=None, w=None, b=None, g=None, noise print('Running Omics Integrator 2 with arguments: {}'.format(' '.join(command)), flush=True) - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "omics-integrator-2:v2" out = run_container(container_framework, - 'reedcompbio/omics-integrator-2:v2', + container_suffix, command, volumes, work_dir) diff --git a/spras/pathlinker.py b/spras/pathlinker.py index 85b38fe9..cfc653c6 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -3,12 +3,12 @@ import pandas as pd +from spras.containers import prepare_volume, run_container from spras.interactome import ( convert_undirected_to_directed, reinsert_direction_col_directed, ) from spras.prm import PRM -from spras.util import prepare_volume, run_container __all__ = ['PathLinker'] @@ -67,14 +67,14 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(nodetypes=None, network=None, output_file=None, k=None, singularity=False): + def run(nodetypes=None, network=None, output_file=None, k=None, container_framework="docker"): """ Run PathLinker with Docker @param nodetypes: input node types with sources and targets (required) @param network: input network file (required) @param output_file: path to the output pathway file (required) @param k: path length (optional) - @param singularity: if True, run using the Singularity container instead of the Docker container + @param container_framework: choose the container runtime framework, currently supports "docker" or "singularity" (optional) """ # Add additional parameter validation # Do not require k @@ -115,10 +115,9 @@ def run(nodetypes=None, network=None, output_file=None, k=None, singularity=Fals print('Running PathLinker with arguments: {}'.format(' '.join(command)), flush=True) - # TODO consider making this a string in the config file instead of a Boolean - container_framework = 'singularity' if singularity else 'docker' + container_suffix = "pathlinker" out = run_container(container_framework, - 'reedcompbio/pathlinker', + container_suffix, command, volumes, work_dir) diff --git a/spras/util.py b/spras/util.py index 7d09c643..ea6cd952 100644 --- a/spras/util.py +++ b/spras/util.py @@ -4,207 +4,12 @@ import base64 import hashlib -import itertools as it import json -import os -import platform -import re from pathlib import Path, PurePath, PurePosixPath from typing import Any, Dict, List, Optional, Tuple, Union -import docker -import numpy as np # Required to eval some forms of parameter ranges import pandas as pd -# The default length of the truncated hash used to identify parameter combinations -DEFAULT_HASH_LENGTH = 7 - - -def prepare_path_docker(orig_path: PurePath) -> str: - """ - Prepare an absolute path for mounting as a Docker volume. - Converts Windows file separators to posix separators. - Converts Windows drive letters in absolute paths. 
- """ - # TODO consider testing PurePath.is_absolute() - prepared_path = orig_path.as_posix() - # Check whether the path matches an absolute Windows path with a drive letter - match = re.match(r'^([A-Z]:)(.*)', prepared_path) - if match: - # The first group is the drive such as C: - drive = match.group(1).lower()[0] - # The second group is the rest of the path such as /Users/me - prepared_path = match.group(2) - prepared_path = '//' + drive + prepared_path - return prepared_path - - -def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Union[str, PurePath]) -> PurePosixPath: - """ - Convert a file_path that is in src_path to be in dest_path instead. - For example, convert /usr/mydir and /usr/mydir/myfile and /tmp to /tmp/myfile - @param src_path: source path that is a parent of file_path - @param dest_path: destination path - @param file_path: filename that is under the source path - @return: a new path with the filename relative to the destination path - """ - rel_path = file_path.relative_to(src_path) - return PurePosixPath(dest_path, rel_path) - - -# TODO consider a better default environment variable -# TODO environment currently a single string (e.g. 'TMPDIR=/OmicsIntegrator1'), should it be a list? -# run_container_singularity assumes a single string -# Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) -# Technically the argument is an image, not a container, but we use container here. -def run_container(framework: str, container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): - """ - Runs a command in the container using Singularity or Docker - @param framework: singularity or docker - @param container: name of the DockerHub container without the 'docker://' prefix - @param command: command to run in the container - @param volumes: a list of volumes to mount where each item is a (source, destination) tuple - @param working_dir: the working directory in the container - @param environment: environment variable to set in the container - @return: output from Singularity execute or Docker run - """ - normalized_framework = framework.casefold() - if normalized_framework == 'docker': - return run_container_docker(container, command, volumes, working_dir, environment) - elif normalized_framework == 'singularity': - return run_container_singularity(container, command, volumes, working_dir, environment) - else: - raise ValueError(f'{framework} is not a recognized container framework. Choose "docker" or "singularity".') - - -# TODO any issue with creating a new client each time inside this function? -def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): - """ - Runs a command in the container using Docker. - Attempts to automatically correct file owner and group for new files created by the container, setting them to the - current owner and group IDs. - Does not modify the owner or group for existing files modified by the container. 
- @param container: name of the DockerHub container without the 'docker://' prefix - @param command: command to run in the container - @param volumes: a list of volumes to mount where each item is a (source, destination) tuple - @param working_dir: the working directory in the container - @param environment: environment variable to set in the container - @return: output from Docker run - """ - out = None - try: - # Initialize a Docker client using environment variables - client = docker.from_env() - # Track the contents of the local directories that will be bound so that new files added can have their owner - # changed - pre_volume_contents = {} - src_dest_map = {} - for src, dest in volumes: - src_path = Path(src) - # The same source path can be in volumes more than once if there were multiple input or output files - # in the same directory - # Only check each unique source path once and track which of the possible destination paths was used - if src_path not in pre_volume_contents: - # Only list files in the directory, do not walk recursively because it could include - # a massive number of files - pre_volume_contents[src_path] = set(src_path.iterdir()) - src_dest_map[src_path] = dest - - bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - - out = client.containers.run(container, - command, - stderr=True, - volumes=bind_paths, - working_dir=working_dir, - environment=[environment]).decode('utf-8') - - # TODO does this cleanup need to still run even if there was an error in the above run command? - # On Unix, files written by the above Docker run command will be owned by root and cannot be modified - # outside the container by a non-root user - # Reset the file owner and the group inside the container - try: - # Only available on Unix - uid = os.getuid() - gid = os.getgid() - - all_modified_volume_contents = set() - for src_path in pre_volume_contents.keys(): - # Assumes the Docker run call is the only process that modified the contents - # Only considers files that were added, not files that were modified - post_volume_contents = set(src_path.iterdir()) - modified_volume_contents = post_volume_contents - pre_volume_contents[src_path] - modified_volume_contents = [str(convert_docker_path(src_path, src_dest_map[src_path], p)) for p in - modified_volume_contents] - all_modified_volume_contents.update(modified_volume_contents) - - # This command changes the ownership of output files so we don't - # get a permissions error when snakemake or the user try to touch the files - # Use --recursive because new directories could have been created inside the container - # Do not run the command if no files were modified - if len(all_modified_volume_contents) > 0: - chown_command = ['chown', f'{uid}:{gid}', '--recursive'] - chown_command.extend(all_modified_volume_contents) - chown_command = ' '.join(chown_command) - client.containers.run(container, - chown_command, - stderr=True, - volumes=bind_paths, - working_dir=working_dir, - environment=[environment]).decode('utf-8') - - # Raised on non-Unix systems - except AttributeError: - pass - - # TODO: Not sure whether this is needed or where to close the client - client.close() - - except Exception as err: - print(err) - # Removed the finally block to address bugbear B012 - # "`return` inside `finally` blocks cause exceptions to be silenced" - # finally: - return out - - -def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'): 
- """ - Runs a command in the container using Singularity. - Only available on Linux. - @param container: name of the DockerHub container without the 'docker://' prefix - @param command: command to run in the container - @param volumes: a list of volumes to mount where each item is a (source, destination) tuple - @param working_dir: the working directory in the container - @param environment: environment variable to set in the container - @return: output from Singularity execute - """ - # spython is not compatible with Windows - if platform.system() != 'Linux': - raise NotImplementedError('Singularity support is only available on Linux') - - # See https://stackoverflow.com/questions/3095071/in-python-what-happens-when-you-import-inside-of-a-function - from spython.main import Client - - bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - - # TODO is try/finally needed for Singularity? - singularity_options = ['--cleanenv', '--containall', '--pwd', working_dir] - # Singularity does not allow $HOME to be set as a regular environment variable - # Capture it and use the special argument instead - if environment.startswith('HOME='): - home_dir = environment[5:] - singularity_options.extend(['--home', home_dir]) - else: - singularity_options.extend(['--env', environment]) - - # To debug a container add the execute arguments: singularity_options=['--debug'], quiet=False - # Adding 'docker://' to the container indicates this is a Docker image Singularity must convert - return Client.execute('docker://' + container, - command, - options=singularity_options, - bind=bind_paths) - def hash_params_sha1_base32(params_dict: Dict[str, Any], length: Optional[int] = None) -> str: """ @@ -237,144 +42,6 @@ def hash_filename(filename: str, length: Optional[int] = None) -> str: """ return hash_params_sha1_base32({'filename': filename}, length) - -# Because this is called independently for each file, the same local path can be mounted to multiple volumes -def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PurePath]) -> Tuple[Tuple[PurePath, PurePath], str]: - """ - Makes a file on the local file system accessible within a container by mapping the local (source) path to a new - container (destination) path and renaming the file to be relative to the destination path. - The destination path will be a new path relative to the volume_base that includes a hash identifier derived from the - original filename. - An example mapped filename looks like '/spras/MG4YPNK/oi1-edges.txt'. 
- @param filename: The file on the local file system to map - @param volume_base: The base directory in the container, which must be an absolute directory - @return: first returned object is a tuple (source path, destination path) and the second returned object is the - updated filename relative to the destination path - """ - base_path = PurePosixPath(volume_base) - if not base_path.is_absolute(): - raise ValueError(f'Volume base must be an absolute path: {volume_base}') - - if isinstance(filename, PurePath): - filename = str(filename) - filename_hash = hash_filename(filename, DEFAULT_HASH_LENGTH) - dest = PurePosixPath(base_path, filename_hash) - - abs_filename = Path(filename).resolve() - container_filename = str(PurePosixPath(dest, abs_filename.name)) - if abs_filename.is_dir(): - dest = PurePosixPath(dest, abs_filename.name) - src = abs_filename - else: - parent = abs_filename.parent - if parent.as_posix() == '.': - parent = Path.cwd() - src = parent - - return (src, dest), container_filename - - -def process_config(config): - """ - Process the dictionary config and return the full yaml structure as well as processed portions - @param config: configuration loaded by Snakemake, from config file and any command line arguments - @return: (config, datasets, out_dir, algorithm_params) - """ - if config == {}: - raise ValueError("Config file cannot be empty. Use --configfile to set a config file.") - out_dir = config["reconstruction_settings"]["locations"]["reconstruction_dir"] - - # Parse dataset information - # Datasets is initially a list, where each list entry has a dataset label and lists of input files - # Convert the dataset list into a dict where the label is the key and update the config data structure - # TODO allow labels to be optional and assign default labels - # TODO check for collisions in dataset labels, warn, and make the labels unique - # Need to work more on input file naming to make less strict assumptions - # about the filename structure - # Currently assumes all datasets have a label and the labels are unique - # When Snakemake parses the config file it loads the datasets as OrderedDicts not dicts - # Convert to dicts to simplify the yaml logging - datasets = {dataset["label"]: dict(dataset) for dataset in config["datasets"]} - config["datasets"] = datasets - - # Code snipped from Snakefile that may be useful for assigning default labels - # dataset_labels = [dataset.get('label', f'dataset{index}') for index, dataset in enumerate(datasets)] - # Maps from the dataset label to the dataset list index - # dataset_dict = {dataset.get('label', f'dataset{index}'): index for index, dataset in enumerate(datasets)} - - # Override the default parameter hash length if specified in the config file - try: - hash_length = int(config["hash_length"]) - except (ValueError, KeyError): - hash_length = DEFAULT_HASH_LENGTH - prior_params_hashes = set() - - # Parse algorithm information - # Each algorithm's parameters are provided as a list of dictionaries - # Defaults are handled in the Python function or class that wraps - # running that algorithm - # Keys in the parameter dictionary are strings - algorithm_params = dict() - algorithm_directed = dict() - for alg in config["algorithms"]: - cur_params = alg["params"] - if "include" in cur_params and cur_params.pop("include"): - # This dict maps from parameter combinations hashes to parameter combination dictionaries - algorithm_params[alg["name"]] = dict() - else: - # Do not parse the rest of the parameters for this algorithm if it is not 
included - continue - - if "directed" in cur_params: - print("UPDATE: we no longer use the directed key in the config file") - cur_params.pop("directed") - - # The algorithm has no named arguments so create a default placeholder - if len(cur_params) == 0: - cur_params["run1"] = {"spras_placeholder": ["no parameters"]} - - # Each set of runs should be 1 level down in the config file - for run_params in cur_params: - all_runs = [] - - # We create the product of all param combinations for each run - param_name_list = [] - if cur_params[run_params]: - for p in cur_params[run_params]: - param_name_list.append(p) - all_runs.append(eval(str(cur_params[run_params][p]))) - run_list_tuples = list(it.product(*all_runs)) - param_name_tuple = tuple(param_name_list) - for r in run_list_tuples: - run_dict = dict(zip(param_name_tuple, r)) - # TODO temporary workaround for yaml.safe_dump in Snakefile write_parameter_log - for param, value in run_dict.copy().items(): - if isinstance(value, np.float64): - run_dict[param] = float(value) - params_hash = hash_params_sha1_base32(run_dict, hash_length) - if params_hash in prior_params_hashes: - raise ValueError(f'Parameter hash collision detected. Increase the hash_length in the config file ' - f'(current length {hash_length}).') - algorithm_params[alg["name"]][params_hash] = run_dict - - analysis_params = config["analysis"] if "analysis" in config else {} - ml_params = analysis_params["ml"] if "ml" in analysis_params else {} - - pca_params = {} - if "components" in ml_params: - pca_params["components"] = ml_params["components"] - if "labels" in ml_params: - pca_params["labels"] = ml_params["labels"] - - hac_params = {} - if "linkage" in ml_params: - hac_params["linkage"] = ml_params["linkage"] - if "metric" in ml_params: - hac_params["metric"] = ml_params ["metric"] - - return config, datasets, out_dir, algorithm_params, algorithm_directed, pca_params, hac_params - - def make_required_dirs(path: str): """ Create the directory and parent directories required before an output file can be written to the specified path. diff --git a/test/AllPairs/test_ap.py b/test/AllPairs/test_ap.py index b5874a9c..b6aab9aa 100644 --- a/test/AllPairs/test_ap.py +++ b/test/AllPairs/test_ap.py @@ -4,8 +4,14 @@ import pytest +import spras.config as config from spras.allpairs import AllPairs +# Note that we don't directly use the config in the test, but we need the config +# to be initialized under the hood nonetheless. Initializing the config has implications +# like setting hash length behaviors, container registries, etc. 
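# --- Illustrative sketch (annotation, not part of this patch) ---
# Once initialized (as the added line below does), the parsed settings are exposed on
# the module-level singleton, so test or wrapper code can read values such as:
import spras.config as config

config.init_from_file("config/config.yaml")
print(config.config.hash_length)          # parameter-hash length
print(config.config.container_framework)  # 'docker' or 'singularity'
print(config.config.container_prefix)     # e.g. 'docker.io/reedcompbio'
# -----------------------------------------------------------------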
+config.init_from_file("config/config.yaml") + TEST_DIR = 'test/AllPairs/' OUT_DIR = TEST_DIR+'output/' EXPECTED_DIR = TEST_DIR+'expected/' @@ -45,8 +51,7 @@ def test_allpairs_singularity(self): nodetypes=TEST_DIR+'input/sample-in-nodetypes.txt', network=TEST_DIR+'input/sample-in-net.txt', output_file=str(out_path), - singularity=True - ) + container_framework="singularity") assert out_path.exists() def test_allpairs_correctness(self): diff --git a/test/DOMINO/test_domino.py b/test/DOMINO/test_domino.py index e5305e4a..7f09fa97 100644 --- a/test/DOMINO/test_domino.py +++ b/test/DOMINO/test_domino.py @@ -4,8 +4,11 @@ import pytest +import spras.config as config from spras.domino import DOMINO, post_domino_id_transform, pre_domino_id_transform +config.init_from_file("config/config.yaml") + TEST_DIR = 'test/DOMINO/' OUT_FILE_DEFAULT = TEST_DIR+'output/domino-output.txt' OUT_FILE_OPTIONAL = TEST_DIR+'output/domino-output-thresholds.txt' @@ -72,7 +75,7 @@ def test_domino_singularity(self): network=TEST_DIR+'input/domino-network.txt', active_genes=TEST_DIR+'input/domino-active-genes.txt', output_file=OUT_FILE_DEFAULT, - singularity=True) + container_framework="singularity") assert out_path.exists() def test_pre_id_transform(self): diff --git a/test/LocalNeighborhood/test_ln.py b/test/LocalNeighborhood/test_ln.py index c7cf2491..391c5fb1 100644 --- a/test/LocalNeighborhood/test_ln.py +++ b/test/LocalNeighborhood/test_ln.py @@ -4,6 +4,10 @@ import pytest +import spras.config as config + +config.init_from_file("config/config.yaml") + # TODO consider refactoring to simplify the import # Modify the path because of the - in the directory SPRAS_ROOT = Path(__file__).parent.parent.parent.absolute() diff --git a/test/MEO/test_meo.py b/test/MEO/test_meo.py index 724e4ae5..e2abdb72 100644 --- a/test/MEO/test_meo.py +++ b/test/MEO/test_meo.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.meo import MEO, write_properties +config.init_from_file("config/config.yaml") + TEST_DIR = 'test/MEO/' OUT_FILE = TEST_DIR + 'output/edges.txt' @@ -63,5 +66,5 @@ def test_meo_singularity(self): sources=TEST_DIR + 'input/meo-sources.txt', targets=TEST_DIR + 'input/meo-targets.txt', output_file=OUT_FILE, - singularity=True) + container_framework="singularity") assert out_path.exists() diff --git a/test/MinCostFlow/test_mcf.py b/test/MinCostFlow/test_mcf.py index 2a404387..89bd61d0 100644 --- a/test/MinCostFlow/test_mcf.py +++ b/test/MinCostFlow/test_mcf.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.mincostflow import MinCostFlow +config.init_from_file("config/config.yaml") + TEST_DIR = 'test/MinCostFlow/' OUT_FILE = TEST_DIR + 'output/mincostflow-output.txt' @@ -86,8 +89,7 @@ def test_mincostflow_all_optional(self, graph): edges=TEST_DIR + 'input/' + graph + '/edges.txt', output_file=OUT_FILE, flow=1, - capacity=1, - singularity=False) + capacity=1) assert out_path.exists() @pytest.mark.parametrize('graph', ['graph1']) @@ -110,5 +112,6 @@ def test_mincostflow_singularity(self, graph): output_file=OUT_FILE, flow=1, capacity=1, - singularity=True) + container_framework="singularity") assert out_path.exists() + diff --git a/test/OmicsIntegrator1/test_oi1.py b/test/OmicsIntegrator1/test_oi1.py index c361b309..6664f744 100644 --- a/test/OmicsIntegrator1/test_oi1.py +++ b/test/OmicsIntegrator1/test_oi1.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.omicsintegrator1 import OmicsIntegrator1, write_conf +config.init_from_file("config/config.yaml") + 
TEST_DIR = 'test/OmicsIntegrator1/' OUT_FILE = TEST_DIR+'output/test_optimalForest.sif' @@ -91,5 +94,5 @@ def test_oi1_singularity(self): w=5, b=1, d=10, - singularity=True) + container_framework="singularity") assert out_path.exists() diff --git a/test/OmicsIntegrator2/test_oi2.py b/test/OmicsIntegrator2/test_oi2.py index b0ba3ce3..2a0a3e3c 100644 --- a/test/OmicsIntegrator2/test_oi2.py +++ b/test/OmicsIntegrator2/test_oi2.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.omicsintegrator2 import OmicsIntegrator2 +config.init_from_file("config/config.yaml") + TEST_DIR = 'test/OmicsIntegrator2/' EDGE_FILE = TEST_DIR+'input/oi2-edges.txt' PRIZE_FILE = TEST_DIR+'input/oi2-prizes.txt' @@ -45,8 +48,7 @@ def test_oi2_all_optional(self): noisy_edges=0, random_terminals=0, dummy_mode='terminals', - seed=2, - singularity=False) + seed=2) assert OUT_FILE.exists() def test_oi2_missing(self): @@ -65,5 +67,5 @@ def test_oi2_singularity(self): OmicsIntegrator2.run(edges=EDGE_FILE, prizes=PRIZE_FILE, output_file=OUT_FILE, - singularity=True) + container_framework="singularity") assert OUT_FILE.exists() diff --git a/test/PathLinker/test_pathlinker.py b/test/PathLinker/test_pathlinker.py index 0ddb3a84..3fd6a96b 100644 --- a/test/PathLinker/test_pathlinker.py +++ b/test/PathLinker/test_pathlinker.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.pathlinker import PathLinker +config.init_from_file("config/config.yaml") + TEST_DIR = 'test/PathLinker/' OUT_FILE_DEFAULT = TEST_DIR+'output/pathlinker-ranked-edges.txt' OUT_FILE_100 = TEST_DIR+'output/pathlinker-ranked-edges-k100.txt' @@ -57,6 +60,5 @@ def test_pathlinker_singularity(self): nodetypes=TEST_DIR+'input/sample-in-nodetypes.txt', network=TEST_DIR+'input/sample-in-net.txt', output_file=OUT_FILE_DEFAULT, - singularity=True - ) + container_framework="singularity") assert out_path.exists() diff --git a/test/analysis/test_cytoscape.py b/test/analysis/test_cytoscape.py index 6ea0daff..f3399cc7 100644 --- a/test/analysis/test_cytoscape.py +++ b/test/analysis/test_cytoscape.py @@ -3,8 +3,11 @@ import pytest +import spras.config as config from spras.analysis.cytoscape import run_cytoscape +config.init_from_file("config/config.yaml") + INPUT_DIR = 'test/analysis/input/example/' INPUT_PATHWAYS = [INPUT_DIR + 'data0-meo-params-GKEDDFZ_pathway.txt', INPUT_DIR + 'data0-omicsintegrator1-params-RQCQ4YN_pathway.txt', @@ -35,5 +38,5 @@ def test_cytoscape(self): def test_cytoscape_singularity(self): out_path = Path(OUT_FILE) out_path.unlink(missing_ok=True) - run_cytoscape(INPUT_PATHWAYS, OUT_FILE, True) + run_cytoscape(INPUT_PATHWAYS, OUT_FILE, "singularity") assert out_path.exists() diff --git a/test/test_config.py b/test/test_config.py new file mode 100644 index 00000000..c7497cec --- /dev/null +++ b/test/test_config.py @@ -0,0 +1,107 @@ +import pytest + +import spras.config as config + + +# Set up a dummy config for testing. For now, only include things that MUST exist in the dict +# in order for the config init to complete. To test particular parts of the config initialization, +# individual values of the dict can be changed and the whole initialization can be re-run. 
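# --- Illustrative sketch (annotation, not part of this patch) ---
# The pattern described in the comment above, as exercised by the tests that follow:
# build the minimal dict once, tweak a single key, and re-run initialization.
#
#   test_config = get_test_config()
#   test_config["container_framework"] = "docker"
#   config.init_global(test_config)
#   assert config.config.container_framework == "docker"
# -----------------------------------------------------------------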
+def get_test_config(): + test_raw_config = { + "container_framework": "singularity", + "container_registry": { + "base_url": "docker.io", + "owner": "reedcompbio", + }, + "hash_length": 7, + "reconstruction_settings": { + "locations": { + "reconstruction_dir": "my_dir" + } + }, + "datasets": [{"label":"alg1"}, {"label":"alg2"}], + "algorithms": [{"params": ["param2", "param2"]}], + "analysis": { + "summary": { + "include": False + }, + "ml": { + "include": False + }, + "graphspace": { + "include": False + }, + "cytoscape": { + "include": False + }, + }, + } + + return test_raw_config + +class TestConfig: + """ + Tests various parts of the configuration mechanism + """ + def test_config_hash_length(self): + # Initialize the configuration + test_config = get_test_config() + config.init_global(test_config) + assert (config.config.hash_length == 7) + + test_config["hash_length"] = "" + config.init_global(test_config) + assert (config.config.hash_length == config.DEFAULT_HASH_LENGTH) + + # Initialize the configuration + test_config["hash_length"] = "12" + config.init_global(test_config) + assert (config.config.hash_length == 12) + + def test_config_container_framework_normalization(self): + # Test singularity + test_config = get_test_config() + + test_config["container_framework"] = "singularity" + config.init_global(test_config) + assert (config.config.container_framework == "singularity") + + # Test singularity with capitalization + test_config["container_framework"] = "Singularity" + config.init_global(test_config) + assert (config.config.container_framework == "singularity") + + # Test docker + test_config["container_framework"] = "docker" + config.init_global(test_config) + assert (config.config.container_framework == "docker") + + # Test docker with capitalization + test_config["container_framework"] = "Docker" + config.init_global(test_config) + assert (config.config.container_framework == "docker") + + # Test unknown framework + test_config["container_framework"] = "badFramework" + with pytest.raises(ValueError): + config.init_global(test_config) + + def test_config_container_registry(self): + test_config = get_test_config() + test_config["container_registry"]["base_url"] = "docker.io" + test_config["container_registry"]["owner"] = "reedcompbio" + config.init_global(test_config) + assert (config.config.container_prefix == "docker.io/reedcompbio") + + test_config["container_registry"]["base_url"] = "another.repo" + test_config["container_registry"]["owner"] = "different-owner" + config.init_global(test_config) + assert (config.config.container_prefix == "another.repo/different-owner") + + test_config["container_registry"]["base_url"] = "" + test_config["container_registry"]["owner"] = "" + config.init_global(test_config) + assert (config.config.container_prefix == config.DEFAULT_CONTAINER_PREFIX) + + + diff --git a/test/test_util.py b/test/test_util.py index 10b36c5e..baf9db0e 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -2,13 +2,11 @@ import pytest -from spras.util import ( - convert_docker_path, - hash_params_sha1_base32, - prepare_path_docker, - prepare_volume, -) +import spras.config as config +from spras.containers import convert_docker_path, prepare_path_docker, prepare_volume +from spras.util import hash_params_sha1_base32 +config.init_from_file("config/config.yaml") class TestUtil: def test_prepare_path_docker(self):
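# --- Illustrative sketch (annotation, not part of this patch; the input path is hypothetical) ---
# Per the import changes above, the container path helpers now come from
# spras.containers while the hashing helper stays in spras.util, and the global
# config is initialized before they are used:
import spras.config as config
from spras.containers import prepare_volume
from spras.util import hash_params_sha1_base32

config.init_from_file("config/config.yaml")
(src, dest), container_file = prepare_volume('input/oi1-edges.txt', '/spras')
params_hash = hash_params_sha1_base32({'k': 100}, 7)
print(src, dest, container_file, params_hash)
# -------------------------------------------------------------------------------------------------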