# WIP: Add virtual-rechunk example #520
**Dockerfile_virtualizarr**

```dockerfile
# Python 3.11
FROM python:3.11-slim-buster

RUN apt-get update \
    # Install aws-lambda-cpp build dependencies
    && apt-get install -y \
        g++ \
        make \
        cmake \
        unzip \
    # cleanup package lists, they are not used anymore in this image
    && rm -rf /var/lib/apt/lists/* \
    && apt-cache search linux-headers-generic

ARG FUNCTION_DIR="/function"

# Copy function code
RUN mkdir -p ${FUNCTION_DIR}

# Update pip
# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
# due to s3fs dependency
RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
    && pip install --upgrade --no-cache-dir --ignore-installed \
        awslambdaric \
        botocore==1.29.76 \
        boto3==1.26.76 \
        redis \
        httplib2 \
        requests \
        numpy \
        scipy \
        pandas \
        pika \
        kafka-python \
        cloudpickle \
        ps-mem \
        tblib

# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}

# Add Lithops
COPY lithops_lambda.zip ${FUNCTION_DIR}
RUN unzip lithops_lambda.zip \
    && rm lithops_lambda.zip \
    && mkdir handler \
    && touch handler/__init__.py \
    && mv entry_point.py handler/

# Put your dependencies here, using RUN pip install... or RUN apt install...

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "handler.entry_point.lambda_handler" ]
```
**README**

# Rechunk a virtual dataset

This example demonstrates how to rechunk a collection of netcdf files on s3 into a single zarr store.

Most rechunking workflows can be conceptualized as two steps, and keeping them separate typically provides greater flexibility than combining them. The first (staging) step is mostly embarrassingly parallel and prepares the input data. In this example, we construct a virtual zarr dataset using `lithops`, but data transfer and reprocessing could also be incorporated into staging.

The second (rechunking) step rechunks the staged data. Here, we rechunk the virtual zarr using `cubed`, but in principle `dask` or other map-reduce frameworks could be used.
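In outline, the two steps look roughly like the sketch below. This is illustrative only: the file paths are placeholders, and the runnable versions are the staging and rechunking scripts included in this example.

```python
# Sketch of the two-step workflow (mirrors the scripts in this example).
import xarray as xr
from virtualizarr import open_virtual_dataset

# Step 1 (staging): build a virtual zarr manifest that references the
# original netcdf files instead of copying their bytes.
virtual_datasets = [
    open_virtual_dataset(path, indexes={})
    for path in ["s3://bucket/file1.nc", "s3://bucket/file2.nc"]  # placeholders
]
combined = xr.combine_nested(
    virtual_datasets, concat_dim=["Time"], coords="minimal", compat="override"
)
combined.virtualize.to_kerchunk("combined.json", format="json")

# Step 2 (rechunking): open the manifest with cubed-backed chunks and write
# a real, rechunked zarr store.
ds = xr.open_dataset(
    "combined.json", engine="kerchunk", chunks={}, chunked_array_type="cubed"
)
ds.chunk({"Time": 5}).to_zarr("rechunked.zarr", mode="w")
```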

## Credits
Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook by norlandrhagen.

Please contribute improvements.
1. Set up a Python environment
```bash
conda create --name virtualizarr-rechunk -y python=3.11
conda activate virtualizarr-rechunk
pip install -r requirements.txt
```

2. Set up cubed executor for [lithops-aws](https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md) by editing `./lithops.yaml` with your `bucket` and `execution_role`.

3. Build a runtime image for `cubed`
```bash
export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
export CUBED_CONFIG=$(pwd)/cubed.yaml
# create a bucket for storing results
export BUCKET_URL=s3://wma-uncertainty/scratch
lithops runtime build -b aws_lambda -f Dockerfile_virtualizarr virtualizarr-runtime
```

4. Stage the virtual zarr using `lithops`
```bash
python stage-virtual-zarr.py
```

5. Rechunk the virtual zarr with `cubed` (using `lithops`)
```bash
python rechunk-virtual-zarr.py
```

## Cleaning up
To rebuild the `lithops` image, delete the existing one by running
```bash
lithops runtime delete -b aws_lambda -d virtualizarr-runtime
```
**cubed.yaml**

```yaml
spec:
  work_dir: "s3://cubed-$USER-temp"
  allowed_mem: "2GB"
  executor_name: "lithops"
  executor_options:
    runtime: "virtualizarr-runtime"
    runtime_memory: 2000
```
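For reference, the `spec` section corresponds to what one could construct programmatically with cubed's `Spec` object. The following is a minimal sketch, assuming `cubed.Spec` accepts these keyword arguments; the example itself reads the YAML file through the `CUBED_CONFIG` environment variable set in the README.

```python
# Minimal sketch of the same spec in Python (the example uses the YAML file).
import cubed

spec = cubed.Spec(
    work_dir="s3://cubed-<user>-temp",  # substitute your own scratch bucket
    allowed_mem="2GB",
)
print(spec)
```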
**lithops.yaml**

```yaml
lithops:
  backend: aws_lambda
  storage: aws_s3

aws:
  region: us-west-2

aws_lambda:
  execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
  runtime: virtualizarr-runtime
  runtime_memory: 2000

aws_s3:
  storage_bucket: cubed-thodson-temp
```
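Before running the staging script, a small smoke test can confirm that the Lambda runtime and this config resolve correctly. This is a sketch that uses the same `FunctionExecutor(config_file=...)` call as the scripts, with a throwaway `double` function:

```python
# Hedged smoke test for the lithops/aws_lambda setup defined in lithops.yaml.
import lithops


def double(x):
    return 2 * x


fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
futures = fexec.map(double, [1, 2, 3])
print(fexec.get_result(futures))  # expected: [2, 4, 6]
```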
**rechunk-virtual-zarr.py**

```python
# Rechunk a virtual zarr on s3 into a single zarr store using xarray-cubed.
#
# Prior to running this script, create the virtual zarr with
# > python create-virtualzarr.py
#
# NOTE: In jupyter, open_dataset seems to cache the json, such that changes
# aren't propagated until the kernel is restarted.

import os

import xarray as xr

bucket_url = os.getenv("BUCKET_URL")

combined_ds = xr.open_dataset(
    f"{bucket_url}/combined.json",  # location must be accessible to workers
    engine="kerchunk",
    chunks={},
    chunked_array_type="cubed",
)

combined_ds['Time'].attrs = {}  # otherwise to_zarr complains about attrs

rechunked_ds = combined_ds.chunk(
    chunks={'Time': 5, 'south_north': 25, 'west_east': 32},
    chunked_array_type="cubed",
)

rechunked_ds.to_zarr(
    f"{bucket_url}/rechunked.zarr",
    mode="w",
    encoding={},  # TODO
    consolidated=True,
    safe_chunks=False,
)
```
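Once the script completes, a quick sanity check is to reopen the store and inspect the chunking. This is a sketch, assuming `BUCKET_URL` is exported as in the README and your credentials allow reading the bucket:

```python
# Hedged check: confirm the on-disk chunking of the rechunked store.
import os

import xarray as xr

bucket_url = os.getenv("BUCKET_URL")
ds = xr.open_zarr(f"{bucket_url}/rechunked.zarr", consolidated=True)
print(ds.chunks)  # expect chunks of 5 (Time), 25 (south_north), 32 (west_east)
```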
**requirements.txt**

```text
boto3
cftime
cubed
cubed-xarray
h5netcdf
h5py
kerchunk
lithops
s3fs
virtualizarr
xarray
```
**create-virtualzarr.py**

```python
# Use lithops to construct a virtual zarr from netcdf files on s3.

import os

import fsspec
import lithops
import xarray as xr
from virtualizarr import open_virtual_dataset

bucket_url = os.getenv("BUCKET_URL")

fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
file_pattern = sorted(["s3://" + f for f in files_paths])

# Truncate file_pattern while debugging
file_pattern = file_pattern[:4]

print(f"{len(file_pattern)} file paths were retrieved.")


def map_references(fil):
    """Map function to open virtual datasets."""
    vds = open_virtual_dataset(
        fil,
        indexes={},
        loadable_variables=['Time'],
        cftime_variables=['Time'],
    )
    return vds


def reduce_references(results):
    """Reduce to concat virtual datasets."""
    combined_vds = xr.combine_nested(
        results,
        concat_dim=["Time"],
        coords="minimal",
        compat="override",
    )
    return combined_vds


fexec = lithops.FunctionExecutor(config_file="lithops.yaml")

futures = fexec.map_reduce(
    map_references,
    file_pattern,
    reduce_references,
    spawn_reducer=100,
)

ds = futures.get_result()

# Save the virtual zarr manifest
ds.virtualize.to_kerchunk("combined.json", format="json")

# Upload manifest to s3
fs_write = fsspec.filesystem("s3", anon=False, skip_instance_cache=True)
fs_write.put("combined.json", f"{bucket_url}/combined.json")
```

Review comments on writing the manifest with kerchunk:
- "Also, might be worth trying parquet or icechunk if you're willing to venture into the bleeding edge."
- "Seconding the use of icechunk instead of kerchunk at this point."
Review comments on the s3 upload:
- "I think we should be able to write the ref directly to s3; if not, we should figure out why!"
- "I'll open the issue."
- "ty!"
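Finally, when debugging the staging step, the map and reduce functions compose serially without Lambda. This is a sketch that assumes `map_references`, `reduce_references`, and `file_pattern` are defined as in `create-virtualzarr.py` above:

```python
# Hedged sketch: serial equivalent of fexec.map_reduce for local debugging.
# Assumes map_references, reduce_references, and file_pattern come from
# create-virtualzarr.py above.
virtual_datasets = [map_references(path) for path in file_pattern]
combined = reduce_references(virtual_datasets)
combined.virtualize.to_kerchunk("combined.json", format="json")
```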