From 0c0476df8af810cecaf24143813a01600c574239 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Thu, 9 Oct 2025 11:11:01 +0200 Subject: [PATCH 01/11] add lognormalization to celltypist component --- src/annotate/celltypist/config.vsh.yaml | 6 +- src/annotate/celltypist/script.py | 92 ++++++++++++++----------- src/annotate/celltypist/test.py | 54 --------------- 3 files changed, 54 insertions(+), 98 deletions(-) diff --git a/src/annotate/celltypist/config.vsh.yaml b/src/annotate/celltypist/config.vsh.yaml index ccb8d9fc17c..fe5a8ed3460 100644 --- a/src/annotate/celltypist/config.vsh.yaml +++ b/src/annotate/celltypist/config.vsh.yaml @@ -26,7 +26,7 @@ argument_groups: required: false - name: "--input_layer" type: string - description: The layer in the input data containing log normalized counts to be used for cell type annotation if .X is not to be used. + description: The layer in the input data containing raw counts to be used for cell type annotation if .X is not to be used. - name: "--input_var_gene_names" type: string required: false @@ -50,7 +50,7 @@ argument_groups: required: false - name: "--reference_layer" type: string - description: The layer in the reference data to be used for cell type annotation if .X is not to be used. Data are expected to be processed in the same way as the --input query dataset. + description: The layer in the reference data containing raw counts to be used for cell type annotation if .X is not to be used. required: false - name: "--reference_obs_target" type: string @@ -152,7 +152,7 @@ engines: packages: - celltypist==1.6.3 - type: python - __merge__: [ /src/base/requirements/anndata_mudata.yaml, .] + __merge__: [ /src/base/requirements/anndata_mudata.yaml, /src/base/requirements/scanpy.yaml, .] __merge__: [ /src/base/requirements/python_test_setup.yaml, .] runners: - type: executable diff --git a/src/annotate/celltypist/script.py b/src/annotate/celltypist/script.py index e3efb749d3d..f9f61d59b9f 100644 --- a/src/annotate/celltypist/script.py +++ b/src/annotate/celltypist/script.py @@ -3,23 +3,20 @@ import mudata as mu import anndata as ad import pandas as pd -import numpy as np +import scanpy as sc ## VIASH START par = { "input": "resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu", "output": "output.h5mu", "modality": "rna", - # "reference": None, "reference": "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", "model": None, - # "model": "resources_test/annotation_test_data/celltypist_model_Immune_All_Low.pkl", "input_layer": "log_normalized", "reference_layer": "log_normalized", "input_reference_gene_overlap": 100, "reference_obs_target": "cell_ontology_class", "reference_var_input": None, - "check_expression": False, "feature_selection": True, "majority_voting": True, "output_compression": "gzip", @@ -44,10 +41,43 @@ logger = setup_logger() -def check_celltypist_format(indata): - if np.abs(np.expm1(indata[0]).sum() - 10000) > 1: - return False - return True +def setup_anndata( + adata: ad.AnnData, + layer: str | None = None, + gene_names: str | None = None, + var_input: str | None = None, +) -> ad.AnnData: + """Creates an AnnData object in the expected format for CellTypist, + with lognormalized data (with a target sum of 10000) in the .X slot. + + Parameters + ---------- + adata + AnnData object. + layer + Layer in AnnData object to lognormalize. + gene_names + .obs field with the gene names to be used + var_input + .var field with a boolean array of the genes to be used (e.g. highly variable genes) + Returns + ------- + AnnData object in CellTypist format. + """ + + adata = set_var_index(adata, gene_names) + + if var_input: + adata = subset_vars(adata, var_input) + + raw_counts = adata.layers[layer].copy() if layer else adata.X.copy() + + input_modality = ad.AnnData(X=raw_counts, var=pd.DataFrame(index=adata.var.index)) + + sc.pp.normalize_total(input_modality, target_sum=10000) + sc.pp.log1p(input_modality) + + return input_modality def main(par): @@ -63,17 +93,8 @@ def main(par): input_modality = input_adata.copy() # Provide correct format of query data for celltypist annotation - ## Sanitize gene names and set as index - input_modality = set_var_index(input_modality, par["input_var_gene_names"]) - ## Fetch lognormalized counts - lognorm_counts = ( - input_modality.layers[par["input_layer"]].copy() - if par["input_layer"] - else input_modality.X.copy() - ) - ## Create AnnData object - input_modality = ad.AnnData( - X=lognorm_counts, var=pd.DataFrame(index=input_modality.var.index) + input_modality = setup_anndata( + input_modality, par["input_layer"], par["input_var_gene_names"] ) if par["model"]: @@ -86,18 +107,15 @@ def main(par): ) elif par["reference"]: - reference_modality = mu.read_h5mu(par["reference"]).mod[par["modality"]] - - # subset to HVG if required - if par["reference_var_input"]: - reference_modality = subset_vars( - reference_modality, par["reference_var_input"] - ) - - # Set var names to the desired gene name format (gene symbol, ensembl id, etc.) - # CellTypist requires query gene names to be in index - reference_modality = set_var_index( - reference_modality, par["reference_var_gene_names"] + reference_adata = mu.read_h5mu(par["reference"]).mod[par["modality"]] + reference_modality = reference_adata.copy() + + # Provide correct format of query data for celltypist annotation + reference_modality = setup_anndata( + reference_modality, + par["reference_layer"], + par["reference_var_gene_names"], + par["reference_var_input"], ) # Ensure enough overlap between genes in query and reference @@ -107,18 +125,10 @@ def main(par): min_gene_overlap=par["input_reference_gene_overlap"], ) - reference_matrix = ( - reference_modality.layers[par["reference_layer"]] - if par["reference_layer"] - else reference_modality.X - ) - - labels = reference_modality.obs[par["reference_obs_target"]] - logger.info("Training CellTypist model on reference") model = celltypist.train( - reference_matrix, - labels=labels, + reference_modality.X, + labels=reference_adata.obs[par["reference_obs_target"]], genes=reference_modality.var.index, C=par["C"], max_iter=par["max_iter"], diff --git a/src/annotate/celltypist/test.py b/src/annotate/celltypist/test.py index 60704b3c24c..52670523a91 100644 --- a/src/annotate/celltypist/test.py +++ b/src/annotate/celltypist/test.py @@ -1,8 +1,6 @@ import sys import os import pytest -import subprocess -import re import mudata as mu from openpipeline_testutils.asserters import assert_annotation_objects_equal @@ -27,12 +25,8 @@ def test_simple_execution(run_component, random_h5mu_path): [ "--input", input_file, - "--input_layer", - "log_normalized", "--reference", reference_file, - "--reference_layer", - "log_normalized", "--reference_obs_target", "cell_ontology_class", "--reference_var_gene_names", @@ -75,12 +69,8 @@ def test_set_params(run_component, random_h5mu_path): [ "--input", input_file, - "--input_layer", - "log_normalized", "--reference", reference_file, - "--reference_layer", - "log_normalized", "--reference_obs_target", "cell_ontology_class", "--reference_var_gene_names", @@ -159,49 +149,5 @@ def test_with_model(run_component, random_h5mu_path): ) -def test_fail_invalid_input_expression(run_component, random_h5mu_path): - output_file = random_h5mu_path() - - # fails because input data are not lognormalized - with pytest.raises(subprocess.CalledProcessError) as err: - run_component( - [ - "--input", - input_file, - "--reference", - reference_file, - "--reference_var_gene_names", - "ensemblid", - "--output", - output_file, - ] - ) - assert re.search( - r"Invalid expression matrix, expect log1p normalized expression to 10000 counts per cell", - err.value.stdout.decode("utf-8"), - ) - - # fails because reference data are not lognormalized - with pytest.raises(subprocess.CalledProcessError) as err: - run_component( - [ - "--input", - input_file, - "--layer", - "log_normalized", - "--reference", - reference_file, - "--reference_var_gene_names", - "ensemblid", - "--output", - output_file, - ] - ) - assert re.search( - r"Invalid expression matrix, expect log1p normalized expression to 10000 counts per cell", - err.value.stdout.decode("utf-8"), - ) - - if __name__ == "__main__": sys.exit(pytest.main([__file__])) From c853100941602d517133a569640a19fd5e39e047 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Thu, 9 Oct 2025 11:15:24 +0200 Subject: [PATCH 02/11] update changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 233c6c955a6..920776c9bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,9 @@ ## BREAKING -* `differential_expression/create_pseudobulks`: Removed functionality to filter psuedobulk samples based on number of aggregated samples threshold, as this functionality is now covered in `filter/delimit_count` (PR #1044). +* `differential_expression/create_pseudobulks`: Removed functionality to filter pseudobulk samples based on number of aggregated samples threshold, as this functionality is now covered in `filter/delimit_count` (PR #1044). + +* `annotate/celtypist`: This component now requires to pass a raw count layer, that will be lognormalized with a target sum of 10000, the required count format for CellTypist (PR #1083). ## NEW FUNCTIONALITY From 747208a435f39a01235a572a4172dfdb8f6a3a3e Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 13:45:33 +0200 Subject: [PATCH 03/11] update descriptions component --- src/annotate/celltypist/config.vsh.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/annotate/celltypist/config.vsh.yaml b/src/annotate/celltypist/config.vsh.yaml index fe5a8ed3460..8455d034d32 100644 --- a/src/annotate/celltypist/config.vsh.yaml +++ b/src/annotate/celltypist/config.vsh.yaml @@ -26,7 +26,7 @@ argument_groups: required: false - name: "--input_layer" type: string - description: The layer in the input data containing raw counts to be used for cell type annotation if .X is not to be used. + description: The layer in the input data containing counts that are lognormalized to 10000, .X is not to be used. - name: "--input_var_gene_names" type: string required: false @@ -50,7 +50,7 @@ argument_groups: required: false - name: "--reference_layer" type: string - description: The layer in the reference data containing raw counts to be used for cell type annotation if .X is not to be used. + description: The layer in the reference data containing counts that are lognormalized to 10000, if .X is not to be used. required: false - name: "--reference_obs_target" type: string From dbe3a51138b741fc721ddbdad8c9fb2bc7a4ad19 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 13:46:17 +0200 Subject: [PATCH 04/11] update changelog --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44ae42f8da5..d7124cc5e76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,6 @@ * `integrate/scarches` and `workflows/annotate/scanvi_scarches`: Enable correction for technical variability by multiple continuous and categorical covariates. - ## BUG FIXES * `filter/filter_with_counts`: this component would sometimes crash (segfault) when processing malformatted sparse matrices. A proper error message is now provided in this case (PR #1086). From 791a418f4c80f816c83e9d0ec967de3b9a12ef0a Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 13:58:43 +0200 Subject: [PATCH 05/11] update changelog --- CHANGELOG.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7124cc5e76..f8a47e7e576 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,3 @@ -# openpipelines x.x.x - -## BREAKING - -* `annotate/celltypist`: This component now requires to pass a raw count layer, that will be lognormalized with a target sum of 10000, the required count format for CellTypist (PR #1083). - # openpipelines 3.1.0 ## NEW FUNCTIONALITY @@ -14,6 +8,10 @@ * `convert/from_seurat_to_h5mu`: Converts a Seurat object to a MuData object (PR #1078, #1079, #1082). +* `annotate/celltypist`: enable CUDA acceleration (PR #1083). + +* `workflows/annotation/celltypist`: Performs lognormalization (target count of 10000) followed by cell type annotation using CellTypist (PR #1083). + ## EXPERIMENTAL * `differential_expression/deseq2`: Performs differential expression analysis using DESeq2 on bulk or pseudobulk datasets (PR #1044). From 21eb3e2eaf260af969a36453dc81795e1162e9c8 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 13:59:08 +0200 Subject: [PATCH 06/11] update changelog --- src/annotate/celltypist/script.py | 92 ++++++++++++++----------------- 1 file changed, 41 insertions(+), 51 deletions(-) diff --git a/src/annotate/celltypist/script.py b/src/annotate/celltypist/script.py index f9f61d59b9f..e3efb749d3d 100644 --- a/src/annotate/celltypist/script.py +++ b/src/annotate/celltypist/script.py @@ -3,20 +3,23 @@ import mudata as mu import anndata as ad import pandas as pd -import scanpy as sc +import numpy as np ## VIASH START par = { "input": "resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu", "output": "output.h5mu", "modality": "rna", + # "reference": None, "reference": "resources_test/annotation_test_data/TS_Blood_filtered.h5mu", "model": None, + # "model": "resources_test/annotation_test_data/celltypist_model_Immune_All_Low.pkl", "input_layer": "log_normalized", "reference_layer": "log_normalized", "input_reference_gene_overlap": 100, "reference_obs_target": "cell_ontology_class", "reference_var_input": None, + "check_expression": False, "feature_selection": True, "majority_voting": True, "output_compression": "gzip", @@ -41,43 +44,10 @@ logger = setup_logger() -def setup_anndata( - adata: ad.AnnData, - layer: str | None = None, - gene_names: str | None = None, - var_input: str | None = None, -) -> ad.AnnData: - """Creates an AnnData object in the expected format for CellTypist, - with lognormalized data (with a target sum of 10000) in the .X slot. - - Parameters - ---------- - adata - AnnData object. - layer - Layer in AnnData object to lognormalize. - gene_names - .obs field with the gene names to be used - var_input - .var field with a boolean array of the genes to be used (e.g. highly variable genes) - Returns - ------- - AnnData object in CellTypist format. - """ - - adata = set_var_index(adata, gene_names) - - if var_input: - adata = subset_vars(adata, var_input) - - raw_counts = adata.layers[layer].copy() if layer else adata.X.copy() - - input_modality = ad.AnnData(X=raw_counts, var=pd.DataFrame(index=adata.var.index)) - - sc.pp.normalize_total(input_modality, target_sum=10000) - sc.pp.log1p(input_modality) - - return input_modality +def check_celltypist_format(indata): + if np.abs(np.expm1(indata[0]).sum() - 10000) > 1: + return False + return True def main(par): @@ -93,8 +63,17 @@ def main(par): input_modality = input_adata.copy() # Provide correct format of query data for celltypist annotation - input_modality = setup_anndata( - input_modality, par["input_layer"], par["input_var_gene_names"] + ## Sanitize gene names and set as index + input_modality = set_var_index(input_modality, par["input_var_gene_names"]) + ## Fetch lognormalized counts + lognorm_counts = ( + input_modality.layers[par["input_layer"]].copy() + if par["input_layer"] + else input_modality.X.copy() + ) + ## Create AnnData object + input_modality = ad.AnnData( + X=lognorm_counts, var=pd.DataFrame(index=input_modality.var.index) ) if par["model"]: @@ -107,15 +86,18 @@ def main(par): ) elif par["reference"]: - reference_adata = mu.read_h5mu(par["reference"]).mod[par["modality"]] - reference_modality = reference_adata.copy() - - # Provide correct format of query data for celltypist annotation - reference_modality = setup_anndata( - reference_modality, - par["reference_layer"], - par["reference_var_gene_names"], - par["reference_var_input"], + reference_modality = mu.read_h5mu(par["reference"]).mod[par["modality"]] + + # subset to HVG if required + if par["reference_var_input"]: + reference_modality = subset_vars( + reference_modality, par["reference_var_input"] + ) + + # Set var names to the desired gene name format (gene symbol, ensembl id, etc.) + # CellTypist requires query gene names to be in index + reference_modality = set_var_index( + reference_modality, par["reference_var_gene_names"] ) # Ensure enough overlap between genes in query and reference @@ -125,10 +107,18 @@ def main(par): min_gene_overlap=par["input_reference_gene_overlap"], ) + reference_matrix = ( + reference_modality.layers[par["reference_layer"]] + if par["reference_layer"] + else reference_modality.X + ) + + labels = reference_modality.obs[par["reference_obs_target"]] + logger.info("Training CellTypist model on reference") model = celltypist.train( - reference_modality.X, - labels=reference_adata.obs[par["reference_obs_target"]], + reference_matrix, + labels=labels, genes=reference_modality.var.index, C=par["C"], max_iter=par["max_iter"], From 7a062cde88ee34b704f8c0001e080b486ced797b Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 14:11:58 +0200 Subject: [PATCH 07/11] update changelog --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8a47e7e576..97ccc220ef9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,6 @@ * `convert/from_seurat_to_h5mu`: Converts a Seurat object to a MuData object (PR #1078, #1079, #1082). -* `annotate/celltypist`: enable CUDA acceleration (PR #1083). - * `workflows/annotation/celltypist`: Performs lognormalization (target count of 10000) followed by cell type annotation using CellTypist (PR #1083). ## EXPERIMENTAL From 737f771b9f07d86748c7e375c23217a328fd359a Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 14:23:49 +0200 Subject: [PATCH 08/11] undo test changes --- src/annotate/celltypist/test.py | 54 +++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/annotate/celltypist/test.py b/src/annotate/celltypist/test.py index 52670523a91..60704b3c24c 100644 --- a/src/annotate/celltypist/test.py +++ b/src/annotate/celltypist/test.py @@ -1,6 +1,8 @@ import sys import os import pytest +import subprocess +import re import mudata as mu from openpipeline_testutils.asserters import assert_annotation_objects_equal @@ -25,8 +27,12 @@ def test_simple_execution(run_component, random_h5mu_path): [ "--input", input_file, + "--input_layer", + "log_normalized", "--reference", reference_file, + "--reference_layer", + "log_normalized", "--reference_obs_target", "cell_ontology_class", "--reference_var_gene_names", @@ -69,8 +75,12 @@ def test_set_params(run_component, random_h5mu_path): [ "--input", input_file, + "--input_layer", + "log_normalized", "--reference", reference_file, + "--reference_layer", + "log_normalized", "--reference_obs_target", "cell_ontology_class", "--reference_var_gene_names", @@ -149,5 +159,49 @@ def test_with_model(run_component, random_h5mu_path): ) +def test_fail_invalid_input_expression(run_component, random_h5mu_path): + output_file = random_h5mu_path() + + # fails because input data are not lognormalized + with pytest.raises(subprocess.CalledProcessError) as err: + run_component( + [ + "--input", + input_file, + "--reference", + reference_file, + "--reference_var_gene_names", + "ensemblid", + "--output", + output_file, + ] + ) + assert re.search( + r"Invalid expression matrix, expect log1p normalized expression to 10000 counts per cell", + err.value.stdout.decode("utf-8"), + ) + + # fails because reference data are not lognormalized + with pytest.raises(subprocess.CalledProcessError) as err: + run_component( + [ + "--input", + input_file, + "--layer", + "log_normalized", + "--reference", + reference_file, + "--reference_var_gene_names", + "ensemblid", + "--output", + output_file, + ] + ) + assert re.search( + r"Invalid expression matrix, expect log1p normalized expression to 10000 counts per cell", + err.value.stdout.decode("utf-8"), + ) + + if __name__ == "__main__": sys.exit(pytest.main([__file__])) From dec15513cfda7b016e2bead0e98b4a3d5a1e5d8f Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 14:48:14 +0200 Subject: [PATCH 09/11] wip --- src/annotate/celltypist/config.vsh.yaml | 2 +- src/annotate/celltypist/script.py | 8 - .../annotation/celltypist/config.vsh.yaml | 150 ++++++++++ .../annotation/celltypist/integration_test.sh | 15 + src/workflows/annotation/celltypist/main.nf | 259 ++++++++++++++++++ .../annotation/celltypist/nextflow.config | 10 + src/workflows/annotation/celltypist/test.nf | 66 +++++ .../annotation/celltypist/config.vsh.yaml | 25 ++ .../annotation/celltypist/script.py | 48 ++++ 9 files changed, 574 insertions(+), 9 deletions(-) create mode 100644 src/workflows/annotation/celltypist/config.vsh.yaml create mode 100755 src/workflows/annotation/celltypist/integration_test.sh create mode 100644 src/workflows/annotation/celltypist/main.nf create mode 100644 src/workflows/annotation/celltypist/nextflow.config create mode 100644 src/workflows/annotation/celltypist/test.nf create mode 100644 src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml create mode 100644 src/workflows/test_workflows/annotation/celltypist/script.py diff --git a/src/annotate/celltypist/config.vsh.yaml b/src/annotate/celltypist/config.vsh.yaml index 8455d034d32..94acd7a591b 100644 --- a/src/annotate/celltypist/config.vsh.yaml +++ b/src/annotate/celltypist/config.vsh.yaml @@ -152,7 +152,7 @@ engines: packages: - celltypist==1.6.3 - type: python - __merge__: [ /src/base/requirements/anndata_mudata.yaml, /src/base/requirements/scanpy.yaml, .] + __merge__: [ /src/base/requirements/anndata_mudata.yaml, .] __merge__: [ /src/base/requirements/python_test_setup.yaml, .] runners: - type: executable diff --git a/src/annotate/celltypist/script.py b/src/annotate/celltypist/script.py index e3efb749d3d..c83debfb913 100644 --- a/src/annotate/celltypist/script.py +++ b/src/annotate/celltypist/script.py @@ -3,7 +3,6 @@ import mudata as mu import anndata as ad import pandas as pd -import numpy as np ## VIASH START par = { @@ -19,7 +18,6 @@ "input_reference_gene_overlap": 100, "reference_obs_target": "cell_ontology_class", "reference_var_input": None, - "check_expression": False, "feature_selection": True, "majority_voting": True, "output_compression": "gzip", @@ -44,12 +42,6 @@ logger = setup_logger() -def check_celltypist_format(indata): - if np.abs(np.expm1(indata[0]).sum() - 10000) > 1: - return False - return True - - def main(par): if (not par["model"] and not par["reference"]) or ( par["model"] and par["reference"] diff --git a/src/workflows/annotation/celltypist/config.vsh.yaml b/src/workflows/annotation/celltypist/config.vsh.yaml new file mode 100644 index 00000000000..1fe4a0ef444 --- /dev/null +++ b/src/workflows/annotation/celltypist/config.vsh.yaml @@ -0,0 +1,150 @@ +name: "celltypist" +namespace: "workflows/annotation" +scope: "public" +description: "Cell type annotation workflow by performing lognormalization of the raw counts layer followed by cell type annotation with CellTypist." +info: + name: "CellTypist annotation" + test_dependencies: + - name: celltypist_test + namespace: test_workflows/annotation +authors: + - __merge__: /src/authors/dorien_roosen.yaml + roles: [ author, maintainer ] + - __merge__: /src/authors/weiwei_schultz.yaml + roles: [ contributor ] + +argument_groups: + - name: Inputs + description: Input dataset (query) arguments + arguments: + - name: "--input" + alternatives: [-i] + type: file + description: The input (query) data to be labeled. Should be a .h5mu file. + direction: input + required: true + example: input.h5mu + - name: "--modality" + description: Which modality to process. + type: string + default: "rna" + required: false + - name: "--input_layer" + type: string + description: The layer in the input data containing raw counts, if .X is not to be used. + - name: "--input_var_gene_names" + type: string + required: false + description: | + The name of the adata var column in the input data containing gene names; when no gene_name_layer is provided, the var index will be used. + - name: "--input_reference_gene_overlap" + type: integer + default: 100 + min: 1 + description: | + The minimum number of genes present in both the reference and query datasets. + + - name: Reference + description: Arguments related to the reference dataset. + arguments: + - name: "--reference" + type: file + description: "The reference data to train the CellTypist classifiers on. Only required if a pre-trained --model is not provided." + example: reference.h5mu + direction: input + required: false + - name: "--reference_layer" + type: string + description: The layer in the reference data containing raw counts, if .X is not to be used. + required: false + - name: "--reference_obs_target" + type: string + description: The name of the adata obs column in the reference data containing cell type annotations. + default: "cell_ontology_class" + - name: "--reference_var_gene_names" + type: string + required: false + description: | + The name of the adata var column in the reference data containing gene names; when no gene_name_layer is provided, the var index will be used. + - name: "--reference_var_input" + type: string + required: false + description: | + .var column containing highly variable genes. By default, do not subset genes. + + - name: Model arguments + description: Model arguments. + arguments: + - name: "--model" + type: file + description: "Pretrained model in pkl format. If not provided, the model will be trained on the reference data and --reference should be provided." + required: false + example: pretrained_model.pkl + - name: "--feature_selection" + type: boolean + description: "Whether to perform feature selection." + default: false + - name: "--majority_voting" + type: boolean + description: "Whether to refine the predicted labels by running the majority voting classifier after over-clustering." + default: false + - name: "--C" + type: double + description: "Inverse of regularization strength in logistic regression." + default: 1.0 + - name: "--max_iter" + type: integer + description: "Maximum number of iterations before reaching the minimum of the cost function." + default: 1000 + - name: "--use_SGD" + type: boolean_true + description: "Whether to use the stochastic gradient descent algorithm." + - name: "--min_prop" + type: double + description: | + "For the dominant cell type within a subcluster, the minimum proportion of cells required to + support naming of the subcluster by this cell type. Ignored if majority_voting is set to False. + Subcluster that fails to pass this proportion threshold will be assigned 'Heterogeneous'." + default: 0 + + - name: Outputs + description: Output arguments. + arguments: + - name: "--output" + type: file + description: Output h5mu file. + direction: output + example: output.h5mu + - name: "--output_obs_predictions" + type: string + default: celltypist_pred + required: false + description: | + In which `.obs` slots to store the predicted information. + - name: "--output_obs_probability" + type: string + default: celltypist_probability + required: false + description: | + In which `.obs` slots to store the probability of the predictions. + __merge__: [., /src/base/h5_compression_argument.yaml] + +dependencies: + - name: transform/normalize_total + - name: transform/log1p + - name: dataflow/merge + +resources: + - type: nextflow_script + path: main.nf + entrypoint: run_wf + +test_resources: + - type: nextflow_script + path: test.nf + entrypoint: test_wf + - path: /resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu + - path: /resources_test/annotation_test_data/TS_Blood_filtered.h5mu + +runners: + - type: nextflow diff --git a/src/workflows/annotation/celltypist/integration_test.sh b/src/workflows/annotation/celltypist/integration_test.sh new file mode 100755 index 00000000000..e8597bfef3e --- /dev/null +++ b/src/workflows/annotation/celltypist/integration_test.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# get the root of the directory +REPO_ROOT=$(git rev-parse --show-toplevel) + +# ensure that the command below is run from the root of the repository +cd "$REPO_ROOT" + +nextflow \ + run . \ + -main-script src/workflows/annotation/harmony_knn/test.nf \ + -entry test_wf \ + -profile docker,no_publish \ + -c src/workflows/utils/labels_ci.config \ + -c src/workflows/utils/integration_tests.config \ diff --git a/src/workflows/annotation/celltypist/main.nf b/src/workflows/annotation/celltypist/main.nf new file mode 100644 index 00000000000..935ab3a24cd --- /dev/null +++ b/src/workflows/annotation/celltypist/main.nf @@ -0,0 +1,259 @@ +workflow run_wf { + take: + input_ch + + main: + + modalities_ch = input_ch + // Set aside the output for this workflow to avoid conflicts + | map {id, state -> + def new_state = state + ["workflow_output": state.output] + [id, new_state] + } + // Align query and reference datasets + | align_query_reference.run( + fromState: [ + "input": "input", + "modality": "modality", + "input_layer": "input_layer", + "input_obs_batch": "input_obs_batch_label", + "input_var_gene_names": "input_var_gene_names", + "reference": "reference", + "reference_layer": "reference_layer", + "reference_obs_batch": "reference_obs_batch_label", + "reference_obs_label": "reference_obs_target", + "reference_var_gene_names": "reference_var_gene_names", + "input_reference_gene_overlap": "input_reference_gene_overlap", + "overwrite_existing_key": "overwrite_existing_key" + ], + args: [ + "input_id": "query", + "reference_id": "reference", + "output_layer": "_counts", + "output_var_gene_names": "_gene_names", + "output_obs_batch": "_sample_id", + "output_obs_label": "_cell_type", + "output_obs_id": "_dataset", + "output_var_common_genes": "_common_vars" + ], + toState: [ + "input": "output_query", + "reference": "output_reference" + ] + ) + + | split_modalities.run( + fromState: {id, state -> + def newState = ["input": state.input, "id": id] + }, + toState: ["output": "output", "output_types": "output_types"] + ) + | flatMap {id, state -> + def outputDir = state.output + def types = readCsv(state.output_types.toUriString()) + + types.collect{ dat -> + // def new_id = id + "_" + dat.name + def new_id = id // it's okay because the channel will get split up anyways + def new_data = outputDir.resolve(dat.filename) + [ new_id, state + ["input": new_data, modality: dat.name]] + } + } + // Remove arguments from split modalities from state + | map {id, state -> + def keysToRemove = ["output_types", "output_files"] + def newState = state.findAll{it.key !in keysToRemove} + [id, newState] + } + | view {"After splitting modalities: $it"} + + + rna_ch = modalities_ch + + | filter{id, state -> state.modality == "rna"} + + // Concatenate query and reference datasets prior to integration + // Only concatenate rna modality in this channel + | concatenate_h5mu.run( + fromState: { id, state -> [ + "input": [state.input, state.reference] + ] + }, + args: [ + "input_id": ["query", "reference"], + "modality": "rna", + "other_axis_mode": "move" + ], + toState: ["input": "output"] + ) + | view {"After concatenation: $it"} + | highly_variable_features_scanpy.run( + fromState: [ + "input": "input", + "modality": "modality", + "n_top_features": "n_hvg" + ], + args: [ + "layer": "_counts", + "var_input": "_common_vars", + "var_name_filter": "_common_hvg", + "obs_batch_key": "_sample_id" + ], + toState: [ + "input": "output" + ] + ) + | pca.run( + fromState: [ + "input": "input", + "modality": "modality", + "overwrite": "overwrite_existing_key", + "num_compontents": "pca_num_components" + ], + args: [ + "layer": "_counts", + "var_input": "_common_hvg", + "obsm_output": "X_pca_query_reference", + "varm_output": "pca_loadings_query_reference", + "uns_output": "pca_variance_query_reference", + ], + toState: [ + "input": "output" + ] + ) + | delete_layer.run( + key: "delete_aligned_lognormalized_counts_layer", + fromState: [ + "input": "input", + "modality": "modality", + ], + args: [ + "layer": "_counts", + "missing_ok": "true" + ], + toState: [ + "input": "output" + ] + ) + // Run harmony integration with leiden clustering + | harmony_leiden_workflow.run( + fromState: { id, state -> [ + "id": id, + "input": state.input, + "modality": state.modality, + "obsm_integrated": state.output_obsm_integrated, + "theta": state.harmony_theta, + "leiden_resolution": state.leiden_resolution, + ]}, + args: [ + "embedding": "X_pca_query_reference", + "uns_neighbors": "harmonypy_integration_neighbors", + "obsp_neighbor_distances": "harmonypy_integration_distances", + "obsp_neighbor_connectivities": "harmonypy_integration_connectivities", + "obs_cluster": "harmony_integration_leiden", + "obsm_umap": "X_leiden_harmony_umap", + "obs_covariates": "_sample_id" + ], + toState: ["input": "output"] + ) + | view {"After integration: $it"} + // Split integrated dataset back into a separate reference and query dataset + | split_h5mu.run( + fromState: [ + "input": "input", + "modality": "modality" + ], + args: [ + "obs_feature": "_dataset", + "output_files": "sample_files.csv", + "drop_obs_nan": "true", + "output": "ref_query" + ], + toState: [ + "output": "output", + "output_files": "output_files" + ], + auto: [ publish: true ] + ) + | view {"After sample splitting: $it"} + // map the integrated query and reference datasets back to the state + | map {id, state -> + def outputDir = state.output + if (workflow.stubRun) { + def output_files = outputDir.listFiles() + def new_state = state + [ + "input": output_files[0], + "reference": output_files[1], + ] + return [id, new_state] + } + def files = readCsv(state.output_files.toUriString()) + def query_file = files.findAll{ dat -> dat.name == 'query' } + assert query_file.size() == 1, 'there should only be one query file' + def reference_file = files.findAll{ dat -> dat.name == 'reference' } + assert reference_file.size() == 1, 'there should only be one reference file' + def integrated_query = outputDir.resolve(query_file.filename) + def integrated_reference = outputDir.resolve(reference_file.filename) + def newKeys = ["input": integrated_query, "reference": integrated_reference] + [id, state + newKeys] + } + // remove keys from split files + | map {id, state -> + def keysToRemove = ["output_files"] + def newState = state.findAll{it.key !in keysToRemove} + [id, newState] + } + // Perform KNN label transfer from integrated reference to integrated query + | knn.run( + fromState: [ + "input": "input", + "modality": "modality", + "input_obsm_features": "output_obsm_integrated", + "reference": "reference", + "reference_obsm_features": "output_obsm_integrated", + "reference_obs_targets": "reference_obs_target", + "output_obs_predictions": "output_obs_predictions", + "output_obs_probability": "output_obs_probability", + "output_compression": "output_compression", + "weights": "knn_weights", + "n_neighbors": "knn_n_neighbors", + "output": "workflow_output" + ], + toState: ["input": "output"] + // toState: {id, output, state -> ["output": output.output]} + ) + | view {"After processing RNA modality: $it"} + + other_mod_ch = modalities_ch + | filter{id, state -> state.modality != "rna"} + + output_ch = rna_ch.mix(other_mod_ch) + | groupTuple(by: 0, sort: "hash") + | map { id, states -> + def new_input = states.collect{it.input} + def modalities = states.collect{it.modality}.unique() + def other_state_keys = states.inject([].toSet()){ current_keys, state -> + def new_keys = current_keys + state.keySet() + return new_keys + }.minus(["output", "input", "modality", "reference"]) + def new_state = other_state_keys.inject([:]){ old_state, argument_name -> + argument_values = states.collect{it.get(argument_name)}.unique() + assert argument_values.size() == 1, "Arguments should be the same across modalities. Please report this \ + as a bug. Argument name: $argument_name, \ + argument value: $argument_values" + def argument_value + argument_values.each { argument_value = it } + def current_state = old_state + [(argument_name): argument_value] + return current_state + } + [id, new_state + ["input": new_input, "modalities": modalities]] + } + | merge.run( + fromState: ["input": "input"], + toState: ["output": "output"], + ) + | setState(["output"]) + + emit: + output_ch +} diff --git a/src/workflows/annotation/celltypist/nextflow.config b/src/workflows/annotation/celltypist/nextflow.config new file mode 100644 index 00000000000..059100c489c --- /dev/null +++ b/src/workflows/annotation/celltypist/nextflow.config @@ -0,0 +1,10 @@ +manifest { + nextflowVersion = '!>=20.12.1-edge' +} + +params { + rootDir = java.nio.file.Paths.get("$projectDir/../../../../").toAbsolutePath().normalize().toString() +} + +// include common settings +includeConfig("${params.rootDir}/src/workflows/utils/labels.config") diff --git a/src/workflows/annotation/celltypist/test.nf b/src/workflows/annotation/celltypist/test.nf new file mode 100644 index 00000000000..cbe7833fb20 --- /dev/null +++ b/src/workflows/annotation/celltypist/test.nf @@ -0,0 +1,66 @@ +nextflow.enable.dsl=2 + +include { harmony_knn } from params.rootDir + "/target/nextflow/workflows/annotation/harmony_knn/main.nf" +include { harmony_knn_test } from params.rootDir + "/target/_test/nextflow/test_workflows/annotation/harmony_knn_test/main.nf" +params.resources_test = params.rootDir + "/resources_test" + +workflow test_wf { + // allow changing the resources_test dir + resources_test = file(params.resources_test) + + output_ch = Channel.fromList( + [ + [ + id: "simple_execution_test", + input: resources_test.resolve("pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"), + input_layer: "log_normalized", + reference: resources_test.resolve("annotation_test_data/TS_Blood_filtered.h5mu"), + reference_var_gene_names: "ensemblid", + input_obs_batch_label: "sample_id", + reference_layer: "log_normalized", + reference_obs_batch_label: "donor_assay", + reference_obs_target: "cell_type", + leiden_resolution: [1.0, 0.25] + ], + [ + id: "no_leiden_resolutions_test", + input: resources_test.resolve("pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"), + input_layer: "log_normalized", + reference: resources_test.resolve("annotation_test_data/TS_Blood_filtered.h5mu"), + reference_var_gene_names: "ensemblid", + input_obs_batch_label: "sample_id", + reference_layer: "log_normalized", + reference_obs_batch_label: "donor_assay", + reference_obs_target: "cell_type", + leiden_resolution: [] + ] + ]) + | map{ state -> [state.id, state] } + | harmony_knn + | view { output -> + assert output.size() == 2 : "Outputs should contain two elements; [id, state]" + + // check id + def id = output[0] + assert id.endsWith("_test") : "Output ID should be same as input ID" + + // check output + def state = output[1] + assert state instanceof Map : "State should be a map. Found: ${state}" + assert state.containsKey("output") : "Output should contain key 'output'." + assert state.output.isFile() : "'output' should be a file." + assert state.output.toString().endsWith(".h5mu") : "Output file should end with '.h5mu'. Found: ${state.output}" + + "Output: $output" + } + | harmony_knn_test.run( + fromState: [ + "input": "output" + ] + ) + | toSortedList({a, b -> a[0] <=> b[0]}) + | map { output_list -> + assert output_list.size() == 2 : "output channel should contain 2 events" + assert output_list.collect{it[0]} == ["no_leiden_resolutions_test", "simple_execution_test"] + } + } \ No newline at end of file diff --git a/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml b/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml new file mode 100644 index 00000000000..dbd3a464dbf --- /dev/null +++ b/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml @@ -0,0 +1,25 @@ +name: "celltypist_test" +namespace: "test_workflows/annotation" +scope: "test" +description: "This component tests the output of the annotation of the celltypist workflow." +authors: + - __merge__: /src/authors/dorien_roosen.yaml +argument_groups: + - name: Inputs + arguments: + - name: "--input" + type: file + required: true + description: Path to h5mu output. + example: foo.final.h5mu +resources: + - type: python_script + path: script.py + - path: /src/utils/setup_logger.py +engines: + - type: docker + image: python:3.12-slim + __merge__: /src/base/requirements/testworkflows_setup.yaml +runners: + - type: executable + - type: nextflow \ No newline at end of file diff --git a/src/workflows/test_workflows/annotation/celltypist/script.py b/src/workflows/test_workflows/annotation/celltypist/script.py new file mode 100644 index 00000000000..2e1a71b2613 --- /dev/null +++ b/src/workflows/test_workflows/annotation/celltypist/script.py @@ -0,0 +1,48 @@ +from mudata import read_h5mu +import sys +import pytest + +##VIASH START +par = {"input": "harmony_knn/output.h5mu"} + +meta = {"resources_dir": "resources_test"} +##VIASH END + + +def test_run(): + input_mudata = read_h5mu(par["input"]) + expected_obsm = ["X_integrated_harmony", "X_leiden_harmony_umap"] + expected_obs = ["cell_type_pred", "cell_type_probability"] + expected_obsp = [ + "harmonypy_integration_distances", + "harmonypy_integration_connectivities", + ] + expected_mod = ["rna", "prot"] + + assert all(key in list(input_mudata.mod) for key in expected_mod), ( + f"Input modalities should be: {expected_mod}, found: {input_mudata.mod.keys()}." + ) + assert all(key in list(input_mudata.mod["rna"].obsm) for key in expected_obsm), ( + f"Input mod['rna'] obsm columns should be: {expected_obsm}, found: {input_mudata.mod['rna'].obsm.keys()}." + ) + assert all(key in list(input_mudata.mod["rna"].obs) for key in expected_obs), ( + f"Input mod['rna'] obs columns should be: {expected_obs}, found: {input_mudata.mod['rna'].obs.keys()}." + ) + assert all(key in list(input_mudata.mod["rna"].obsp) for key in expected_obsp), ( + f"Input mod['rna'] obsp columns should be: {expected_obsp}, found: {input_mudata.mod['rna'].obsp.keys()}." + ) + + assert input_mudata.mod["rna"].obs["cell_type_pred"].dtype == "category", ( + "Cell type predictions should be of dtype category." + ) + assert input_mudata.mod["rna"].obs["cell_type_probability"].dtype == "float64", ( + "Cell type probabilities should be of dtype float64." + ) + + assert input_mudata.mod["rna"].shape[0] == input_mudata.mod["prot"].shape[0], ( + "Number of observations should be equal in all modalities." + ) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "--import-mode=importlib"])) From 2a11b496fe31e8d8d439e094fe87fd994527d913 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Fri, 17 Oct 2025 16:41:15 +0200 Subject: [PATCH 10/11] create celltypist workflow --- .../annotation/celltypist/config.vsh.yaml | 6 +- .../annotation/celltypist/integration_test.sh | 10 +- src/workflows/annotation/celltypist/main.nf | 342 ++++++------------ src/workflows/annotation/celltypist/test.nf | 74 ++-- .../annotation/celltypist/config.vsh.yaml | 6 + .../annotation/celltypist/script.py | 27 +- 6 files changed, 182 insertions(+), 283 deletions(-) diff --git a/src/workflows/annotation/celltypist/config.vsh.yaml b/src/workflows/annotation/celltypist/config.vsh.yaml index 1fe4a0ef444..ab9c266f6dc 100644 --- a/src/workflows/annotation/celltypist/config.vsh.yaml +++ b/src/workflows/annotation/celltypist/config.vsh.yaml @@ -132,7 +132,9 @@ argument_groups: dependencies: - name: transform/normalize_total - name: transform/log1p - - name: dataflow/merge + - name: transform/delete_layer + - name: annotate/celltypist + alias: celltypist_component resources: - type: nextflow_script @@ -145,6 +147,8 @@ test_resources: entrypoint: test_wf - path: /resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu - path: /resources_test/annotation_test_data/TS_Blood_filtered.h5mu + - path: /resources_test/annotation_test_data/celltypist_model_Immune_All_Low.pkl + - path: /resources_test/annotation_test_data/demo_2000_cells.h5mu runners: - type: nextflow diff --git a/src/workflows/annotation/celltypist/integration_test.sh b/src/workflows/annotation/celltypist/integration_test.sh index e8597bfef3e..e5870cfc714 100755 --- a/src/workflows/annotation/celltypist/integration_test.sh +++ b/src/workflows/annotation/celltypist/integration_test.sh @@ -8,8 +8,16 @@ cd "$REPO_ROOT" nextflow \ run . \ - -main-script src/workflows/annotation/harmony_knn/test.nf \ + -main-script src/workflows/annotation/celltypist/test.nf \ -entry test_wf \ -profile docker,no_publish \ -c src/workflows/utils/labels_ci.config \ -c src/workflows/utils/integration_tests.config \ + +nextflow \ + run . \ + -main-script src/workflows/annotation/celltypist/test.nf \ + -entry test_wf_2 \ + -profile docker,no_publish \ + -c src/workflows/utils/labels_ci.config \ + -c src/workflows/utils/integration_tests.config \ diff --git a/src/workflows/annotation/celltypist/main.nf b/src/workflows/annotation/celltypist/main.nf index 935ab3a24cd..ab1080bf044 100644 --- a/src/workflows/annotation/celltypist/main.nf +++ b/src/workflows/annotation/celltypist/main.nf @@ -4,256 +4,122 @@ workflow run_wf { main: - modalities_ch = input_ch + output_ch = input_ch // Set aside the output for this workflow to avoid conflicts | map {id, state -> - def new_state = state + ["workflow_output": state.output] - [id, new_state] + def new_state = state + ["workflow_output": state.output] + [id, new_state] } - // Align query and reference datasets - | align_query_reference.run( - fromState: [ - "input": "input", - "modality": "modality", - "input_layer": "input_layer", - "input_obs_batch": "input_obs_batch_label", - "input_var_gene_names": "input_var_gene_names", - "reference": "reference", - "reference_layer": "reference_layer", - "reference_obs_batch": "reference_obs_batch_label", - "reference_obs_label": "reference_obs_target", - "reference_var_gene_names": "reference_var_gene_names", - "input_reference_gene_overlap": "input_reference_gene_overlap", - "overwrite_existing_key": "overwrite_existing_key" - ], - args: [ - "input_id": "query", - "reference_id": "reference", - "output_layer": "_counts", - "output_var_gene_names": "_gene_names", - "output_obs_batch": "_sample_id", - "output_obs_label": "_cell_type", - "output_obs_id": "_dataset", - "output_var_common_genes": "_common_vars" - ], - toState: [ - "input": "output_query", - "reference": "output_reference" - ] + // Log normalize query dataset to target sum of 10000 + | normalize_total.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality, + "input_layer": state.input_layer, + ]}, + args: [ + "output_layer": "normalized_10k", + "target_sum": "10000", + ], + toState: [ + "input": "output", + ] ) - | split_modalities.run( - fromState: {id, state -> - def newState = ["input": state.input, "id": id] - }, - toState: ["output": "output", "output_types": "output_types"] - ) - | flatMap {id, state -> - def outputDir = state.output - def types = readCsv(state.output_types.toUriString()) - - types.collect{ dat -> - // def new_id = id + "_" + dat.name - def new_id = id // it's okay because the channel will get split up anyways - def new_data = outputDir.resolve(dat.filename) - [ new_id, state + ["input": new_data, modality: dat.name]] - } - } - // Remove arguments from split modalities from state - | map {id, state -> - def keysToRemove = ["output_types", "output_files"] - def newState = state.findAll{it.key !in keysToRemove} - [id, newState] - } - | view {"After splitting modalities: $it"} - - - rna_ch = modalities_ch - - | filter{id, state -> state.modality == "rna"} - - // Concatenate query and reference datasets prior to integration - // Only concatenate rna modality in this channel - | concatenate_h5mu.run( - fromState: { id, state -> [ - "input": [state.input, state.reference] - ] - }, - args: [ - "input_id": ["query", "reference"], - "modality": "rna", - "other_axis_mode": "move" - ], - toState: ["input": "output"] - ) - | view {"After concatenation: $it"} - | highly_variable_features_scanpy.run( - fromState: [ - "input": "input", - "modality": "modality", - "n_top_features": "n_hvg" - ], - args: [ - "layer": "_counts", - "var_input": "_common_vars", - "var_name_filter": "_common_hvg", - "obs_batch_key": "_sample_id" - ], - toState: [ - "input": "output" - ] - ) - | pca.run( - fromState: [ - "input": "input", - "modality": "modality", - "overwrite": "overwrite_existing_key", - "num_compontents": "pca_num_components" - ], - args: [ - "layer": "_counts", - "var_input": "_common_hvg", - "obsm_output": "X_pca_query_reference", - "varm_output": "pca_loadings_query_reference", - "uns_output": "pca_variance_query_reference", - ], - toState: [ - "input": "output" - ] + | log1p.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality + ]}, + args: [ + "input_layer": "normalized_10k", + "output_layer": "log_normalized_10k", + ], + toState: [ + "input": "output" + ] ) | delete_layer.run( - key: "delete_aligned_lognormalized_counts_layer", - fromState: [ - "input": "input", - "modality": "modality", - ], - args: [ - "layer": "_counts", - "missing_ok": "true" - ], - toState: [ - "input": "output" - ] + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality + ]}, + args: [ + "layer": "normalized_10k" + ], + toState: [ + "input": "output" + ] ) - // Run harmony integration with leiden clustering - | harmony_leiden_workflow.run( - fromState: { id, state -> [ - "id": id, - "input": state.input, - "modality": state.modality, - "obsm_integrated": state.output_obsm_integrated, - "theta": state.harmony_theta, - "leiden_resolution": state.leiden_resolution, - ]}, - args: [ - "embedding": "X_pca_query_reference", - "uns_neighbors": "harmonypy_integration_neighbors", - "obsp_neighbor_distances": "harmonypy_integration_distances", - "obsp_neighbor_connectivities": "harmonypy_integration_connectivities", - "obs_cluster": "harmony_integration_leiden", - "obsm_umap": "X_leiden_harmony_umap", - "obs_covariates": "_sample_id" - ], - toState: ["input": "output"] + // Log normalize reference dataset to target sum of 10000 + | normalize_total.run( + key: "normalize_total_reference", + runIf: { id, state -> + state.reference + }, + fromState: { id, state -> [ + "input": state.reference, + "modality": state.modality, + "input_layer": state.reference_layer, + ]}, + args: [ + "output_layer": "normalized_10k", + "target_sum": "10000", + ], + toState: [ + "reference": "output", + ] ) - | view {"After integration: $it"} - // Split integrated dataset back into a separate reference and query dataset - | split_h5mu.run( - fromState: [ - "input": "input", - "modality": "modality" - ], - args: [ - "obs_feature": "_dataset", - "output_files": "sample_files.csv", - "drop_obs_nan": "true", - "output": "ref_query" - ], - toState: [ - "output": "output", - "output_files": "output_files" - ], - auto: [ publish: true ] + | log1p.run( + key: "log1p_reference", + runIf: { id, state -> + state.reference + }, + fromState: { id, state -> [ + "input": state.reference, + "modality": state.modality + ]}, + args: [ + "input_layer": "normalized_10k", + "output_layer": "log_normalized_10k", + ], + toState: [ + "reference": "output" + ] ) - | view {"After sample splitting: $it"} - // map the integrated query and reference datasets back to the state - | map {id, state -> - def outputDir = state.output - if (workflow.stubRun) { - def output_files = outputDir.listFiles() - def new_state = state + [ - "input": output_files[0], - "reference": output_files[1], - ] - return [id, new_state] - } - def files = readCsv(state.output_files.toUriString()) - def query_file = files.findAll{ dat -> dat.name == 'query' } - assert query_file.size() == 1, 'there should only be one query file' - def reference_file = files.findAll{ dat -> dat.name == 'reference' } - assert reference_file.size() == 1, 'there should only be one reference file' - def integrated_query = outputDir.resolve(query_file.filename) - def integrated_reference = outputDir.resolve(reference_file.filename) - def newKeys = ["input": integrated_query, "reference": integrated_reference] - [id, state + newKeys] - } - // remove keys from split files - | map {id, state -> - def keysToRemove = ["output_files"] - def newState = state.findAll{it.key !in keysToRemove} - [id, newState] - } - // Perform KNN label transfer from integrated reference to integrated query - | knn.run( - fromState: [ - "input": "input", - "modality": "modality", - "input_obsm_features": "output_obsm_integrated", - "reference": "reference", - "reference_obsm_features": "output_obsm_integrated", - "reference_obs_targets": "reference_obs_target", - "output_obs_predictions": "output_obs_predictions", - "output_obs_probability": "output_obs_probability", - "output_compression": "output_compression", - "weights": "knn_weights", - "n_neighbors": "knn_n_neighbors", - "output": "workflow_output" - ], - toState: ["input": "output"] - // toState: {id, output, state -> ["output": output.output]} + // Run harmony integration with leiden clustering + | celltypist_component.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality, + "input_var_gene_names": state.input_var_gene_names, + "input_reference_gene_overlap": state.input_reference_gene_overlap, + "reference": state.reference, + "reference_obs_target": state.reference_obs_target, + "reference_var_gene_names": state.reference_var_gene_names, + "reference_var_input": state.reference_var_input, + "model": state.model, + "feature_selection": state.feature_selection, + "majority_voting": state.majority_voting, + "C": state.C, + "max_iter": state.max_iter, + "use_SGD": state.use_SGD, + "min_prop": state.min_prop, + "output": state.workflow_output, + "output_obs_predictions": state.output_obs_predictions, + "output_obs_probability": state.output_obs_probability + ]}, + args: [ + "input_layer": "log_normalized_10k", + "reference_layer": "log_normalized_10k" + ], + toState: [ + "output": "output" + ] ) - | view {"After processing RNA modality: $it"} - - other_mod_ch = modalities_ch - | filter{id, state -> state.modality != "rna"} + | view {"After annotation: $it"} + | setState(["output"]) - output_ch = rna_ch.mix(other_mod_ch) - | groupTuple(by: 0, sort: "hash") - | map { id, states -> - def new_input = states.collect{it.input} - def modalities = states.collect{it.modality}.unique() - def other_state_keys = states.inject([].toSet()){ current_keys, state -> - def new_keys = current_keys + state.keySet() - return new_keys - }.minus(["output", "input", "modality", "reference"]) - def new_state = other_state_keys.inject([:]){ old_state, argument_name -> - argument_values = states.collect{it.get(argument_name)}.unique() - assert argument_values.size() == 1, "Arguments should be the same across modalities. Please report this \ - as a bug. Argument name: $argument_name, \ - argument value: $argument_values" - def argument_value - argument_values.each { argument_value = it } - def current_state = old_state + [(argument_name): argument_value] - return current_state - } - [id, new_state + ["input": new_input, "modalities": modalities]] - } - | merge.run( - fromState: ["input": "input"], - toState: ["output": "output"], - ) - | setState(["output"]) - emit: output_ch } diff --git a/src/workflows/annotation/celltypist/test.nf b/src/workflows/annotation/celltypist/test.nf index cbe7833fb20..3ff541fe65f 100644 --- a/src/workflows/annotation/celltypist/test.nf +++ b/src/workflows/annotation/celltypist/test.nf @@ -1,7 +1,7 @@ nextflow.enable.dsl=2 -include { harmony_knn } from params.rootDir + "/target/nextflow/workflows/annotation/harmony_knn/main.nf" -include { harmony_knn_test } from params.rootDir + "/target/_test/nextflow/test_workflows/annotation/harmony_knn_test/main.nf" +include { celltypist } from params.rootDir + "/target/nextflow/workflows/annotation/celltypist/main.nf" +include { celltypist_test } from params.rootDir + "/target/_test/nextflow/test_workflows/annotation/celltypist_test/main.nf" params.resources_test = params.rootDir + "/resources_test" workflow test_wf { @@ -11,32 +11,17 @@ workflow test_wf { output_ch = Channel.fromList( [ [ - id: "simple_execution_test", + id: "reference_dataset_test", input: resources_test.resolve("pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"), - input_layer: "log_normalized", reference: resources_test.resolve("annotation_test_data/TS_Blood_filtered.h5mu"), reference_var_gene_names: "ensemblid", input_obs_batch_label: "sample_id", - reference_layer: "log_normalized", reference_obs_batch_label: "donor_assay", reference_obs_target: "cell_type", - leiden_resolution: [1.0, 0.25] - ], - [ - id: "no_leiden_resolutions_test", - input: resources_test.resolve("pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"), - input_layer: "log_normalized", - reference: resources_test.resolve("annotation_test_data/TS_Blood_filtered.h5mu"), - reference_var_gene_names: "ensemblid", - input_obs_batch_label: "sample_id", - reference_layer: "log_normalized", - reference_obs_batch_label: "donor_assay", - reference_obs_target: "cell_type", - leiden_resolution: [] ] ]) | map{ state -> [state.id, state] } - | harmony_knn + | celltypist | view { output -> assert output.size() == 2 : "Outputs should contain two elements; [id, state]" @@ -53,14 +38,53 @@ workflow test_wf { "Output: $output" } - | harmony_knn_test.run( + | celltypist_test.run( fromState: [ "input": "output" + ], + args: [ + "expected_modalities": ["rna", "prot"] ] ) - | toSortedList({a, b -> a[0] <=> b[0]}) - | map { output_list -> - assert output_list.size() == 2 : "output channel should contain 2 events" - assert output_list.collect{it[0]} == ["no_leiden_resolutions_test", "simple_execution_test"] + } + +workflow test_wf_2 { + // allow changing the resources_test dir + resources_test = file(params.resources_test) + + output_ch = Channel.fromList( + [ + [ + id: "reference_model_test", + input: resources_test.resolve("annotation_test_data/demo_2000_cells.h5mu"), + model: resources_test.resolve("annotation_test_data/celltypist_model_Immune_All_Low.pkl"), + reference_obs_target: "cell_type", + ], + ]) + | map{ state -> [state.id, state] } + | celltypist + | view { output -> + assert output.size() == 2 : "Outputs should contain two elements; [id, state]" + + // check id + def id = output[0] + assert id.endsWith("_test") : "Output ID should be same as input ID" + + // check output + def state = output[1] + assert state instanceof Map : "State should be a map. Found: ${state}" + assert state.containsKey("output") : "Output should contain key 'output'." + assert state.output.isFile() : "'output' should be a file." + assert state.output.toString().endsWith(".h5mu") : "Output file should end with '.h5mu'. Found: ${state.output}" + + "Output: $output" } - } \ No newline at end of file + | celltypist_test.run( + fromState: [ + "input": "output" + ], + args: [ + "expected_modalities": ["rna"] + ] + ) + } diff --git a/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml b/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml index dbd3a464dbf..340ffaca47f 100644 --- a/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml +++ b/src/workflows/test_workflows/annotation/celltypist/config.vsh.yaml @@ -12,6 +12,12 @@ argument_groups: required: true description: Path to h5mu output. example: foo.final.h5mu + - name: "--expected_modalities" + type: string + multiple: true + required: true + description: List of expected modalities in the output h5mu. + example: ["rna", "prot"] resources: - type: python_script path: script.py diff --git a/src/workflows/test_workflows/annotation/celltypist/script.py b/src/workflows/test_workflows/annotation/celltypist/script.py index 2e1a71b2613..baaaa5a25b3 100644 --- a/src/workflows/test_workflows/annotation/celltypist/script.py +++ b/src/workflows/test_workflows/annotation/celltypist/script.py @@ -11,37 +11,28 @@ def test_run(): input_mudata = read_h5mu(par["input"]) - expected_obsm = ["X_integrated_harmony", "X_leiden_harmony_umap"] - expected_obs = ["cell_type_pred", "cell_type_probability"] - expected_obsp = [ - "harmonypy_integration_distances", - "harmonypy_integration_connectivities", - ] - expected_mod = ["rna", "prot"] + expected_obs = ["celltypist_pred", "celltypist_probability"] + expected_mod = par["expected_modalities"] assert all(key in list(input_mudata.mod) for key in expected_mod), ( f"Input modalities should be: {expected_mod}, found: {input_mudata.mod.keys()}." ) - assert all(key in list(input_mudata.mod["rna"].obsm) for key in expected_obsm), ( - f"Input mod['rna'] obsm columns should be: {expected_obsm}, found: {input_mudata.mod['rna'].obsm.keys()}." - ) assert all(key in list(input_mudata.mod["rna"].obs) for key in expected_obs), ( f"Input mod['rna'] obs columns should be: {expected_obs}, found: {input_mudata.mod['rna'].obs.keys()}." ) - assert all(key in list(input_mudata.mod["rna"].obsp) for key in expected_obsp), ( - f"Input mod['rna'] obsp columns should be: {expected_obsp}, found: {input_mudata.mod['rna'].obsp.keys()}." - ) - assert input_mudata.mod["rna"].obs["cell_type_pred"].dtype == "category", ( + assert input_mudata.mod["rna"].obs["celltypist_pred"].dtype == "category", ( "Cell type predictions should be of dtype category." ) - assert input_mudata.mod["rna"].obs["cell_type_probability"].dtype == "float64", ( + assert input_mudata.mod["rna"].obs["celltypist_probability"].dtype == "float64", ( "Cell type probabilities should be of dtype float64." ) - assert input_mudata.mod["rna"].shape[0] == input_mudata.mod["prot"].shape[0], ( - "Number of observations should be equal in all modalities." - ) + if len(expected_mod) == 2: + assert ( + input_mudata.mod[expected_mod[0]].shape[0] + == input_mudata.mod[expected_mod[1]].shape[0] + ), "Number of observations should be equal in all modalities." if __name__ == "__main__": From 904f1aa2d6e0a27d7bf69e8a766796549176cee0 Mon Sep 17 00:00:00 2001 From: dorien-er Date: Wed, 29 Oct 2025 11:40:07 +0100 Subject: [PATCH 11/11] parallelize --- src/workflows/annotation/celltypist/main.nf | 173 ++++++++++---------- 1 file changed, 90 insertions(+), 83 deletions(-) diff --git a/src/workflows/annotation/celltypist/main.nf b/src/workflows/annotation/celltypist/main.nf index ab1080bf044..496a8b0e1ba 100644 --- a/src/workflows/annotation/celltypist/main.nf +++ b/src/workflows/annotation/celltypist/main.nf @@ -4,89 +4,96 @@ workflow run_wf { main: - output_ch = input_ch - // Set aside the output for this workflow to avoid conflicts - | map {id, state -> - def new_state = state + ["workflow_output": state.output] - [id, new_state] - } - // Log normalize query dataset to target sum of 10000 - | normalize_total.run( - fromState: { id, state -> [ - "input": state.input, - "modality": state.modality, - "input_layer": state.input_layer, - ]}, - args: [ - "output_layer": "normalized_10k", - "target_sum": "10000", - ], - toState: [ - "input": "output", - ] - ) + query_ch = input_ch + // Log normalize query dataset to target sum of 10000 + | normalize_total.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality, + "input_layer": state.input_layer, + ]}, + args: [ + "output_layer": "normalized_10k", + "target_sum": "10000", + ], + toState: [ + "input": "output", + ] + ) + | log1p.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality + ]}, + args: [ + "input_layer": "normalized_10k", + "output_layer": "log_normalized_10k", + ], + toState: [ + "input": "output" + ] + ) + | delete_layer.run( + fromState: { id, state -> [ + "input": state.input, + "modality": state.modality + ]}, + args: [ + "layer": "normalized_10k" + ], + toState: [ + "input": "output" + ] + ) + | view {"After query normalization: $it"} - | log1p.run( - fromState: { id, state -> [ - "input": state.input, - "modality": state.modality - ]}, - args: [ - "input_layer": "normalized_10k", - "output_layer": "log_normalized_10k", - ], - toState: [ - "input": "output" - ] - ) - | delete_layer.run( - fromState: { id, state -> [ - "input": state.input, - "modality": state.modality - ]}, - args: [ - "layer": "normalized_10k" - ], - toState: [ - "input": "output" - ] - ) - // Log normalize reference dataset to target sum of 10000 - | normalize_total.run( - key: "normalize_total_reference", - runIf: { id, state -> - state.reference - }, - fromState: { id, state -> [ - "input": state.reference, - "modality": state.modality, - "input_layer": state.reference_layer, - ]}, - args: [ - "output_layer": "normalized_10k", - "target_sum": "10000", - ], - toState: [ - "reference": "output", - ] - ) - | log1p.run( - key: "log1p_reference", - runIf: { id, state -> - state.reference - }, - fromState: { id, state -> [ - "input": state.reference, - "modality": state.modality - ]}, - args: [ - "input_layer": "normalized_10k", - "output_layer": "log_normalized_10k", - ], - toState: [ - "reference": "output" - ] - ) + ref_ch = input_ch + // Log normalize reference dataset to target sum of 10000 + | normalize_total.run( + key: "normalize_total_reference", + runIf: { id, state -> + state.reference + }, + fromState: { id, state -> [ + "input": state.reference, + "modality": state.modality, + "input_layer": state.reference_layer, + ]}, + args: [ + "output_layer": "normalized_10k", + "target_sum": "10000", + ], + toState: [ + "reference": "output", + ] + ) + | log1p.run( + key: "log1p_reference", + runIf: { id, state -> + state.reference + }, + fromState: { id, state -> [ + "input": state.reference, + "modality": state.modality + ]}, + args: [ + "input_layer": "normalized_10k", + "output_layer": "log_normalized_10k", + ], + toState: [ + "reference": "output" + ] + ) + | view {"After reference normalization: $it"} + + + output_ch = query_ch.join(ref_ch, failOnMismatch: true, failOnDuplicate: true) + | view {"After channel mixing: $it"} + // Set aside the output for this workflow to avoid conflicts + | map {id, query_state, ref_state -> + def newState = query_state + ["reference": ref_state.reference] + [id, newState] + } // Run harmony integration with leiden clustering | celltypist_component.run( fromState: { id, state -> [ @@ -105,7 +112,7 @@ workflow run_wf { "max_iter": state.max_iter, "use_SGD": state.use_SGD, "min_prop": state.min_prop, - "output": state.workflow_output, + "output": state.output, "output_obs_predictions": state.output_obs_predictions, "output_obs_probability": state.output_obs_probability ]},