Skip to content

Add experimental vcf_to_parquet function #1083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ zarr >= 2.10.0, != 2.11.0, != 2.11.1, != 2.11.2
fsspec != 2021.6.*
scikit-learn
pandas
pyarrow
136 changes: 134 additions & 2 deletions sgkit/io/vcf/vcf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import dask
import fsspec
import numpy as np
import pandas as pd
import xarray as xr
import zarr
from cyvcf2 import VCF, Variant
Expand Down Expand Up @@ -97,9 +98,9 @@ class MaxAltAllelesExceededWarning(UserWarning):


@contextmanager
def open_vcf(path: PathType) -> Iterator[VCF]:
def open_vcf(path: PathType, **kwargs) -> Iterator[VCF]:
"""A context manager for opening a VCF file."""
vcf = VCF(path)
vcf = VCF(path, **kwargs)
try:
yield vcf
finally:
Expand Down Expand Up @@ -1363,3 +1364,134 @@ def zip_input_and_regions(
assert len(inputs) == len(input_regions)

return zip(inputs, input_regions)


def vcf_to_parquet(
input: Union[PathType, Sequence[PathType]],
output: PathType,
*,
regions: Union[None, Sequence[str], Sequence[Optional[Sequence[str]]]] = None,
target_part_size: Union[None, int, str] = "auto",
fields: Optional[Sequence[str]] = None,
exclude_fields: Optional[Sequence[str]] = None,
) -> None:
sequential_function = functools.partial(
vcf_to_parquet_sequential,
output=output,
fields=fields,
exclude_fields=exclude_fields,
)
parallel_function = functools.partial(
vcf_to_parquet_parallel,
output=output,
fields=fields,
exclude_fields=exclude_fields,
)
process_vcfs(
input,
sequential_function,
parallel_function,
regions=regions,
target_part_size=target_part_size,
)


def vcf_to_parquet_sequential(
input: PathType,
output: PathType,
region: Optional[str] = None,
fields: Optional[Sequence[str]] = None,
exclude_fields: Optional[Sequence[str]] = None,
) -> None:
# Use lazy=True so cyvcf2 doesn't unpack format fields
with open_vcf(input, lazy=True) as vcf:
if region is None:
variants = vcf
else:
variants = vcf(region)

fields = fields or ["INFO/*"] # default to all INFO fields
fields = _normalize_fields(vcf, fields)
exclude_fields = exclude_fields or []
exclude_fields = _normalize_fields(vcf, exclude_fields)
fields = [f for f in fields if f not in exclude_fields]
if any([field.startswith("FORMAT/") for field in fields]):
raise ValueError("FORMAT fields are not supported in `vcf_to_parquet`")
vcf_field_defs = _get_vcf_field_defs(vcf, "INFO")
info_keys = [field[len("INFO/") :] for field in fields]

chrom = []
pos = []
id = []
ref = []
alt = []
qual = []
filter = []

infos = {}
for key in info_keys:
infos[key] = []

for variant in region_filter(variants, region):
chrom.append(variant.CHROM)
pos.append(variant.POS)
id.append(variant.ID)
ref.append(variant.REF)
alt.append(variant.ALT)
qual.append(variant.QUAL)
filter.append(variant.FILTERS)

info = dict(variant.INFO)
for key in info_keys:
value = info.get(key)

# ensure Number=. is always a tuple
if vcf_field_defs[key]["Number"] == ".":
if value is not None and not isinstance(value, tuple):
value = (value,)

infos[key].append(value)

d = {
"CHROM": chrom,
"POS": pos,
"ID": id,
"REF": ref,
"ALT": alt,
"QUAL": qual,
"FILTER": filter,
}
d.update(infos)
df = pd.DataFrame(d)
df.to_parquet(output)


def vcf_to_parquet_parallel(
input: Union[PathType, Sequence[PathType]],
output: PathType,
regions: Union[None, Sequence[str], Sequence[Optional[Sequence[str]]]],
fields: Optional[Sequence[str]] = None,
exclude_fields: Optional[Sequence[str]] = None,
) -> Dict[str, Any]:
if isinstance(output, str):
Path(output).mkdir(parents=True, exist_ok=True)
else:
output.mkdir(parents=True, exist_ok=True)

tasks = []
for input, input_region_list in zip_input_and_regions(input, regions):
if input_region_list is None:
# single partition case: make a list so the loop below works
input_region_list = [None] # type: ignore
for r, region in enumerate(input_region_list):
part_url = build_url(str(output), f"part-{r}.parquet")
output_part = part_url
task = dask.delayed(vcf_to_parquet_sequential)(
input,
output=output_part,
region=region,
fields=fields,
exclude_fields=exclude_fields,
)
tasks.append(task)
dask.compute(*tasks)
33 changes: 33 additions & 0 deletions sgkit/tests/io/vcf/test_vcf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import MutableMapping

import numpy as np
import pandas as pd
import pytest
import xarray as xr
import zarr
Expand All @@ -22,6 +23,7 @@
from sgkit.io.vcf.vcf_reader import (
FloatFormatFieldWarning,
merge_zarr_array_sizes,
vcf_to_parquet,
zarr_array_sizes,
)
from sgkit.model import get_contigs, get_filters, num_contigs
Expand Down Expand Up @@ -1679,3 +1681,34 @@ def test_vcf_to_zarr__no_samples(shared_datadir, tmp_path):
assert_array_equal(ds["sample_id"], [])
assert_array_equal(ds["contig_id"], ["1"])
assert ds.dims["variants"] == 973


def test_vcf_to_parquet(shared_datadir, tmp_path):
path = path_for_test(shared_datadir, "sample.vcf.gz")
output = tmp_path.joinpath("vcf.parquet").as_posix()
vcf_to_parquet(path, output)

df = pd.read_parquet(output)
assert len(df) == 9
assert_array_equal(df[df["ID"] == "rs6040355"]["ALT"].tolist(), [["G", "T"]])


def test_vcf_to_parquet__parallel(shared_datadir, tmp_path):
path = path_for_test(shared_datadir, "CEUTrio.20.21.gatk3.4.g.vcf.bgz")
output = tmp_path.joinpath("vcf.parquet").as_posix()
regions = ["20", "21"]
vcf_to_parquet(path, output, regions=regions)

df = pd.read_parquet(output)
assert len(df) == 19910


def test_vcf_to_parquet__fields_errors(shared_datadir, tmp_path):
path = path_for_test(shared_datadir, "sample.vcf.gz")
output = tmp_path.joinpath("vcf.parquet").as_posix()

with pytest.raises(
ValueError,
match=r"FORMAT fields are not supported in `vcf_to_parquet`",
):
vcf_to_parquet(path, output, fields=["FORMAT/DP"])