From 14259b17ee3b6d565f1c0d643f5e772252441c2f Mon Sep 17 00:00:00 2001
From: Robrecht Cannoodt
Date: Thu, 29 Aug 2024 17:15:07 +0200
Subject: [PATCH] fix process_datasets wf

---
 src/data_processors/process_dataset/script.py |  25 +++
 src/workflows/process_datasets/main.nf        | 160 ++----------------
 src/workflows/process_datasets/test.sh        |  52 +++---
 3 files changed, 63 insertions(+), 174 deletions(-)
 create mode 100644 src/data_processors/process_dataset/script.py

diff --git a/src/data_processors/process_dataset/script.py b/src/data_processors/process_dataset/script.py
new file mode 100644
index 00000000..5f616e0c
--- /dev/null
+++ b/src/data_processors/process_dataset/script.py
@@ -0,0 +1,25 @@
+import scanpy as sc
+import spatialdata as sd
+
+### VIASH START
+par = {
+    "input_sc": "resources_test/common/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad",
+    "input_sp": "resources_test/common/2023_10x_mouse_brain_xenium/dataset.zarr",
+    "output_sc": "resources_test/preprocessing_imagingbased_st/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad",
+    "output_sp": "resources_test/preprocessing_imagingbased_st/2023_10x_mouse_brain_xenium/dataset.zarr"
+}
+### VIASH END
+
+# Load the single-cell data
+adata_sc = sc.read(par["input_sc"])
+
+# Load the spatial data
+adata_sp = sd.read_zarr(par["input_sp"])
+
+# Process if need be
+
+# Save the single-cell data
+adata_sc.write_h5ad(par["output_sc"])
+
+# Save the spatial data
+adata_sp.write(par["output_sp"])
diff --git a/src/workflows/process_datasets/main.nf b/src/workflows/process_datasets/main.nf
index eae19f7c..8f26788c 100644
--- a/src/workflows/process_datasets/main.nf
+++ b/src/workflows/process_datasets/main.nf
@@ -14,160 +14,24 @@ workflow run_wf {
   main:
   output_ch = input_ch
 
-    | check_dataset_schema.run(
-      fromState: { id, state ->
-        def schema = findArgumentSchema(meta.config, "input")
-        def schemaYaml = tempFile("schema.yaml")
-        writeYaml(schema, schemaYaml)
-        [
-          "input": state.input,
-          "schema": schemaYaml
-        ]
-      },
-      toState: { id, output, state ->
-        // read the output to see if dataset passed the qc
-        def checks = readYaml(output.output)
-        state + [
-          "dataset": checks["exit_code"] == 0 ? state.input : null,
-        ]
-      }
+    // example of channel event:
+    // ["my_id", ["input_sc": file("..."), "input_sp": file("...")]]
+
+    | process_datasets.run(
+      fromState: ["input_sc", "input_sp"],
+      toState: ["output_sc", "output_sp"]
     )
 
-    // remove datasets which didn't pass the schema check
-    | filter { id, state ->
-      state.dataset != null
-    }
+    // example of channel event at this point:
+    // ["my_id", ["input_sc": ..., "input_sp": ...,
+    //   "output_sc": file("..."), "output_sp": file("...")]]
 
-    | process_dataset.run(
-      fromState: [ input: "dataset" ],
-      toState: [
-        output_train: "output_train",
-        output_test: "output_test",
-        output_solution: "output_solution"
-      ]
-    )
+    | setState(["output_sp", "output_sc"])
 
-    // only output the files for which an output file was specified
-    | setState(["output_train", "output_test", "output_solution"])
+    // example of channel event at this point:
+    // ["my_id", ["output_sc": file("..."), "output_sp": file("...")]]
 
   emit:
   output_ch
 }
-
-// temp fix for rename_keys typo
-
-def findStatesTemp(Map params, Map config) {
-  def auto_config = deepClone(config)
-  def auto_params = deepClone(params)
-
-  auto_config = auto_config.clone()
-  // override arguments
-  auto_config.argument_groups = []
-  auto_config.arguments = [
-    [
-      type: "string",
-      name: "--id",
-      description: "A dummy identifier",
-      required: false
-    ],
-    [
-      type: "file",
-      name: "--input_states",
-      example: "/path/to/input/directory/**/state.yaml",
-      description: "Path to input directory containing the datasets to be integrated.",
-      required: true,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--filter",
-      example: "foo/.*/state.yaml",
-      description: "Regex to filter state files by path.",
-      required: false
-    ],
-    // to do: make this a yaml blob?
-    [
-      type: "string",
-      name: "--rename_keys",
-      example: ["newKey1:oldKey1", "newKey2:oldKey2"],
-      description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
-      required: false,
-      multiple: true,
-      multiple_sep: ";"
-    ],
-    [
-      type: "string",
-      name: "--settings",
-      example: '{"output_dataset": "dataset.h5ad", "k": 10}',
-      description: "Global arguments as a JSON glob to be passed to all components.",
-      required: false
-    ]
-  ]
-  if (!(auto_params.containsKey("id"))) {
-    auto_params["id"] = "auto"
-  }
-
-  // run auto config through processConfig once more
-  auto_config = processConfig(auto_config)
-
-  workflow findStatesTempWf {
-    helpMessage(auto_config)
-
-    output_ch =
-      channelFromParams(auto_params, auto_config)
-        | flatMap { autoId, args ->
-
-          def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]
-
-          // look for state files in input dir
-          def stateFiles = args.input_states
-
-          // filter state files by regex
-          if (args.filter) {
-            stateFiles = stateFiles.findAll{ stateFile ->
-              def stateFileStr = stateFile.toString()
-              def matcher = stateFileStr =~ args.filter
-              matcher.matches()}
-          }
-
-          // read in states
-          def states = stateFiles.collect { stateFile ->
-            def state_ = readTaggedYaml(stateFile)
-            [state_.id, state_]
-          }
-
-          // construct renameMap
-          if (args.rename_keys) {
-            def renameMap = args.rename_keys.collectEntries{renameString ->
-              def split = renameString.split(":")
-              assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey;newKey:oldKey'"
-              split
-            }
-
-            // rename keys in state, only let states through which have all keys
-            // also add global settings
-            states = states.collectMany{id, state ->
-              def newState = [:]
-
-              for (key in renameMap.keySet()) {
-                def origKey = renameMap[key]
-                if (!(state.containsKey(origKey))) {
-                  return []
-                }
-                newState[key] = state[origKey]
-              }
-
-              [[id, globalSettings + newState]]
-            }
-          }
-
-          states
-        }
-    emit:
-    output_ch
-  }
-
-  return findStatesTempWf
-}
\ No newline at end of file
diff --git a/src/workflows/process_datasets/test.sh b/src/workflows/process_datasets/test.sh
index d9181027..9636ce49 100755
--- a/src/workflows/process_datasets/test.sh
+++ b/src/workflows/process_datasets/test.sh
@@ -1,33 +1,33 @@
 #!/bin/bash
-# Run this prior to executing this script:
-# bin/viash_build -q 'batch_integration'
-# get the root of the directory
-REPO_ROOT=$(git rev-parse --show-toplevel)
-
-# ensure that the command below is run from the root of the repository
-cd "$REPO_ROOT"
-
-set -e
-
-DATASETS_DIR="resources_test/common"
-OUTPUT_DIR="output/process_datasets_test"
-
-if [ ! -d "$OUTPUT_DIR" ]; then
-  mkdir -p "$OUTPUT_DIR"
-fi
-
-export NXF_VER=24.04.3
+cat > /tmp/params.yaml <
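
Note: the test.sh hunk above is truncated at the heredoc redirect, so the remainder of the new test script is not visible in this patch. As a hedged sketch only (the dataset id, output path templates, publish directory, heredoc delimiter, and the target/ script path are assumptions, not the committed content), an openproblems-style workflow test typically continues roughly as follows, using the input paths that appear in the VIASH START block of the new script:

# Sketch of a typical continuation; values below are assumed, not taken from the commit.
cat > /tmp/params.yaml << 'HERE'
param_list:
  - id: mouse_brain_combined
    input_sc: resources_test/common/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad
    input_sp: resources_test/common/2023_10x_mouse_brain_xenium/dataset.zarr
output_sc: '$id/output_sc.h5ad'
output_sp: '$id/output_sp.zarr'
publish_dir: output/process_datasets_test
HERE

# Launch the reworked workflow against the parameter file (script path assumed).
nextflow run . \
  -main-script target/nextflow/workflows/process_datasets/main.nf \
  -profile docker \
  -resume \
  -params-file /tmp/params.yaml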