Commit

fix process_datasets wf
rcannood committed Aug 29, 2024
1 parent 736cd86 commit 14259b1
Showing 3 changed files with 63 additions and 174 deletions.
25 changes: 25 additions & 0 deletions src/data_processors/process_dataset/script.py
@@ -0,0 +1,25 @@
import scanpy as sc
import spatialdata as sd

### VIASH START
par = {
"input_sc": "resources_test/common/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad",
"input_sp": "resources_test/common/2023_10x_mouse_brain_xenium/dataset.zarr",
"output_sc": "resources_test/preprocessing_imagingbased_st/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad",
"output_sp": "resources_test/preprocessing_imagingbased_st/2023_10x_mouse_brain_xenium/dataset.zarr"
}
### VIASH END

# Load the single-cell data
adata_sc = sc.read(par["input_sc"])

# Load the spatial data
adata_sp = sd.read_zarr(par["input_sp"])

# Process the datasets if needed (currently a pass-through)

# Save the single-cell data
adata_sc.write_h5ad(par["output_sc"])

# Save the spatial data
adata_sp.write(par["output_sp"])
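
Note that the processing step above is currently a pass-through. Purely as an illustration (not part of this commit), a minimal sketch of what such processing could look like, assuming the spatial counts live in a table named "table" inside the SpatialData object and that both objects use gene symbols as var_names:

import scanpy as sc
import spatialdata as sd

# Hypothetical paths and table name ("table") -- assumptions for illustration only.
adata_sc = sc.read("resources_test/common/2023_yao_mouse_brain_scrnaseq_10xv2/dataset.h5ad")
sdata_sp = sd.read_zarr("resources_test/common/2023_10x_mouse_brain_xenium/dataset.zarr")

# Restrict the single-cell reference to the genes measured in the spatial panel,
# so downstream methods operate on a shared feature space.
spatial_genes = sdata_sp.tables["table"].var_names
shared_genes = adata_sc.var_names.intersection(spatial_genes)
adata_sc = adata_sc[:, shared_genes].copy()

# Basic quality filtering of the single-cell reference.
sc.pp.filter_cells(adata_sc, min_counts=10)
sc.pp.filter_genes(adata_sc, min_cells=3)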
160 changes: 12 additions & 148 deletions src/workflows/process_datasets/main.nf
@@ -14,160 +14,24 @@ workflow run_wf {
  main:
  output_ch = input_ch

    | check_dataset_schema.run(
      fromState: { id, state ->
        def schema = findArgumentSchema(meta.config, "input")
        def schemaYaml = tempFile("schema.yaml")
        writeYaml(schema, schemaYaml)
        [
          "input": state.input,
          "schema": schemaYaml
        ]
      },
      toState: { id, output, state ->
        // read the output to see if dataset passed the qc
        def checks = readYaml(output.output)
        state + [
          "dataset": checks["exit_code"] == 0 ? state.input : null,
        ]
      }

    // example of channel event:
    // ["my_id", ["input_sc": file("..."), "input_sp": file("...")]]

    | process_datasets.run(
      fromState: ["input_sc", "input_sp"],
      toState: ["output_sc", "output_sp"]
    )

    // remove datasets which didn't pass the schema check
    | filter { id, state ->
      state.dataset != null
    }

    // example of channel event at this point:
    // ["my_id", ["input_sc": ..., "input_sp": ...,
    //            "output_sc": file("..."), "output_sp": file("...")]]

    | process_dataset.run(
      fromState: [ input: "dataset" ],
      toState: [
        output_train: "output_train",
        output_test: "output_test",
        output_solution: "output_solution"
      ]
    )

    | setState(["output_sp", "output_sc"])

    // only output the files for which an output file was specified
    | setState(["output_train", "output_test", "output_solution"])

    // example of channel event at this point:
    // ["my_id", ["output_sc": file("..."), "output_sp": file("...")]]

  emit:
  output_ch
}
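
For readers unfamiliar with the fromState/toState/setState helpers used above: each channel event is a pair of an id and a state map; fromState selects which state keys become the component's inputs, toState merges the component's outputs back into the state, and setState keeps only the listed keys. The following rough Python model is an illustration only; the helper names and semantics are simplified assumptions, not the actual Viash/Nextflow implementation:

# Toy model of one channel event flowing through the workflow above.
# All names are illustrative; the real helpers are Groovy closures generated by Viash.

def from_state(state, keys):
    # Select the state entries that become the component's inputs.
    return {k: state[k] for k in keys}

def to_state(state, outputs, keys):
    # Merge the component's outputs back into the event's state.
    return {**state, **{k: outputs[k] for k in keys}}

def set_state(state, keys):
    # Keep only the requested keys in the final state.
    return {k: state[k] for k in keys}

event_id = "my_id"
state = {"input_sc": "dataset.h5ad", "input_sp": "dataset.zarr"}

inputs = from_state(state, ["input_sc", "input_sp"])
outputs = {"output_sc": "out.h5ad", "output_sp": "out.zarr"}  # produced by the component
state = to_state(state, outputs, ["output_sc", "output_sp"])
state = set_state(state, ["output_sp", "output_sc"])
print(event_id, state)  # my_id {'output_sp': 'out.zarr', 'output_sc': 'out.h5ad'}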


// temp fix for rename_keys typo

def findStatesTemp(Map params, Map config) {
  def auto_config = deepClone(config)
  def auto_params = deepClone(params)

  auto_config = auto_config.clone()
  // override arguments
  auto_config.argument_groups = []
  auto_config.arguments = [
    [
      type: "string",
      name: "--id",
      description: "A dummy identifier",
      required: false
    ],
    [
      type: "file",
      name: "--input_states",
      example: "/path/to/input/directory/**/state.yaml",
      description: "Path to input directory containing the datasets to be integrated.",
      required: true,
      multiple: true,
      multiple_sep: ";"
    ],
    [
      type: "string",
      name: "--filter",
      example: "foo/.*/state.yaml",
      description: "Regex to filter state files by path.",
      required: false
    ],
    // to do: make this a yaml blob?
    [
      type: "string",
      name: "--rename_keys",
      example: ["newKey1:oldKey1", "newKey2:oldKey2"],
      description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
      required: false,
      multiple: true,
      multiple_sep: ";"
    ],
    [
      type: "string",
      name: "--settings",
      example: '{"output_dataset": "dataset.h5ad", "k": 10}',
      description: "Global arguments as a JSON glob to be passed to all components.",
      required: false
    ]
  ]
  if (!(auto_params.containsKey("id"))) {
    auto_params["id"] = "auto"
  }

  // run auto config through processConfig once more
  auto_config = processConfig(auto_config)

  workflow findStatesTempWf {
    helpMessage(auto_config)

    output_ch =
      channelFromParams(auto_params, auto_config)
        | flatMap { autoId, args ->

          def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]

          // look for state files in input dir
          def stateFiles = args.input_states

          // filter state files by regex
          if (args.filter) {
            stateFiles = stateFiles.findAll{ stateFile ->
              def stateFileStr = stateFile.toString()
              def matcher = stateFileStr =~ args.filter
              matcher.matches()
            }
          }

          // read in states
          def states = stateFiles.collect { stateFile ->
            def state_ = readTaggedYaml(stateFile)
            [state_.id, state_]
          }

          // construct renameMap
          if (args.rename_keys) {
            def renameMap = args.rename_keys.collectEntries{ renameString ->
              def split = renameString.split(":")
              assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey;newKey:oldKey'"
              split
            }

            // rename keys in state, only let states through which have all keys
            // also add global settings
            states = states.collectMany{ id, state ->
              def newState = [:]

              for (key in renameMap.keySet()) {
                def origKey = renameMap[key]
                if (!(state.containsKey(origKey))) {
                  return []
                }
                newState[key] = state[origKey]
              }

              [[id, globalSettings + newState]]
            }
          }

          states
        }

    emit:
    output_ch
  }

  return findStatesTempWf
}
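
The rename_keys handling above is dense Groovy; its intent is that each entry has the form newKey:oldKey, that states missing any referenced old key are dropped, and that the global settings are merged into each surviving state. A small Python sketch of the same logic, for reference only (all names are illustrative):

def apply_rename_keys(states, rename_keys, global_settings):
    # rename_keys: list of "newKey:oldKey" strings, e.g. --rename_keys 'input:output_dataset'
    rename_map = {}
    for pair in rename_keys:
        new_key, old_key = pair.split(":")  # raises if the pair is malformed
        rename_map[new_key] = old_key

    result = []
    for state_id, state in states:
        new_state = {}
        for new_key, old_key in rename_map.items():
            if old_key not in state:
                break  # drop states that lack one of the required keys
            new_state[new_key] = state[old_key]
        else:
            result.append((state_id, {**global_settings, **new_state}))
    return result

# Example: rename output_dataset -> input, as in the test script below.
states = [("ds0", {"output_dataset": "d0.h5ad"}), ("ds1", {"other": "x"})]
print(apply_rename_keys(states, ["input:output_dataset"], {"k": 10}))
# [('ds0', {'k': 10, 'input': 'd0.h5ad'})]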
52 changes: 26 additions & 26 deletions src/workflows/process_datasets/test.sh
@@ -1,33 +1,33 @@
#!/bin/bash

# Run this prior to executing this script:
# bin/viash_build -q 'batch_integration'

# get the root of the directory
REPO_ROOT=$(git rev-parse --show-toplevel)

# ensure that the command below is run from the root of the repository
cd "$REPO_ROOT"

set -e

DATASETS_DIR="resources_test/common"
OUTPUT_DIR="output/process_datasets_test"

if [ ! -d "$OUTPUT_DIR" ]; then
  mkdir -p "$OUTPUT_DIR"
fi

export NXF_VER=24.04.3
cat > /tmp/params.yaml <<EOF
param_list:
  - id: my_dataset_id0
    input_sp: path/to/my_dataset0/dataset.h5ad
    input_sc: path/to/my_dataset0/dataset.csv
  - id: my_dataset_id1
    input_sp: path/to/my_dataset1/dataset.h5ad
    input_sc: path/to/my_dataset1/dataset.csv
    reference: my_reference.csv
EOF

nextflow run . \
  -main-script target/nextflow/workflows/process_datasets/main.nf \
  -profile docker \
  -entry auto \
  -c common/nextflow_helpers/labels_ci.config \
  --id run_test \
  --input_states "$DATASETS_DIR/**/state.yaml" \
  --rename_keys 'input:output_dataset' \
  --settings '{"output_train": "train.h5ad", "output_test": "test.h5ad"}' \
  --publish_dir "$OUTPUT_DIR" \
  --output_state "state.yaml"
  -params-file /tmp/params.yaml \
  --publish_dir output \
  --output_sp '$id/output_sp.zarr' \
  --output_sc '$id/output_sc.h5ad'

# created files:
# output/my_dataset_id0.process_datasets.output_sp.zarr
# output/my_dataset_id0.process_datasets.output_sc.h5ad
# output/my_dataset_id1.process_datasets.output_sp.zarr
# output/my_dataset_id1.process_datasets.output_sc.h5ad

# created files:
# output/my_dataset_id0/output_sp.zarr
# output/my_dataset_id0/output_sc.h5ad
# output/my_dataset_id1/output_sp.zarr
# output/my_dataset_id1/output_sc.h5ad
