diff --git a/CHANGELOG.md b/CHANGELOG.md
index ffd173a3ebf..9fbd6894b0d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
-# openpipelines 3.x.x
+# openpipelines 3.1.0
+
+## Experimental
+
+* Added `from_tiledb_to_h5mu` component (PR #1068).
 
 ## BREAKING
 
diff --git a/src/convert/from_tiledb_to_h5mu/config.vsh.yaml b/src/convert/from_tiledb_to_h5mu/config.vsh.yaml
new file mode 100644
index 00000000000..a663c75cfc1
--- /dev/null
+++ b/src/convert/from_tiledb_to_h5mu/config.vsh.yaml
@@ -0,0 +1,80 @@
+name: from_tiledb_to_h5mu
+namespace: convert
+scope: public
+authors:
+  - __merge__: /src/authors/dries_schaumont.yaml
+    roles: [ author, maintainer ]
+argument_groups:
+  - name: Input database
+    description: "Open a TileDB-SOMA database by URI."
+    arguments:
+      - name: "--input_uri"
+        type: string
+        description: "A URI pointing to a TileDB-SOMA database."
+        required: true
+        example: "s3://bucket/path"
+      - name: "--s3_region"
+        description: |
+          Region where the TileDB-SOMA database is hosted.
+        type: string
+        required: false
+      - name: "--endpoint"
+        type: string
+        description: |
+          Custom endpoint to use to connect to S3.
+        required: false
+      - name: "--input_modality"
+        required: true
+        type: string
+        description: |
+          TileDB-SOMA measurement to take the input from.
+      - name: "--input_layers"
+        type: string
+        multiple: true
+        required: true
+        description: |
+          Layers to import from the TileDB-SOMA database.
+  - name: "MuData output"
+    arguments:
+      - name: "--output"
+        type: file
+        direction: output
+        required: true
+        example: output.h5mu
+        description: |
+          The output MuData file.
+      - name: "--output_modality"
+        type: string
+        required: true
+        description: |
+          Name of the modality to write the results to.
+    __merge__: [., /src/base/h5_compression_argument.yaml]
+
+resources:
+  - type: python_script
+    path: script.py
+  - path: /src/utils/setup_logger.py
+test_resources:
+  - type: python_script
+    path: test.py
+  - path: /resources_test/tiledb/
+engines:
+  - type: docker
+    image: python:3.12
+    setup:
+      - type: python
+        packages:
+          - tiledbsoma
+        __merge__: /src/base/requirements/anndata_mudata.yaml
+    test_setup:
+      - type: python
+        packages:
+          - moto[server]
+          - boto3
+        __merge__: [ /src/base/requirements/python_test_setup.yaml, .]
+runners:
+  - type: executable
+    docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY"]
+  - type: nextflow
+    directives:
+      label: [highmem, midcpu]
\ No newline at end of file
diff --git a/src/convert/from_tiledb_to_h5mu/script.py b/src/convert/from_tiledb_to_h5mu/script.py
new file mode 100644
index 00000000000..269e3f443bb
--- /dev/null
+++ b/src/convert/from_tiledb_to_h5mu/script.py
@@ -0,0 +1,114 @@
+import tiledbsoma
+import tiledbsoma.io
+import sys
+import mudata
+import json
+import pandas as pd
+
+## VIASH START
+par = {
+    "input_uri": "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms/",
+    "input_modality": "rna",
+    "output": "output.h5mu",
+    "output_modality": "rna",
+    "s3_region": "eu-west-3",
+    "endpoint": None,
+    "input_layers": ["raw", "log_normalized"],
+    "output_compression": "gzip",
+}
+
+meta = {
+    "resources_dir": "src/utils",
+    "name": "from_tiledb_to_h5mu",
+}
+## VIASH END
+
+
+sys.path.append(meta["resources_dir"])
+from setup_logger import setup_logger
+
+logger = setup_logger()
+tiledbsoma.logging.info()
+
+
+def _log_arguments(function_obj, arg_dict):
+    """
+    Format a dictionary of arguments into a string that is put into the script logs.
+    """
+    args_str = [f"\t{param}: {param_val}\n" for param, param_val in arg_dict.items()]
+    logger.info(
+        "Calling %s with arguments:\n%s",
+        function_obj.__name__,
+        "".join(args_str).rstrip(),
+    )
+
+
+def main(par):
+    logger.info("Component %s started", meta["name"])
+    tiledb_config = {
+        "vfs.s3.no_sign_request": "false",
+    }
+    optional_config = {
+        "vfs.s3.region": par["s3_region"],
+        "vfs.s3.endpoint_override": par["endpoint"],
+    }
+    for config_setting, config_val in optional_config.items():
+        if config_val is not None:
+            tiledb_config[config_setting] = config_val
+    logger.info("Using the following config to connect: %s", tiledb_config)
+    context = tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config)
+
+    logger.info(
+        "Trying to access '%s' in region '%s'", par["input_uri"], par["s3_region"]
+    )
+    with tiledbsoma.open(
+        par["input_uri"], mode="r", context=context
+    ) as open_experiment:
+        logger.info("Connection successful")
+        to_anndata_args = {
+            "experiment": open_experiment,
+            "measurement_name": par["input_modality"],
+            "extra_X_layer_names": par["input_layers"],
+        }
+        func_to_call = tiledbsoma.io.to_anndata
+        _log_arguments(func_to_call, to_anndata_args)
+        output_modality = func_to_call(**to_anndata_args)
+        logger.info("Output AnnData was:\n%s", output_modality)
+        logger.info("Adding column indices to varm and obsm items.")
+        for multimodal_attribute in ("varm", "obsm"):
+            multimodal_dict = getattr(output_modality, multimodal_attribute)
+            for key_ in list(multimodal_dict.keys()):
+                metadata = getattr(
+                    open_experiment.ms[par["input_modality"]], multimodal_attribute
+                )[key_].metadata
+                if "column_index" in metadata:
+                    logger.info(
+                        "Found index for item %s in %s", key_, multimodal_attribute
+                    )
+                    index_list = json.loads(metadata["column_index"])
+                    assert isinstance(index_list, list)
+                    multimodal_dict[key_] = pd.DataFrame(
+                        multimodal_dict[key_],
+                        columns=index_list,
+                        index=multimodal_dict.dim_names,
+                    )
+                else:
+                    logger.info(
+                        "No column index found for item %s in %s",
+                        key_,
+                        multimodal_attribute,
+                    )
+
+    logger.info("Converting to MuData")
+    output_mudata = mudata.MuData({par["output_modality"]: output_modality})
+    logger.info(
+        "Writing output MuData to %s with compression %s",
+        par["output"],
+        par["output_compression"],
+    )
+    output_mudata.write(par["output"], compression=par["output_compression"])
+    logger.info("Finished!")
+
+
+if __name__ == "__main__":
+    main(par)
diff --git a/src/convert/from_tiledb_to_h5mu/test.py b/src/convert/from_tiledb_to_h5mu/test.py
new file mode 100644
index 00000000000..0fdea4ad451
--- /dev/null
+++ b/src/convert/from_tiledb_to_h5mu/test.py
@@ -0,0 +1,130 @@
+import sys
+import pytest
+import boto3
+import os
+import socket
+import tiledbsoma
+import tiledbsoma.io
+import mudata
+from moto.server import ThreadedMotoServer
+
+
+tiledbsoma.logging.debug()
+
+## VIASH START
+meta = {
+    "executable": "target/executable/convert/from_tiledb_to_h5mu/from_tiledb_to_h5mu",
+    "resources_dir": "./resources_test",
+    "cpus": 2,
+    "config": "./src/convert/from_tiledb_to_h5mu/config.vsh.yaml",
+}
+sys.path.append("src/utils")
+## VIASH END
+
+sys.path.append(meta["resources_dir"])
+from setup_logger import setup_logger
+
+logger = setup_logger()
+
+input_dir = f"{meta['resources_dir']}/tiledb/pbmc_1k_protein_v3_mms"
+
+
+@pytest.fixture(scope="module")
+def aws_credentials():
+    """Mocked AWS Credentials for moto."""
+    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
"us-east-1" + + +@pytest.fixture(scope="function") +def moto_server(aws_credentials): + """Fixture to run a mocked AWS server for testing.""" + ip_addr = socket.gethostbyname(socket.gethostname()) + # Note: pass `port=0` to get a random free port. + server = ThreadedMotoServer(ip_address=ip_addr, port=0) + server.start() + host, port = server.get_host_and_port() + yield f"http://{host}:{port}" + server.stop() + + +@pytest.fixture +def initiated_database(moto_server): + client = boto3.client("s3", endpoint_url=moto_server, region_name="us-east-1") + client.create_bucket(Bucket="test") + + def raise_(ex): + raise ex + + for root, _, files in os.walk(input_dir, onerror=raise_): + for filename in files: + local_path = os.path.join(root, filename) + relative_path = os.path.relpath(local_path, input_dir) + client.upload_file(local_path, "test", relative_path) + client.close() + return moto_server + + +def test_convert(run_component, initiated_database, random_h5mu_path): + output = random_h5mu_path() + run_component( + [ + "--input_uri", + "s3://test", + "--endpoint", + initiated_database, + "--s3_region", + "us-east-1", + "--input_modality", + "rna", + "--output_modality", + "rna", + "--input_layers", + "log_normalized;raw", + "--output", + output, + ] + ) + output_mudata = mudata.read_h5mu(output) + assert output_mudata.mod_names == ["rna"] + assert output_mudata.shape == (713, 33538) + rna_modality = output_mudata["rna"] + assert set(rna_modality.layers.keys()) == {"raw", "log_normalized"} + assert set(rna_modality.obsm.keys()) == { + "X_leiden_harmony_umap", + "X_pca", + "X_pca_integrated", + "X_umap", + "knn_distances", + "knn_indices", + } + assert set(rna_modality.varm.keys()) == {"pca_loadings"} + assert set(rna_modality.var.columns.to_list()) == { + "filter_with_hvg", + "pct_dropout", + "gene_symbol", + "obs_mean", + "num_nonzero_obs", + "feature_types", + "filter_with_counts", + "total_counts", + "genome", + } + assert set(rna_modality.obs.columns.to_list()) == { + "scrublet_doublet_score", + "pct_of_counts_in_top_50_vars", + "pct_of_counts_in_top_200_vars", + "sample_id", + "harmony_integration_leiden_1.0", + "filter_with_scrublet", + "filter_with_counts", + "total_counts", + "pct_of_counts_in_top_100_vars", + "num_nonzero_vars", + "pct_of_counts_in_top_500_vars", + } + + +if __name__ == "__main__": + sys.exit(pytest.main(["-s", __file__]))