Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

* Added `from_tiledb_to_h5mu` component (PR #1068).

* `workflows/integration/scvi_leiden`: add support for adding the output slots to a TileDB-SOMA database (PR #1094).

## MAJOR CHANGES

* `mapping/samtools_sort` has been deprecated and will be removed in openpipeline 4.0. Use [vsh://biobox/samtools/samtools_sort](https://www.viash-hub.com/packages/biobox/latest/components/samtools/samtools_sort) instead.
Expand Down
5 changes: 5 additions & 0 deletions src/convert/from_tiledb_to_h5mu/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ argument_groups:
description: |
Custom endpoint to use to connect to S3
required: false
- name: "--s3_no_sign_request"
description: |
Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
type: boolean
default: false
- name: "--input_modality"
required: true
type: string
Expand Down
5 changes: 2 additions & 3 deletions src/convert/from_tiledb_to_h5mu/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ def _log_arguments(function_obj, arg_dict):

def main(par):
logger.info("Component %s started", meta["name"])
tiledb_config = {
"vfs.s3.no_sign_request": "false",
}
tiledb_config = {}
optional_config = {
"vfs.s3.no_sign_request": par["s3_no_sign_request"],
"vfs.s3.region": par["s3_region"],
"vfs.s3.endpoint_override": par["endpoint"],
}
Expand Down
104 changes: 104 additions & 0 deletions src/tiledb/move_mudata_obs_to_tiledb/config.vsh.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
name: move_mudata_obs_to_tiledb
namespace: tiledb
scope: "private"
description: |
  Move .obs columns from a MuData modality to an existing tileDB database.
  The .obs keys should not exist in the database yet; and the observations from the modality and
  their order should match with what is already present in the TileDB database.
authors:
  - __merge__: /src/authors/dries_schaumont.yaml
    roles: [ author, maintainer ]
argument_groups:
  - name: Input database
    description: "Open a tileDB-SOMA database by URI or as a local directory."
    arguments:
      - name: "--input_uri"
        type: string
        description: "A URI pointing to a TileDB-SOMA database. Mutually exclusive with 'input_dir'"
        required: false
        example: "s3://bucket/path"
      - name: "--input_dir"
        type: file
        required: false
        description: "Path to a TileDB-SOMA database as a local directory"
        example: "./tiledb_database"
      - name: "--s3_region"
        description: |
          Region where the TileDB-SOMA database is hosted.
        type: string
        required: false
      - name: "--endpoint"
        type: string
        description: |
          Custom endpoint to use to connect to S3
        required: false
      - name: "--s3_no_sign_request"
        description: |
          Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
        type: boolean
        default: false
      # NOTE(review): this argument does not appear to be consumed by script.py — confirm it is needed.
      - name: "--output_modality"
        type: string
        description: |
          TileDB-SOMA measurement to add the output to.
      - name: "--obs_index_name_input"
        description: |
          Name of the index that is used to describe the cells (observations).
        type: string
        default: cell_id
  - name: "MuData input"
    arguments:
      - name: "--input_mudata"
        type: file
        required: true
        description: |
          MuData object to take the columns from. The observations and their order should
          match between the database and the input modality.
      - name: "--modality"
        required: true
        type: string
        description: |
          Modality where to take the .obs from.
      - name: "--obs_input"
        type: string
        multiple: true
        description: |
          Columns from .obs to copy. The keys should not be present yet in the database.
  - name: "TileDB-SOMA output"
    arguments:
      - name: "--output_tiledb"
        type: file
        direction: output
        required: false
        description: |
          Output to a directory instead of adding to the existing database.
resources:
  - type: python_script
    path: script.py
  - path: /src/utils/setup_logger.py
test_resources:
  - type: python_script
    path: test.py
  - path: /resources_test/tiledb/
  - path: /resources_test/pbmc_1k_protein_v3/
engines:
  - type: docker
    image: python:3.12
    setup:
      - type: python
        packages:
          - tiledbsoma
          - boto3
          - awscli
      - __merge__: /src/base/requirements/anndata_mudata.yaml
    test_setup:
      - type: python
        packages:
          - moto[server]
      - __merge__: [ /src/base/requirements/python_test_setup.yaml, .]
runners:
  - type: executable
    docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY", "--env", "AWS_DEFAULT_REGION"]
  - type: nextflow
    directives:
      label: [highmem, midcpu]
163 changes: 163 additions & 0 deletions src/tiledb/move_mudata_obs_to_tiledb/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import sys
import mudata
import tiledbsoma
import tiledbsoma.io
import pandas as pd
from pathlib import Path

## VIASH START
# Fallback parameter/meta values for running the script directly during
# development; viash replaces everything between the VIASH START/END
# markers with the real values at build time.
par = {
    "input_uri": "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms/",
    "input_mudata": "./resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu",
    "obs_input": ["test_slot"],
    "modality": "rna",
    "input_dir": None,
    "s3_region": "eu-west-3",
    "endpoint": None,
    "output_tiledb": "./output",
    "output_modality": "rna",
    "s3_no_sign_request": True,
    "obs_index_name_input": "cell_id",
}
meta = {"resources_dir": "src/utils", "name": "move_mudata_obs_to_tiledb"}

# Build a throwaway test input: copy an existing .obs column under a new
# name so that there is a column guaranteed to be absent from the database.
test_path = "./mudata_for_testing.h5mu"
test_mudata = mudata.read_h5mu(par["input_mudata"])
test_mudata["rna"].obs["test_slot"] = (
    test_mudata["rna"].obs["filter_with_counts"].copy()
)
test_mudata.write(test_path)
par["input_mudata"] = test_path
## VIASH END

# Make the viash-bundled helper resources importable.
sys.path.append(meta["resources_dir"])
from setup_logger import setup_logger

logger = setup_logger()

# NOTE(review): presumably configures tiledbsoma's internal logger to INFO
# level (tiledbsoma.logging.info takes no message argument) — confirm.
tiledbsoma.logging.info()


def download_s3_dir(input_uri, output_dir_path):
    """Download all S3 objects under the prefix *input_uri* into *output_dir_path*.

    The local directory tree mirrors the S3 key layout below the prefix.
    Connection settings (endpoint, region, unsigned requests) are read from
    the module-level ``par`` dict.

    Args:
        input_uri: ``s3://bucket/prefix`` URI of the database to download.
        output_dir_path: ``pathlib.Path`` of the destination directory
            (created if it does not exist).
    """
    logger.info("Requested to output to a directory. Downloading database...")
    output_dir_path.mkdir(parents=True, exist_ok=True)
    import boto3
    from awscli.customizations.s3.utils import split_s3_bucket_key

    bucket_name, key = split_s3_bucket_key(input_uri)
    connection_args = {
        "endpoint_url": par["endpoint"],
        "region_name": par["s3_region"],
    }
    if par["s3_no_sign_request"]:
        import botocore

        # Anonymous access: do not sign requests, so no credentials are needed.
        connection_args["config"] = botocore.config.Config(
            signature_version=botocore.UNSIGNED
        )

    client = boto3.resource("s3", **connection_args)
    bucket = client.Bucket(bucket_name)
    # enumerate from 1: the progress message previously reported "0 files"
    # after the first download (off-by-one).
    for i, s3_obj in enumerate(bucket.objects.filter(Prefix=key), start=1):
        # Skip "directory marker" objects: a key ending in '/' has no file
        # content and would resolve to a directory path, breaking download_file.
        if s3_obj.key.endswith("/"):
            continue
        output_path = output_dir_path / s3_obj.key.removeprefix(key).lstrip("/")
        output_path.parent.mkdir(parents=True, exist_ok=True)
        bucket.download_file(s3_obj.key, str(output_path))
        print(f"Downloaded {i} files.", file=sys.stdout, flush=True, end="\r")
    logger.info("Download completed!")


def main(par):
    """Append .obs columns from a MuData modality to an existing TileDB-SOMA experiment.

    The experiment is opened either remotely (``input_uri``) or as a local
    directory (``input_dir``). When ``output_tiledb`` is set, the database is
    first copied (or downloaded) to that directory and the columns are added
    to the copy, leaving the original untouched.

    Raises:
        ValueError: when both or neither of 'input_uri'/'input_dir' are given,
            when no .obs columns are requested, when a requested column already
            exists in the database, or when the modality's observations do not
            line up with the database contents.
        KeyError: when a requested .obs column is missing from the input modality.
    """
    logger.info("Component %s started.", meta["name"])
    if par["input_uri"]:
        # A trailing '/' would produce empty key components downstream.
        par["input_uri"] = par["input_uri"].rstrip("/")
    if par["input_uri"] and par["input_dir"]:
        raise ValueError("Cannot provide both 'input_uri' and 'input_dir'.")
    if not par["input_uri"] and not par["input_dir"]:
        raise ValueError("Must provide either 'input_uri' or 'input_dir'")
    if not par["obs_input"]:
        raise ValueError("Please provide at least one .obs column.")
    logger.info(
        "Opening mudata file '%s', modality '%s'.", par["input_mudata"], par["modality"]
    )
    modality_data = mudata.read_h5ad(par["input_mudata"], mod=par["modality"])
    logger.info(
        "Done reading modality. Looking at .obs for keys: '%s'",
        ",".join(par["obs_input"]),
    )
    # Fail early with the offending keys named instead of a bare KeyError.
    missing_keys = [key for key in par["obs_input"] if key not in modality_data.obs]
    if missing_keys:
        raise KeyError(
            f"Not all .obs keys were found in the input! Missing: {', '.join(missing_keys)}"
        )

    logger.info("Done getting .obs keys.")
    # Only forward settings the user actually provided.
    # NOTE(review): TileDB config values are usually strings — confirm the
    # boolean from --s3_no_sign_request is accepted/converted as expected.
    optional_config = {
        "vfs.s3.region": par["s3_region"],
        "vfs.s3.endpoint_override": par["endpoint"],
        "vfs.s3.no_sign_request": par["s3_no_sign_request"],
    }
    tiledb_config = {
        setting: value
        for setting, value in optional_config.items()
        if value is not None
    }
    logger.info("Using the following config to connect to S3: %s", tiledb_config)

    if par["output_tiledb"]:
        # Work on a copy of the database so the original is never modified.
        output_dir_path = Path(par["output_tiledb"])
        if par["input_dir"]:
            import shutil

            shutil.copytree(par["input_dir"], output_dir_path, dirs_exist_ok=True)
        else:
            download_s3_dir(par["input_uri"], output_dir_path)
        logger.info("Setting input to '%s'", output_dir_path)
        par["input_uri"] = f"file://{output_dir_path.resolve()}"
        logger.info("Overwriting TileDB config because S3 connection is not required.")
        tiledb_config = {}
    elif not par["input_uri"]:
        # Bugfix: 'input_dir' without 'output_tiledb' previously left
        # 'input_uri' as None and tiledbsoma.open(None) would fail.
        par["input_uri"] = f"file://{Path(par['input_dir']).resolve()}"
        tiledb_config = {}

    logger.info("Trying to access '%s'", par["input_uri"])
    logger.info("Fetching .obs")
    with tiledbsoma.open(
        par["input_uri"],
        mode="r",
        context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
    ) as open_experiment:
        logger.info("Connection established.")
        obs_df = open_experiment.obs.read().concat().to_pandas()
    logger.info("Done downloading .obs from database.")
    logger.info("Adding obs columns to fetched .obs dataframe.")
    overlapping_obs = set(par["obs_input"]).intersection(obs_df.columns)
    if overlapping_obs:
        raise ValueError(
            f"The following keys already exist in the database: {','.join(overlapping_obs)}."
        )

    columns_to_add = modality_data.obs[par["obs_input"]]
    # Align the database's cell-index column with the modality's obs index;
    # the right join keeps exactly the observations present in the modality.
    new_obs = pd.merge(
        obs_df,
        columns_to_add,
        left_on=par["obs_index_name_input"],
        right_index=True,
        how="right",
    )
    if len(new_obs) != len(obs_df):
        # A size change after the join means the modality's observations do
        # not correspond one-to-one with the database contents.
        raise ValueError(
            "Observations of the input modality do not match the database."
        )
    logger.info(
        "Writing obs back to database. Connection to %s with config %s",
        par["input_uri"],
        tiledb_config,
    )
    with tiledbsoma.open(
        par["input_uri"],
        mode="w",
        context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
    ) as open_experiment:
        tiledbsoma.io.update_obs(
            open_experiment,
            new_data=new_obs,
            default_index_name=par["obs_index_name_input"],
            context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
        )
    logger.info("Finished!")


# Script entry point; `par` is populated by viash (or the VIASH START fallback).
if __name__ == "__main__":
    main(par)
Loading
Loading