From aba4902b99d6dfa7d564ab2a7fcdc0e7d088f279 Mon Sep 17 00:00:00 2001
From: Dries Schaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Thu, 23 Oct 2025 13:31:12 +0000
Subject: [PATCH 1/3] scvi_leiden: add support for writing to a tileDB database.

---
 .../from_tiledb_to_h5mu/config.vsh.yaml       |   5 +
 src/convert/from_tiledb_to_h5mu/script.py     |   5 +-
 .../move_mudata_obs_to_tiledb/config.vsh.yaml | 104 +++++++
 .../move_mudata_obs_to_tiledb/script.py       | 163 +++++++++++
 src/tiledb/move_mudata_obs_to_tiledb/test.py  | 253 ++++++++++++++++++
 .../config.vsh.yaml                           |  21 +-
 .../move_mudata_obsm_to_tiledb/script.py      |  68 ++++-
 src/tiledb/move_mudata_obsm_to_tiledb/test.py |  90 ++++++-
 .../integration/scvi_leiden/config.vsh.yaml   |  41 ++-
 .../scvi_leiden/integration_test.sh           |  11 +-
 src/workflows/integration/scvi_leiden/main.nf |  65 ++++-
 src/workflows/integration/scvi_leiden/test.nf |  51 ++++
 12 files changed, 840 insertions(+), 37 deletions(-)
 create mode 100644 src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml
 create mode 100644 src/tiledb/move_mudata_obs_to_tiledb/script.py
 create mode 100644 src/tiledb/move_mudata_obs_to_tiledb/test.py

diff --git a/src/convert/from_tiledb_to_h5mu/config.vsh.yaml b/src/convert/from_tiledb_to_h5mu/config.vsh.yaml
index a663c75cfc1..fcb548abce4 100644
--- a/src/convert/from_tiledb_to_h5mu/config.vsh.yaml
+++ b/src/convert/from_tiledb_to_h5mu/config.vsh.yaml
@@ -23,6 +23,11 @@ argument_groups:
       description: |
         Custom endpoint to use to connect to S3
       required: false
+    - name: "--s3_no_sign_request"
+      description: |
+        Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
+      type: boolean
+      default: false
     - name: "--input_modality"
       required: true
       type: string
diff --git a/src/convert/from_tiledb_to_h5mu/script.py b/src/convert/from_tiledb_to_h5mu/script.py
index 269e3f443bb..dc7a647a8cc 100644
--- a/src/convert/from_tiledb_to_h5mu/script.py
+++ b/src/convert/from_tiledb_to_h5mu/script.py
@@ -45,10 +45,9 @@ def _log_arguments(function_obj, arg_dict):
 
 def main(par):
     logger.info("Component %s started", meta["name"])
-    tiledb_config = {
-        "vfs.s3.no_sign_request": "false",
-    }
+    tiledb_config = {}
     optional_config = {
+        "vfs.s3.no_sign_request": par["s3_no_sign_request"],
         "vfs.s3.region": par["s3_region"],
         "vfs.s3.endpoint_override": par["endpoint"],
     }
diff --git a/src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml b/src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml
new file mode 100644
index 00000000000..666ec6dc75b
--- /dev/null
+++ b/src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml
@@ -0,0 +1,104 @@
+name: move_mudata_obs_to_tiledb
+namespace: tiledb
+scope: "private"
+description: |
+  Move .obs columns from a MuData modality to an existing tileDB database.
+  The .obs keys should not exist in the database yet, and the observations from the modality and
+  their order should match what is already present in the tiledb database.
+authors:
+  - __merge__: /src/authors/dries_schaumont.yaml
+    roles: [ author, maintainer ]
+argument_groups:
+  - name: Input database
+    description: "Open a tileDB-SOMA database by URI or as a local directory."
+    arguments:
+      - name: "--input_uri"
+        type: string
+        description: "A URI pointing to a TileDB-SOMA database. Mutually exclusive with 'input_dir'"
+        required: false
+        example: "s3://bucket/path"
+      - name: "--input_dir"
+        type: file
+        required: false
+        description: "Path to a TileDB-SOMA database as a local directory"
+        example: "./tiledb_database"
+      - name: "--s3_region"
+        description: |
+          Region where the TileDB-SOMA database is hosted.
+        type: string
+        required: false
+      - name: "--endpoint"
+        type: string
+        description: |
+          Custom endpoint to use to connect to S3
+        required: false
+      - name: "--s3_no_sign_request"
+        description: |
+          Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
+        type: boolean
+        default: false
+      - name: "--output_modality"
+        type: string
+        description: |
+          TileDB-SOMA measurement to add the output to.
+      - name: "--obs_index_name_input"
+        description: |
+          Name of the index that is used to describe the cells (observations).
+        type: string
+        default: cell_id
+  - name: "MuData input"
+    arguments:
+      - name: "--input_mudata"
+        type: file
+        required: true
+        description: |
+          MuData object to take the columns from. The observations and their order should
+          match between the database and the input modality.
+      - name: "--modality"
+        required: true
+        type: string
+        description: |
+          Modality where to take the .obs from.
+      - name: "--obs_input"
+        type: string
+        multiple: true
+        description: |
+          Columns from .obs to copy. The keys should not be present yet in the database.
+  - name: "TileDB-SOMA output"
+    arguments:
+      - name: "--output_tiledb"
+        type: file
+        direction: output
+        required: false
+        description: |
+          Output to a directory instead of adding to the existing database.
+resources:
+  - type: python_script
+    path: script.py
+  - path: /src/utils/setup_logger.py
+test_resources:
+  - type: python_script
+    path: test.py
+  - path: /resources_test/tiledb/
+  - path: /resources_test/pbmc_1k_protein_v3/
+engines:
+  - type: docker
+    image: python:3.12
+    setup:
+      - type: python
+        packages:
+          - tiledbsoma
+          - boto3
+          - awscli
+    __merge__: /src/base/requirements/anndata_mudata.yaml
+    test_setup:
+      - type: python
+        packages:
+          - moto[server]
+        __merge__: [ /src/base/requirements/python_test_setup.yaml, .]
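+# The AWS environment variables below are forwarded into the container so the
+# component can authenticate against S3 (or the mocked endpoint used in the tests).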
+runners:
+  - type: executable
+    docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY", "--env", "AWS_DEFAULT_REGION"]
+  - type: nextflow
+    directives:
+      label: [highmem, midcpu]
\ No newline at end of file
diff --git a/src/tiledb/move_mudata_obs_to_tiledb/script.py b/src/tiledb/move_mudata_obs_to_tiledb/script.py
new file mode 100644
index 00000000000..26134f5dd30
--- /dev/null
+++ b/src/tiledb/move_mudata_obs_to_tiledb/script.py
@@ -0,0 +1,163 @@
+import sys
+import mudata
+import tiledbsoma
+import tiledbsoma.io
+import pandas as pd
+from pathlib import Path
+
+## VIASH START
+par = {
+    "input_uri": "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms/",
+    "input_mudata": "./resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu",
+    "obs_input": ["test_slot"],
+    "modality": "rna",
+    "input_dir": None,
+    "s3_region": "eu-west-3",
+    "endpoint": None,
+    "output_tiledb": "./output",
+    "output_modality": "rna",
+    "s3_no_sign_request": True,
+    "obs_index_name_input": "cell_id",
+}
+meta = {"resources_dir": "src/utils", "name": "move_mudata_obs_to_tiledb"}
+
+test_path = "./mudata_for_testing.h5mu"
+test_mudata = mudata.read_h5mu(par["input_mudata"])
+test_mudata["rna"].obs["test_slot"] = (
+    test_mudata["rna"].obs["filter_with_counts"].copy()
+)
+test_mudata.write(test_path)
+par["input_mudata"] = test_path
+## VIASH END
+
+sys.path.append(meta["resources_dir"])
+from setup_logger import setup_logger
+
+logger = setup_logger()
+
+tiledbsoma.logging.info()
+
+
+def download_s3_dir(input_uri, output_dir_path):
+    logger.info("Requested to output to a directory. Downloading database...")
+    output_dir_path.mkdir(parents=True, exist_ok=True)
+    import boto3
+    from awscli.customizations.s3.utils import split_s3_bucket_key
+
+    bucket, key = split_s3_bucket_key(input_uri)
+    connection_args = {
+        "endpoint_url": par["endpoint"],
+        "region_name": par["s3_region"],
+    }
+    if par["s3_no_sign_request"]:
+        import botocore
+
+        connection_args["config"] = botocore.config.Config(
+            signature_version=botocore.UNSIGNED
+        )
+
+    client = boto3.resource("s3", **connection_args)
+    bucket = client.Bucket(bucket)
+    for i, s3_obj in enumerate(bucket.objects.filter(Prefix=key)):
+        output_path = output_dir_path / s3_obj.key.removeprefix(key).lstrip("/")
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        bucket.download_file(s3_obj.key, output_path)
+        print(f"Downloaded {i + 1} files.", file=sys.stdout, flush=True, end="\r")
+    logger.info("Download completed!")
+
+
+def main(par):
+    logger.info(f"Component {meta['name']} started.")
+    if par["input_uri"]:
+        par["input_uri"] = par["input_uri"].rstrip("/")
+    if par["input_uri"] and par["input_dir"]:
+        raise ValueError("Cannot provide both 'input_uri' and 'input_dir'.")
+    if not par["input_uri"] and not par["input_dir"]:
+        raise ValueError("Must provide either 'input_uri' or 'input_dir'")
+    if not par["obs_input"]:
+        raise ValueError("Please provide at least one .obs column.")
+    logger.info(
+        "Opening mudata file '%s', modality '%s'.", par["input_mudata"], par["modality"]
+    )
+    modality_data = mudata.read_h5ad(par["input_mudata"], mod=par["modality"])
+    logger.info(
+        "Done reading modality. Looking at .obs for keys: '%s'",
+        ",".join(par["obs_input"]),
+    )
+    try:
+        for obs_key in par["obs_input"]:
+            modality_data.obs[obs_key]
+    except KeyError as e:
+        raise KeyError("Not all .obs keys were found in the input!") from e
+
+    logger.info("Done getting .obs keys.")
+    optional_config = {
+        "vfs.s3.region": par["s3_region"],
+        "vfs.s3.endpoint_override": par["endpoint"],
+        "vfs.s3.no_sign_request": par["s3_no_sign_request"],
+    }
+    tiledb_config = {}
+    for config_setting, config_val in optional_config.items():
+        if config_val is not None:
+            tiledb_config[config_setting] = config_val
+    logger.info("Using the following config to connect to S3: %s", tiledb_config)
+
+    if par["output_tiledb"]:
+        output_dir_path = Path(par["output_tiledb"])
+        if par["input_dir"]:
+            import shutil
+
+            shutil.copytree(par["input_dir"], output_dir_path, dirs_exist_ok=True)
+        else:
+            download_s3_dir(par["input_uri"], output_dir_path)
+        logger.info("Setting input to '%s'", output_dir_path)
+        par["input_uri"] = f"file://{output_dir_path.resolve()}"
+        logger.info("Overwriting TileDB config because S3 connection is not required.")
+        tiledb_config = {}
+
+    logger.info("Trying to access '%s'", par["input_uri"])
+    logger.info("Fetching .obs")
+    with tiledbsoma.open(
+        par["input_uri"],
+        mode="r",
+        context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
+    ) as open_experiment:
+        logger.info("Connection established.")
+        obs_df = open_experiment.obs.read().concat().to_pandas()
+    logger.info("Done downloading .obs from database.")
+    logger.info("Adding obs columns to fetched .obs dataframe.")
+    overlapping_obs = set(par["obs_input"]).intersection(set(obs_df.columns.to_list()))
+    if overlapping_obs:
+        raise ValueError(
+            f"The following keys already exist in the database: {','.join(overlapping_obs)}."
+        )
+
+    columns_to_add = modality_data.obs[par["obs_input"]]
+    new_obs = pd.merge(
+        obs_df,
+        columns_to_add,
+        left_on=par["obs_index_name_input"],
+        right_index=True,
+        how="right",
+    )
+    logger.info(
+        "Writing obs back to database. Connection to %s with config %s",
+        par["input_uri"],
+        tiledb_config,
+    )
+    with tiledbsoma.open(
+        par["input_uri"],
+        mode="w",
+        context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
+    ) as open_experiment:
+        tiledbsoma.io.update_obs(
+            open_experiment,
+            new_data=new_obs,
+            default_index_name=par["obs_index_name_input"],
+            context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
+        )
+    logger.info("Finished!")
+
+
+if __name__ == "__main__":
+    main(par)
diff --git a/src/tiledb/move_mudata_obs_to_tiledb/test.py b/src/tiledb/move_mudata_obs_to_tiledb/test.py
new file mode 100644
index 00000000000..0fd17fec587
--- /dev/null
+++ b/src/tiledb/move_mudata_obs_to_tiledb/test.py
@@ -0,0 +1,253 @@
+import sys
+import pytest
+import boto3
+import os
+from moto.server import ThreadedMotoServer
+from contextlib import contextmanager
+import socket
+import tiledbsoma
+import subprocess
+import re
+import pandas as pd
+import numpy as np
+import mudata
+import requests
+
+
+## VIASH START
+meta = {
+    "executable": "target/executable/tiledb/move_mudata_obs_to_tiledb/move_mudata_obs_to_tiledb",
+    "resources_dir": "./resources_test",
+    "cpus": 2,
+    "config": "./src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml",
+}
+sys.path.append("src/utils")
+## VIASH END
+
+sys.path.append(meta["resources_dir"])
+
+input_dir = f"{meta['resources_dir']}/tiledb/pbmc_1k_protein_v3_mms"
+
+
+@pytest.fixture
+def input_mudata():
+    return mudata.read_h5mu(
+        f"{meta['resources_dir']}/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu"
+    )
+
+
+@pytest.fixture
+def input_mudata_extra_output_slot(input_mudata):
+    new_obs_col = pd.Series(
+        np.random.rand(input_mudata["rna"].n_obs, 1).ravel(),
+        index=input_mudata["rna"].obs_names,
+    )
+    input_mudata["rna"].obs["test_input_slot"] = new_obs_col
+    return input_mudata
+
+
+@pytest.fixture
+def input_mudata_path(random_h5mu_path, input_mudata):
+    output_path = random_h5mu_path()
+    input_mudata.write(output_path)
+    return output_path
+
+
+@pytest.fixture(scope="module")
+def aws_credentials():
+    """Mocked AWS Credentials for moto."""
+    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
+
+
+@contextmanager
+def managed_moto_server(*args, **kwargs):
+    server = ThreadedMotoServer(*args, **kwargs)
+    server.start()
+    try:
+        yield server
+    finally:
+        server.stop()
+
+
+@pytest.fixture(scope="function")
+def moto_server(aws_credentials):
+    """Fixture to run a mocked AWS server for testing."""
+    # Note: pass `port=0` to get a random free port.
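+    # The server is bound to the host's resolved IP rather than localhost so that
+    # the component under test, which runs in its own Docker container, can reach
+    # this mocked S3 endpoint.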
+    with managed_moto_server(
+        ip_address=socket.gethostbyname(socket.gethostname()), port=0
+    ) as moto_server:
+        yield moto_server
+
+
+@pytest.fixture
+def initiated_database(moto_server):
+    host, port = moto_server.get_host_and_port()
+    server_uri = f"http://{host}:{port}"
+    client = boto3.client("s3", endpoint_url=server_uri, region_name="us-east-1")
+    client.create_bucket(Bucket="test")
+
+    def raise_(ex):
+        raise ex
+
+    for root, _, files in os.walk(input_dir, onerror=raise_):
+        for filename in files:
+            local_path = os.path.join(root, filename)
+            relative_path = os.path.relpath(local_path, input_dir)
+            client.upload_file(local_path, "test", relative_path)
+    client.close()
+    yield server_uri
+    requests.post(f"{server_uri}/moto-api/reset")
+
+
+def test_key_already_exists_raises(
+    run_component, input_mudata_path, initiated_database
+):
+    with pytest.raises(subprocess.CalledProcessError) as err:
+        run_component(
+            [
+                "--input_uri",
+                "s3://test",
+                "--endpoint",
+                initiated_database,
+                "--s3_region",
+                "us-east-1",
+                "--output_modality",
+                "rna",
+                "--input_mudata",
+                str(input_mudata_path),
+                "--modality",
+                "rna",
+                "--obs_input",
+                "filter_with_counts",
+            ]
+        )
+    assert re.search(
+        r"ValueError: The following keys already exist in the database: filter_with_counts",
+        err.value.stdout.decode("utf-8"),
+    )
+
+
+def test_missing_obs_key_raises(run_component, initiated_database, input_mudata_path):
+    with pytest.raises(subprocess.CalledProcessError) as err:
+        run_component(
+            [
+                "--input_uri",
+                "s3://test",
+                "--endpoint",
+                initiated_database,
+                "--s3_region",
+                "us-east-1",
+                "--output_modality",
+                "rna",
+                "--input_mudata",
+                str(input_mudata_path),
+                "--modality",
+                "rna",
+                "--obs_input",
+                "doesnotexist",
+            ]
+        )
+    assert re.search(
+        r"Not all \.obs keys were found in the input!",
+        err.value.stdout.decode("utf-8"),
+    )
+
+
+def test_add(
+    run_component, initiated_database, input_mudata_extra_output_slot, random_h5mu_path
+):
+    input_path = random_h5mu_path()
+    input_mudata_extra_output_slot.write(input_path)
+    run_component(
+        [
+            "--input_uri",
+            "s3://test",
+            "--endpoint",
+            initiated_database,
+            "--s3_region",
+            "us-east-1",
+            "--output_modality",
+            "rna",
+            "--input_mudata",
+            str(input_path),
+            "--modality",
+            "rna",
+            "--obs_input",
+            "test_input_slot",
+        ]
+    )
+    obs_key_uri = "s3://test/obs"
+    tiledb_config = {
+        "vfs.s3.no_sign_request": "false",
+        "vfs.s3.region": "us-east-1",
+        "vfs.s3.endpoint_override": initiated_database,
+    }
+    context = tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config)
+    with tiledbsoma.open(uri=obs_key_uri, mode="r", context=context) as open_array:
+        obs_data = open_array.read().concat().to_pandas()
+    obs_data = obs_data.set_index("cell_id")
+    assert "test_input_slot" in obs_data.columns
+    mudata_col = mudata.read_h5ad(input_path, mod="rna").obs["test_input_slot"]
+    pd.testing.assert_series_equal(
+        obs_data.loc[:, "test_input_slot"],
+        mudata_col,
+        check_like=True,
+        check_names=False,
+    )
+
+
+def test_output_folder(
+    run_component,
+    initiated_database,
+    input_mudata_extra_output_slot,
+    random_h5mu_path,
+    tmp_path,
+):
+    input_path = random_h5mu_path()
+    input_mudata_extra_output_slot.write(input_path)
+    output_path = tmp_path / "tiledb_out"
+    run_component(
+        [
+            "--input_uri",
+            "s3://test",
+            "--endpoint",
+            initiated_database,
+            "--s3_region",
+            "us-east-1",
+            "--output_modality",
+            "rna",
+            "--input_mudata",
+            str(input_path),
+            "--modality",
+            "rna",
+            "--obs_input",
+            "test_input_slot",
+            "--output_tiledb",
+            output_path,
+        ]
+    )
+    assert output_path.is_dir()
+    obs_key_uri = output_path / "obs"
+    assert obs_key_uri.is_dir()
+    print(list(obs_key_uri.iterdir()), file=sys.stderr, flush=True)
+    with tiledbsoma.open(
+        uri=f"file://{str(obs_key_uri.resolve())}",
+        mode="r",
+        context=tiledbsoma.SOMATileDBContext(),
+    ) as open_array:
+        obs_data = open_array.read().concat().to_pandas()
+    obs_data = obs_data.set_index("cell_id")
+    assert "test_input_slot" in obs_data.columns
+    mudata_col = mudata.read_h5ad(input_path, mod="rna").obs["test_input_slot"]
+    pd.testing.assert_series_equal(
+        obs_data.loc[:, "test_input_slot"],
+        mudata_col,
+        check_like=True,
+        check_names=False,
+    )
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-s", __file__]))
diff --git a/src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml b/src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml
index 19ac5168497..0697bb297f7 100644
--- a/src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml
+++ b/src/tiledb/move_mudata_obsm_to_tiledb/config.vsh.yaml
@@ -27,6 +27,11 @@ argument_groups:
       description: |
         Custom endpoint to use to connect to S3
       required: false
+    - name: "--s3_no_sign_request"
+      description: |
+        Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
+      type: boolean
+      default: false
     - name: "--output_modality"
       type: string
       description: |
@@ -35,10 +40,12 @@ argument_groups:
     arguments:
       - name: "--input_mudata"
         type: file
+        required: true
        description: |
           MuData object to take the columns from. The observations and their order should
           match between the database and the input modality.
       - name: "--modality"
+        required: true
         type: string
         description: |
           Modality where to take the .obsm from.
@@ -47,7 +54,14 @@ argument_groups:
         multiple: true
         description: |
           Keys from .obm to copy. The keys should not be present yet in the database.
-
+  - name: "TileDB-SOMA output"
+    arguments:
+      - name: "--output_tiledb"
+        type: file
+        direction: output
+        required: false
+        description: |
+          Output to a directory instead of adding to the existing database.
 resources:
   - type: python_script
     path: script.py
@@ -64,16 +78,17 @@ engines:
       - type: python
         packages:
           - tiledbsoma
+          - boto3
+          - awscli
     __merge__: /src/base/requirements/anndata_mudata.yaml
     test_setup:
      - type: python
         packages:
           - moto[server]
-          - boto3
         __merge__: [ /src/base/requirements/python_test_setup.yaml, .]
 runners:
   - type: executable
-    docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY"]
+    docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY", "--env", "AWS_DEFAULT_REGION"]
   - type: nextflow
     directives:
       label: [highmem, midcpu]
\ No newline at end of file
diff --git a/src/tiledb/move_mudata_obsm_to_tiledb/script.py b/src/tiledb/move_mudata_obsm_to_tiledb/script.py
index a5b6aadbb5a..140d5720801 100644
--- a/src/tiledb/move_mudata_obsm_to_tiledb/script.py
+++ b/src/tiledb/move_mudata_obsm_to_tiledb/script.py
@@ -4,10 +4,26 @@
 import tiledbsoma.io
 import pandas as pd
 import json
+from pathlib import Path
 
 ## VIASH START
-par = {}
-meta = {"resources_dir": "src/utils"}
+par = {
+    "input_uri": "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms/",
+    "input_mudata": "./resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu",
+    "obsm_input": ["test_slot"],
+    "modality": "rna",
+    "s3_region": "eu-west-3",
+    "endpoint": None,
+    "output_tiledb": "./output",
+    "output_modality": "rna",
+    "s3_no_sign_request": True,
+}
+meta = {"resources_dir": "src/utils", "name": "move_mudata_obsm_to_tiledb"}
+
+test_path = "./mudata_for_testing.h5mu"
+test_mudata = mudata.read_h5mu(par["input_mudata"])
+test_mudata["rna"].obsm["test_slot"] = test_mudata["rna"].obsm["X_pca"].copy()
+test_mudata.write(test_path)
+par["input_mudata"] = test_path
 ## VIASH END
 
 sys.path.append(meta["resources_dir"])
@@ -39,22 +55,51 @@ def main(par):
         raise KeyError("Not all .obsm keys were found in the input!") from e
 
     logger.info("Done getting .obsm keys.")
-    tiledb_config = {
-        "vfs.s3.no_sign_request": "false",
-    }
     optional_config = {
         "vfs.s3.region": par["s3_region"],
         "vfs.s3.endpoint_override": par["endpoint"],
+        "vfs.s3.no_sign_request": par["s3_no_sign_request"],
     }
+    tiledb_config = {}
     for config_setting, config_val in optional_config.items():
         if config_val is not None:
             tiledb_config[config_setting] = config_val
-    logger.info("Using the following config to connect: %s", tiledb_config)
+    logger.info("Using the following config to connect to S3: %s", tiledb_config)
     context = tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config)
-    logger.info(
-        "Trying to access '%s' in region '%s'", par["input_uri"], par["s3_region"]
-    )
+    if par["output_tiledb"]:
+        logger.info("Requested to output to a directory. Downloading database...")
+        output_dir_path = Path(par["output_tiledb"])
+        output_dir_path.mkdir(parents=True, exist_ok=True)
+        import boto3
+        from awscli.customizations.s3.utils import split_s3_bucket_key
+
+        bucket, key = split_s3_bucket_key(par["input_uri"])
+        connection_args = {
+            "endpoint_url": par["endpoint"],
+            "region_name": par["s3_region"],
+        }
+        if par["s3_no_sign_request"]:
+            import botocore
+
+            connection_args["config"] = botocore.config.Config(
+                signature_version=botocore.UNSIGNED
+            )
+
+        client = boto3.resource("s3", **connection_args)
+        bucket = client.Bucket(bucket)
+        for i, s3_obj in enumerate(bucket.objects.filter(Prefix=key)):
+            output_path = output_dir_path / s3_obj.key.removeprefix(key).lstrip("/")
+            output_path.parent.mkdir(parents=True, exist_ok=True)
+            bucket.download_file(s3_obj.key, output_path)
+            print(f"Downloaded {i + 1} files.", file=sys.stdout, flush=True, end="\r")
+        logger.info("Download completed!")
+        par["input_uri"] = f"file://{output_dir_path.resolve()}"
+        logger.info("Setting input to %s", par["input_uri"])
+        logger.info("Overwriting TileDB config because S3 connection is not required.")
+        context = tiledbsoma.SOMATileDBContext()
+
+    logger.info("Trying to access '%s'", par["input_uri"])
     with tiledbsoma.open(
         par["input_uri"], mode="w", context=context
     ) as open_experiment:
@@ -63,7 +108,8 @@ def main(par):
         measurement = open_experiment.ms[par["output_modality"]]
         logger.info("Checking if keys do not already exist.")
         existing_keys = measurement.obsm.keys()
-        overlap = set(existing_keys).intersection(set(keys_to_transfer))
+        logger.info("Existing keys: %s", ",".join(existing_keys))
+        overlap = set(existing_keys).intersection(set(keys_to_transfer.keys()))
         if overlap:
             raise ValueError(
                 f"The following keys already exist in the database: {','.join(overlap)}."
@@ -89,6 +135,8 @@ def main(par):
             matrix_data=obsm_val,
             context=context,
         )
+        # Allow for more than one obsm slot to be transferred
+        open_experiment = open_experiment.reopen(mode="w")
         if index_as_json:
             uri = f"{par['input_uri']}/ms/{par['output_modality']}/obsm/{key}"
             with tiledbsoma.open(
diff --git a/src/tiledb/move_mudata_obsm_to_tiledb/test.py b/src/tiledb/move_mudata_obsm_to_tiledb/test.py
index 5ec626ac585..b0755c5e9a2 100644
--- a/src/tiledb/move_mudata_obsm_to_tiledb/test.py
+++ b/src/tiledb/move_mudata_obsm_to_tiledb/test.py
@@ -11,6 +11,8 @@
 import numpy as np
 import mudata
 import json
+from contextlib import contextmanager
+import requests
 
 
 ## VIASH START
@@ -24,9 +26,6 @@
 ## VIASH END
 
 sys.path.append(meta["resources_dir"])
-from setup_logger import setup_logger
-
-logger = setup_logger()
 
 input_dir = f"{meta['resources_dir']}/tiledb/pbmc_1k_protein_v3_mms"
 
@@ -64,21 +63,31 @@ def aws_credentials():
     os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
 
 
+@contextmanager
+def managed_moto_server(*args, **kwargs):
+    server = ThreadedMotoServer(*args, **kwargs)
+    server.start()
+    try:
+        yield server
+    finally:
+        server.stop()
+
+
 @pytest.fixture(scope="function")
 def moto_server(aws_credentials):
     """Fixture to run a mocked AWS server for testing."""
-    ip_addr = socket.gethostbyname(socket.gethostname())
     # Note: pass `port=0` to get a random free port.
-    server = ThreadedMotoServer(ip_address=ip_addr, port=0)
-    server.start()
-    host, port = server.get_host_and_port()
-    yield f"http://{host}:{port}"
-    server.stop()
+    with managed_moto_server(
+        ip_address=socket.gethostbyname(socket.gethostname()), port=0
+    ) as moto_server:
+        yield moto_server
 
 
-@pytest.fixture
+@pytest.fixture(scope="function")
 def initiated_database(moto_server):
-    client = boto3.client("s3", endpoint_url=moto_server, region_name="us-east-1")
+    host, port = moto_server.get_host_and_port()
+    server_uri = f"http://{host}:{port}"
+    client = boto3.client("s3", endpoint_url=server_uri, region_name="us-east-1")
     client.create_bucket(Bucket="test")
 
     def raise_(ex):
@@ -89,7 +98,9 @@ def raise_(ex):
         local_path = os.path.join(root, filename)
         relative_path = os.path.relpath(local_path, input_dir)
         client.upload_file(local_path, "test", relative_path)
-    return moto_server
+    client.close()
+    yield server_uri
+    requests.post(f"{server_uri}/moto-api/reset")
 
 
 def test_key_already_exists_raises(
@@ -192,5 +203,60 @@ def test_add(
     ]
 
 
+def test_output_folder(
+    run_component,
+    initiated_database,
+    input_mudata_extra_output_slot,
+    random_h5mu_path,
+    tmp_path,
+):
+    input_path = random_h5mu_path()
+    input_mudata_extra_output_slot.write(input_path)
+    output_path = tmp_path / "tiledb_out"
+    run_component(
+        [
+            "--input_uri",
+            "s3://test",
+            "--endpoint",
+            initiated_database,
+            "--s3_region",
+            "us-east-1",
+            "--output_modality",
+            "rna",
+            "--input_mudata",
+            str(input_path),
+            "--modality",
+            "rna",
+            "--obsm_input",
+            "test_input_slot",
+            "--output_tiledb",
+            output_path,
+        ]
+    )
+    assert output_path.is_dir()
+    obsm_key_uri = output_path / "ms/rna/obsm/test_input_slot"
+    assert obsm_key_uri.is_dir()
+    print(list(obsm_key_uri.iterdir()), file=sys.stderr, flush=True)
+    with tiledbsoma.open(
+        uri=f"file://{str(obsm_key_uri.resolve())}",
+        mode="r",
+        context=tiledbsoma.SOMATileDBContext(),
+    ) as open_array:
+        obsm_data = open_array.read().coos().concat().to_scipy().todense()
+        assert obsm_data.shape == (713, 5)
+        original_data = (
+            input_mudata_extra_output_slot["rna"].obsm["test_input_slot"].to_numpy()
+        )
+        np.testing.assert_allclose(original_data, obsm_data)
+        assert json.loads(open_array.metadata["column_index"]) == [
+            "a",
+            "b",
+            "c",
+            "d",
+            "e",
+        ]
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-s", __file__]))
diff --git a/src/workflows/integration/scvi_leiden/config.vsh.yaml b/src/workflows/integration/scvi_leiden/config.vsh.yaml
index ae564eeec87..139c2934065 100644
--- a/src/workflows/integration/scvi_leiden/config.vsh.yaml
+++ b/src/workflows/integration/scvi_leiden/config.vsh.yaml
@@ -10,16 +10,32 @@ info:
 argument_groups:
   - name: "Inputs"
     arguments:
-      - name: "--id"
-        required: true
-        type: string
-        description: ID of the sample.
-        example: foo
       - name: "--input"
-        required: true
         type: file
-        description: Path to the sample.
+        description: Path to the sample. Mutually exclusive with the 'tiledb_input_uri' argument.
         example: dataset.h5mu
+      - name: "--tiledb_input_uri"
+        type: string
+        description: "A URI containing the TileDB-SOMA objects. Mutually exclusive with the 'input' argument."
+        example: "s3://bucket/path"
+      - name: "--tiledb_s3_region"
+        type: string
+        description: |
+          Region where the TileDB-SOMA database is hosted.
+        example: "us-west-2"
+      - name: "--tiledb_endpoint"
+        type: string
+        description: |
+          Custom endpoint to use to connect to S3
+        required: false
+      - name: "--tiledb_s3_no_sign_request"
+        description: |
+          Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
+        type: boolean
+        default: false
+  - name: Input slots
+    arguments:
       - name: "--layer"
         type: string
         description: use specified layer for expression values instead of the .X object from the modality.
@@ -43,6 +59,13 @@
         required: true
         direction: output
         example: output_dir/
+      - name: "--output_tiledb"
+        type: file
+        direction: output
+        description: |
+          Output the TileDB database to the specified directory instead of adding it to the existing database.
+        required: false
+
   - name: Neighbour calculation
     arguments:
       - name: "--uns_neighbors"
@@ -159,6 +182,10 @@
 dependencies:
   - name: integrate/scvi
   - name: workflows/multiomics/neighbors_leiden_umap
+  - name: convert/from_tiledb_to_h5mu
+  - name: tiledb/move_mudata_obsm_to_tiledb
+  - name: tiledb/move_mudata_obs_to_tiledb
+
 resources:
   - type: nextflow_script
     path: main.nf
diff --git a/src/workflows/integration/scvi_leiden/integration_test.sh b/src/workflows/integration/scvi_leiden/integration_test.sh
index d9577b5dd48..3cdcde71132 100755
--- a/src/workflows/integration/scvi_leiden/integration_test.sh
+++ b/src/workflows/integration/scvi_leiden/integration_test.sh
@@ -6,10 +6,19 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
 # ensure that the command below is run from the root of the repository
 cd "$REPO_ROOT"
 
+# nextflow \
+#   run . \
+#   -main-script src/workflows/integration/scvi_leiden/test.nf \
+#   -entry test_wf \
+#   -profile docker,no_publish \
+#   -c src/workflows/utils/labels_ci.config \
+#   -c src/workflows/utils/integration_tests.config
+
+
 nextflow \
   run . \
   -main-script src/workflows/integration/scvi_leiden/test.nf \
-  -entry test_wf \
+  -entry test_tiledb_wf \
   -profile docker,no_publish \
   -c src/workflows/utils/labels_ci.config \
   -c src/workflows/utils/integration_tests.config
diff --git a/src/workflows/integration/scvi_leiden/main.nf b/src/workflows/integration/scvi_leiden/main.nf
index baed2aa6c29..2fe50aa10f5 100644
--- a/src/workflows/integration/scvi_leiden/main.nf
+++ b/src/workflows/integration/scvi_leiden/main.nf
@@ -8,6 +8,30 @@ workflow run_wf {
       def new_state = state + ["workflow_output": state.output]
       [id, new_state]
     }
+    | map {id, state ->
+      assert state.input || state.tiledb_input_uri: "Either --input or --tiledb_input_uri must be defined"
+      assert !(state.input && state.tiledb_input_uri): "Values were specified for both --input and --tiledb_input_uri. Please choose one."
+      assert state.tiledb_s3_region || !state.tiledb_input_uri: "Using 'tiledb_input_uri' also requires 'tiledb_s3_region'."
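+      // from_tiledb_to_h5mu (below) selects the expression matrix to export via
+      // 'input_layers', so a --layer value is required whenever the input comes from TileDB.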
+      assert !state.tiledb_input_uri || state.layer: "When using tileDB input, you must specify a layer using --layer"
+      [id, state]
+    }
+    | from_tiledb_to_h5mu.run(
+      runIf: {id, state -> state.tiledb_input_uri},
+      fromState: { id, state ->
+        [
+          "input_uri": state.tiledb_input_uri,
+          "s3_region": state.tiledb_s3_region,
+          "endpoint": state.tiledb_endpoint,
+          "input_modality": state.modality,
+          "output_modality": state.modality,
+          "input_layers": [ state.layer ],
+          "s3_no_sign_request": state.tiledb_s3_no_sign_request
+        ]
+      },
+      toState: [
+        "input": "output",
+      ]
+    )
     | scvi.run(
         fromState: [
           "input": "input",
@@ -49,7 +73,46 @@ workflow run_wf {
         ],
         toState: ["output": "output"]
       )
-    | setState(["output", "output_model"])
+    | move_mudata_obsm_to_tiledb.run(
+      runIf: {id, state -> state.tiledb_input_uri},
+      fromState: {id, state ->
+        [
+          "input_mudata": state.output,
+          "modality": state.modality,
+          "input_uri": state.tiledb_input_uri,
+          "s3_region": state.tiledb_s3_region,
+          "endpoint": state.tiledb_endpoint,
+          "output_modality": state.modality,
+          "obsm_input": [state.obsm_umap, state.obsm_output],
+          "output_tiledb": state.output_tiledb,
+          "s3_no_sign_request": state.tiledb_s3_no_sign_request
+        ]
+      },
+      toState: ["previous_output_tiledb": "output_tiledb"]
+    )
+    | move_mudata_obs_to_tiledb.run(
+      runIf: {id, state -> state.tiledb_input_uri},
+      fromState: {id, state ->
+        def new_state = [
+          "input_mudata": state.output,
+          "modality": state.modality,
+          "s3_region": state.tiledb_s3_region,
+          "endpoint": state.tiledb_endpoint,
+          "output_modality": state.modality,
+          "obs_input": state.leiden_resolution.collect{state.obs_cluster + "_" + it},
+          "output_tiledb": state.output_tiledb,
+          "s3_no_sign_request": state.tiledb_s3_no_sign_request
+        ]
+        if (state.previous_output_tiledb && file(state.previous_output_tiledb).exists()) {
+          new_state += ["input_dir": state.previous_output_tiledb]
+        } else {
+          new_state += ["input_uri": state.tiledb_input_uri]
+        }
+        return new_state
+      },
+      toState: ["output_tiledb": "output_tiledb"]
+    )
+    | setState(["output", "output_model", "output_tiledb"])
 
   emit:
     output_ch
diff --git a/src/workflows/integration/scvi_leiden/test.nf b/src/workflows/integration/scvi_leiden/test.nf
index 27706f29627..c51cfec0dd2 100644
--- a/src/workflows/integration/scvi_leiden/test.nf
+++ b/src/workflows/integration/scvi_leiden/test.nf
@@ -57,3 +57,54 @@ workflow test_wf {
     }
 
 }
+
+workflow test_tiledb_wf {
+
+  resources_test = file(params.resources_test)
+
+  output_ch = Channel.fromList([
+    [
+      id: "tiledb_test",
+      tiledb_input_uri: "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms",
+      tiledb_s3_region: "eu-west-3",
+      layer: "raw",
+      obs_batch: "sample_id",
+      max_epochs: 1,
+      output_model: "simple_execution_test_model/",
+      output_tiledb: "tiledb_out",
+      tiledb_s3_no_sign_request: true
+    ],
+  ])
+  | map{ state -> [state.id, state] }
+  | scvi_leiden
+  | view { output ->
+    assert output.size() == 2 : "Outputs should contain two elements; [id, state]"
+
+    // check id
+    def id = output[0]
+    assert id.endsWith("_test")
+
+    // check output
+    def state = output[1]
+    assert state instanceof Map : "State should be a map. Found: ${state}"
+    assert state.containsKey("output") : "Output should contain key 'output'."
+    assert state.output.isFile() : "'output' should be a file."
+    assert state.output.toString().endsWith(".h5mu") : "Output file should end with '.h5mu'. Found: ${state.output}"
+
+    // check model_output
+    assert state.containsKey("output_model") : "Output should contain key 'output_model'."
+    assert state.output_model.isDirectory() : "'output_model' should be a directory."
+    assert state.output_model.toString().endsWith("_model") : "Model output directory should end with '_model'. Found: ${state.output_model}"
+
+    // check tiledb output
+    assert state.containsKey("output_tiledb") : "Output should contain key 'output_tiledb'."
+    assert state.output_tiledb.isDirectory() : "'output_tiledb' should be a directory."
+
+    "Output: $output"
+  }
+  | toSortedList({a, b -> a[0] <=> b[0]})
+  | map { output_list ->
+    assert output_list.size() == 1 : "output channel should contain 1 event"
+    assert output_list.collect{it[0]} == ["tiledb_test"]
+  }
+}

From 0db41f217899aa4f5d50434028c985cd9cf581a0 Mon Sep 17 00:00:00 2001
From: Dries Schaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Thu, 23 Oct 2025 13:34:13 +0000
Subject: [PATCH 2/3] Undo change to integration_test script

---
 .../integration/scvi_leiden/integration_test.sh | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/workflows/integration/scvi_leiden/integration_test.sh b/src/workflows/integration/scvi_leiden/integration_test.sh
index 3cdcde71132..e4760732dca 100755
--- a/src/workflows/integration/scvi_leiden/integration_test.sh
+++ b/src/workflows/integration/scvi_leiden/integration_test.sh
@@ -6,13 +6,13 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
 # ensure that the command below is run from the root of the repository
 cd "$REPO_ROOT"
 
-# nextflow \
-#   run . \
-#   -main-script src/workflows/integration/scvi_leiden/test.nf \
-#   -entry test_wf \
-#   -profile docker,no_publish \
-#   -c src/workflows/utils/labels_ci.config \
-#   -c src/workflows/utils/integration_tests.config
+nextflow \
+  run . \
+  -main-script src/workflows/integration/scvi_leiden/test.nf \
+  -entry test_wf \
+  -profile docker,no_publish \
+  -c src/workflows/utils/labels_ci.config \
+  -c src/workflows/utils/integration_tests.config
 
 
 nextflow \

From 6c049c93c7eac485343581b42ac7d99fe7c37636 Mon Sep 17 00:00:00 2001
From: Dries Schaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Thu, 23 Oct 2025 13:37:12 +0000
Subject: [PATCH 3/3] Update CHANGELOG

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 615b44a0c78..c426e368bd1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,8 @@
 
 * Added `from_tiledb_to_h5mu` component (PR #1068).
 
+* `workflows/integration/scvi_leiden`: add support for adding the output slots to a tileDB-SOMA database (PR #1094).
+
 ## MAJOR CHANGES
 
 * `mapping/samtools_sort` has been deprecated and will be removed in openpipeline 4.0. Use [vsh://biobox/samtools/samtools_sort](https://www.viash-hub.com/packages/biobox/latest/components/samtools/samtools_sort) instead.