Skip to content

Commit 5e15343

Browse files
scvi_leiden: add support for writing to a tileDB database. (#1094)
1 parent 06ce282 commit 5e15343

File tree

13 files changed

+841
-36
lines changed

13 files changed

+841
-36
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
* Added `from_tiledb_to_h5mu` component (PR #1068).
2626

27+
* `workflows/integration/scvi_leiden`: add support for adding the output slots to a tileDB-SOMA database (PR 1094).
28+
2729
## MAJOR CHANGES
2830

2931
* `mapping/samtools_sort` has been deprecated and will be removed in openpipeline 4.0. Use [vsh://biobox/samtools/samtools_sort](https://www.viash-hub.com/packages/biobox/latest/components/samtools/samtools_sort) instead.

src/convert/from_tiledb_to_h5mu/config.vsh.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ argument_groups:
2323
description: |
2424
Custom endpoint to use to connect to S3
2525
required: false
26+
- name: "--s3_no_sign_request"
27+
description: |
28+
Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
29+
type: boolean
30+
default: false
2631
- name: "--input_modality"
2732
required: true
2833
type: string

src/convert/from_tiledb_to_h5mu/script.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ def _log_arguments(function_obj, arg_dict):
4545

4646
def main(par):
4747
logger.info("Component %s started", meta["name"])
48-
tiledb_config = {
49-
"vfs.s3.no_sign_request": "false",
50-
}
48+
tiledb_config = {}
5149
optional_config = {
50+
"vfs.s3.no_sign_request": par["s3_no_sign_request"],
5251
"vfs.s3.region": par["s3_region"],
5352
"vfs.s3.endpoint_override": par["endpoint"],
5453
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
name: move_mudata_obs_to_tiledb
2+
namespace: tiledb
3+
scope: "private"
4+
description: |
5+
Move .obs columns from a MuData modality to an existing tileDB database.
6+
The .obs keys should not exist in the database yet; and the observations from the modality and
7+
their order should match with what is already present the tiledb database.
8+
authors:
9+
- __merge__: /src/authors/dries_schaumont.yaml
10+
roles: [ author, maintainer ]
11+
argument_groups:
12+
- name: Input database
13+
description: "Open a tileDB-SOMA database by URI or as a local directory."
14+
arguments:
15+
- name: "--input_uri"
16+
type: string
17+
description: "A URI pointing to a TileDB-SOMA database. Mutually exclusive with 'input_dir'"
18+
required: false
19+
example: "s3://bucket/path"
20+
- name: "--input_dir"
21+
type: file
22+
required: false
23+
description: "Path to a TileDB-SOMA database as a local directory"
24+
example: "./tiledb_database"
25+
- name: "--s3_region"
26+
description: |
27+
Region where the TileDB-SOMA database is hosted.
28+
type: string
29+
required: false
30+
- name: "--endpoint"
31+
type: string
32+
description: |
33+
Custom endpoint to use to connect to S3
34+
required: false
35+
- name: "--s3_no_sign_request"
36+
description: |
37+
Do not sign S3 requests. Credentials will not be loaded if this argument is provided.
38+
type: boolean
39+
default: false
40+
- name: "--output_modality"
41+
type: string
42+
description: |
43+
TileDB-SOMA measurement to add the output to.
44+
- name: "--obs_index_name_input"
45+
description: |
46+
Name of the index that is used to describe the cells (observations).
47+
type: string
48+
default: cell_id
49+
- name: "MuData input"
50+
arguments:
51+
- name: "--input_mudata"
52+
type: file
53+
required: true
54+
description: |
55+
MuData object to take the columns from. The observations and their order should
56+
match between the database and the input modality.
57+
- name: "--modality"
58+
required: true
59+
type: string
60+
description: |
61+
Modality where to take the .obs from.
62+
- name: "--obs_input"
63+
type: string
64+
multiple: true
65+
description: |
66+
Columns from .obs to copy. The keys should not be present yet in the database.
67+
- name: "TileDB-SOMA output"
68+
arguments:
69+
- name: "--output_tiledb"
70+
type: file
71+
direction: output
72+
required: False
73+
description: |
74+
Output to a directory instead of adding to the existing database.
75+
resources:
76+
- type: python_script
77+
path: script.py
78+
- path: /src/utils/setup_logger.py
79+
test_resources:
80+
- type: python_script
81+
path: test.py
82+
- path: /resources_test/tiledb/
83+
- path: /resources_test/pbmc_1k_protein_v3/
84+
engines:
85+
- type: docker
86+
image: python:3.12
87+
setup:
88+
- type: python
89+
packages:
90+
- tiledbsoma
91+
- boto3
92+
- awscli
93+
__merge__: /src/base/requirements/anndata_mudata.yaml
94+
test_setup:
95+
- type: python
96+
packages:
97+
- moto[server]
98+
__merge__: [ /src/base/requirements/python_test_setup.yaml, .]
99+
runners:
100+
- type: executable
101+
docker_run_args: ["--env", "AWS_ACCESS_KEY_ID", "--env", "AWS_SECRET_ACCESS_KEY", "--env", "AWS_DEFAULT_REGION"]
102+
- type: nextflow
103+
directives:
104+
label: [highmem, midcpu]
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import sys
2+
import mudata
3+
import tiledbsoma
4+
import tiledbsoma.io
5+
import pandas as pd
6+
from pathlib import Path
7+
8+
## VIASH START
9+
par = {
10+
"input_uri": "s3://openpipelines-data/tiledb/pbmc_1k_protein_v3_mms/",
11+
"input_mudata": "./resources_test/pbmc_1k_protein_v3/pbmc_1k_protein_v3_mms.h5mu",
12+
"obs_input": ["test_slot"],
13+
"modality": "rna",
14+
"input_dir": None,
15+
"s3_region": "eu-west-3",
16+
"endpoint": None,
17+
"output_tiledb": "./output",
18+
"output_modality": "rna",
19+
"s3_no_sign_request": True,
20+
"obs_index_name_input": "cell_id",
21+
}
22+
meta = {"resources_dir": "src/utils", "name": "move_mudata_obs_to_tiledb"}
23+
24+
test_path = "./mudata_for_testing.h5mu"
25+
test_mudata = mudata.read_h5mu(par["input_mudata"])
26+
test_mudata["rna"].obs["test_slot"] = (
27+
test_mudata["rna"].obs["filter_with_counts"].copy()
28+
)
29+
test_mudata.write(test_path)
30+
par["input_mudata"] = test_path
31+
## VIASH END
32+
33+
sys.path.append(meta["resources_dir"])
34+
from setup_logger import setup_logger
35+
36+
logger = setup_logger()
37+
38+
tiledbsoma.logging.info()
39+
40+
41+
def download_s3_dir(input_uri, output_dir_path):
42+
logger.info("Requested to output to a directory. Downloading database...")
43+
output_dir_path.mkdir(parents=True, exist_ok=True)
44+
import boto3
45+
from awscli.customizations.s3.utils import split_s3_bucket_key
46+
47+
bucket, key = split_s3_bucket_key(input_uri)
48+
connection_args = {
49+
"endpoint_url": par["endpoint"],
50+
"region_name": par["s3_region"],
51+
}
52+
if par["s3_no_sign_request"]:
53+
import botocore
54+
55+
connection_args["config"] = botocore.config.Config(
56+
signature_version=botocore.UNSIGNED
57+
)
58+
59+
client = boto3.resource("s3", **connection_args)
60+
bucket = client.Bucket(bucket)
61+
for i, s3_obj in enumerate(bucket.objects.filter(Prefix=key)):
62+
output_path = output_dir_path / s3_obj.key.removeprefix(key).lstrip("/")
63+
output_path.parent.mkdir(parents=True, exist_ok=True)
64+
bucket.download_file(s3_obj.key, output_path)
65+
print(f"Downloaded {i} files.", file=sys.stdout, flush=True, end="\r")
66+
logger.info("Download completed!")
67+
68+
69+
def main(par):
70+
logger.info(f"Component {meta['name']} started.")
71+
if par["input_uri"]:
72+
par["input_uri"] = par["input_uri"].rstrip("/")
73+
if par["input_uri"] and par["input_dir"]:
74+
raise ValueError("Cannot provide both 'input_uri' and 'input_dir'.")
75+
if not par["input_uri"] and not par["input_dir"]:
76+
raise ValueError("Must provide either 'input_uri' or 'input_dir'")
77+
if not par["obs_input"]:
78+
raise ValueError("Please provide at least one .obs column.")
79+
logger.info(
80+
"Opening mudata file '%s', modality '%s'.", par["input_mudata"], par["modality"]
81+
)
82+
modality_data = mudata.read_h5ad(par["input_mudata"], mod=par["modality"])
83+
logger.info(
84+
"Done reading modality. Looking at .obs for keys: '%s'",
85+
",".join(par["obs_input"]),
86+
)
87+
try:
88+
for obs_key in par["obs_input"]:
89+
modality_data.obs[obs_key]
90+
except KeyError as e:
91+
raise KeyError("Not all .obs keys were found in the input!") from e
92+
93+
logger.info("Done getting .obs keys.")
94+
optional_config = {
95+
"vfs.s3.region": par["s3_region"],
96+
"vfs.s3.endpoint_override": par["endpoint"],
97+
"vfs.s3.no_sign_request": par["s3_no_sign_request"],
98+
}
99+
tiledb_config = {}
100+
for config_setting, config_val in optional_config.items():
101+
if config_val is not None:
102+
tiledb_config[config_setting] = config_val
103+
logger.info("Using the following config to connect to S3: %s", tiledb_config)
104+
105+
if par["output_tiledb"]:
106+
output_dir_path = Path(par["output_tiledb"])
107+
if par["input_dir"]:
108+
import shutil
109+
110+
shutil.copytree(par["input_dir"], output_dir_path, dirs_exist_ok=True)
111+
else:
112+
download_s3_dir(par["input_uri"], output_dir_path)
113+
logger.info("Setting input to '%s'", output_dir_path)
114+
par["input_uri"] = f"file://{output_dir_path.resolve()}"
115+
logger.info("Overwriting TileDB config because S3 connection is not required.")
116+
tiledb_config = {}
117+
118+
logger.info("Trying to access '%s'", par["input_uri"])
119+
logger.info("Fetching .obs")
120+
with tiledbsoma.open(
121+
par["input_uri"],
122+
mode="r",
123+
context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
124+
) as open_experiment:
125+
logger.info("Connection established.")
126+
obs_df = open_experiment.obs.read().concat().to_pandas()
127+
logger.info("Done downloading .obs from databse.")
128+
logger.info("Adding obs columns to fetched .obs dataframe.")
129+
overlapping_obs = set(par["obs_input"]).intersection(set(obs_df.columns.to_list()))
130+
if overlapping_obs:
131+
raise ValueError(
132+
f"The following keys already exist in the database: {','.join(overlapping_obs)}."
133+
)
134+
135+
columns_to_add = modality_data.obs[par["obs_input"]]
136+
new_obs = pd.merge(
137+
obs_df,
138+
columns_to_add,
139+
left_on=par["obs_index_name_input"],
140+
right_index=True,
141+
how="right",
142+
)
143+
logger.info(
144+
"Writing obs back to database. Connection to %s with config %s",
145+
par["input_uri"],
146+
tiledb_config,
147+
)
148+
with tiledbsoma.open(
149+
par["input_uri"],
150+
mode="w",
151+
context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
152+
) as open_experiment:
153+
tiledbsoma.io.update_obs(
154+
open_experiment,
155+
new_data=new_obs,
156+
default_index_name=par["obs_index_name_input"],
157+
context=tiledbsoma.SOMATileDBContext(tiledb_config=tiledb_config),
158+
)
159+
logger.info("Finished!")
160+
161+
162+
if __name__ == "__main__":
163+
main(par)

0 commit comments

Comments
 (0)