diff --git a/examples/virtual-rechunk/Dockerfile_virtualizarr b/examples/virtual-rechunk/Dockerfile_virtualizarr
new file mode 100644
index 000000000..d1793c6a7
--- /dev/null
+++ b/examples/virtual-rechunk/Dockerfile_virtualizarr
@@ -0,0 +1,59 @@
+# Python 3.11
+FROM python:3.11-slim-buster
+
+
+RUN apt-get update \
+    # Install aws-lambda-cpp build dependencies
+    && apt-get install -y \
+        g++ \
+        make \
+        cmake \
+        unzip \
+    # cleanup package lists, they are not used anymore in this image
+    && rm -rf /var/lib/apt/lists/* \
+    && apt-cache search linux-headers-generic
+
+ARG FUNCTION_DIR="/function"
+
+# Copy function code
+RUN mkdir -p ${FUNCTION_DIR}
+
+# Update pip
+# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
+# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
+# due to s3fs dependency
+RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
+    && pip install --upgrade --no-cache-dir --ignore-installed \
+        awslambdaric \
+        botocore==1.29.76 \
+        boto3==1.26.76 \
+        redis \
+        httplib2 \
+        requests \
+        numpy \
+        scipy \
+        pandas \
+        pika \
+        kafka-python \
+        cloudpickle \
+        ps-mem \
+        tblib
+
+# Set working directory to function root directory
+WORKDIR ${FUNCTION_DIR}
+
+# Add Lithops
+COPY lithops_lambda.zip ${FUNCTION_DIR}
+RUN unzip lithops_lambda.zip \
+    && rm lithops_lambda.zip \
+    && mkdir handler \
+    && touch handler/__init__.py \
+    && mv entry_point.py handler/
+
+# Put your dependencies here, using RUN pip install... or RUN apt install...
+
+COPY requirements.txt requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
+CMD [ "handler.entry_point.lambda_handler" ]
diff --git a/examples/virtual-rechunk/README.md b/examples/virtual-rechunk/README.md
new file mode 100644
index 000000000..15cc954ed
--- /dev/null
+++ b/examples/virtual-rechunk/README.md
@@ -0,0 +1,72 @@
+# Rechunk a virtual dataset
+
+This example demonstrates how to rechunk a collection of netCDF files on S3 into a single zarr store.
+
+Most rechunking workflows can be conceptualized as two steps,
+and keeping them separate typically provides greater flexibility than combining them.
+The first (staging) step is mostly embarrassingly parallel and prepares the input data.
+In this example, we construct a virtual zarr dataset using `lithops`,
+but data transfer and preprocessing could also be incorporated into staging.
+
+The second (rechunking) step rechunks the staged data.
+Here, we rechunk the virtual zarr using `cubed`,
+but in principle, `dask` or another map-reduce framework could be used.
+
+
+## Credits
+Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
+by norlandrhagen.
+
+Please contribute improvements.
+
+
+## Running the example
+
+1. Set up a Python environment
+```bash
+conda create --name virtualizarr-rechunk -y python=3.11
+conda activate virtualizarr-rechunk
+pip install -r requirements.txt
+```
+
+2. Set up the cubed executor for [lithops-aws](https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md) by editing `./lithops.yaml` with your `bucket` and `execution_role`.
+
+3. Build a runtime image for `cubed`
+```bash
+export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
+export CUBED_CONFIG=$(pwd)/cubed.yaml
+# set a bucket for storing results
+export BUCKET_URL=s3://wma-uncertainty/scratch
+lithops runtime build -b aws_lambda -f Dockerfile_virtualizarr virtualizarr-runtime
+```
+
+4. Stage the virtual zarr using `lithops`
+```bash
+python stage-virtual-zarr.py
+```
+
+5. Rechunk the virtual zarr with `cubed` (using `lithops`)
+```bash
+python rechunk-virtual-zarr.py
+```
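+
+After step 5, you can open the rechunked store and confirm the on-disk chunking.
+This is a minimal sketch rather than part of the pipeline; it assumes `BUCKET_URL`
+is still exported and that `xarray`, `zarr`, and `s3fs` are installed locally.
+```python
+import os
+
+import xarray as xr
+
+ds = xr.open_zarr(f"{os.environ['BUCKET_URL']}/rechunked.zarr", chunks=None)
+
+# Each variable's on-disk chunk shape; expect Time=5, south_north=25, west_east=32.
+for name, var in ds.data_vars.items():
+    print(name, var.encoding.get("chunks"))
+```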
+
+## Cleaning up
+To rebuild the `lithops` image, delete the existing one by running
+```bash
+lithops runtime delete -b aws_lambda -d virtualizarr-runtime
+```
diff --git a/examples/virtual-rechunk/cubed.yaml b/examples/virtual-rechunk/cubed.yaml
new file mode 100644
index 000000000..b4d2173c0
--- /dev/null
+++ b/examples/virtual-rechunk/cubed.yaml
@@ -0,0 +1,7 @@
+spec:
+  work_dir: "s3://cubed-$USER-temp"
+  allowed_mem: "2GB"
+  executor_name: "lithops"
+  executor_options:
+    runtime: "virtualizarr-runtime"
+    runtime_memory: 2000
diff --git a/examples/virtual-rechunk/lithops.yaml b/examples/virtual-rechunk/lithops.yaml
new file mode 100644
index 000000000..3740ac605
--- /dev/null
+++ b/examples/virtual-rechunk/lithops.yaml
@@ -0,0 +1,14 @@
+lithops:
+  backend: aws_lambda
+  storage: aws_s3
+
+aws:
+  region: us-west-2
+
+aws_lambda:
+  execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
+  runtime: virtualizarr-runtime
+  runtime_memory: 2000
+
+aws_s3:
+  storage_bucket: cubed-thodson-temp
diff --git a/examples/virtual-rechunk/rechunk-virtual-zarr.py b/examples/virtual-rechunk/rechunk-virtual-zarr.py
new file mode 100644
index 000000000..7c4cbee25
--- /dev/null
+++ b/examples/virtual-rechunk/rechunk-virtual-zarr.py
@@ -0,0 +1,40 @@
+# Rechunk a virtual zarr on s3 into a single zarr store using xarray-cubed.
+#
+# Prior to running this script, create the virtual zarr with
+# > python stage-virtual-zarr.py
+#
+# NOTE: In Jupyter, open_dataset seems to cache the JSON, such that changes
+# aren't propagated until the kernel is restarted.
+
+import os
+
+import xarray as xr
+
+bucket_url = os.getenv("BUCKET_URL")
+
+combined_ds = xr.open_dataset(
+    f"{bucket_url}/combined.json",  # location must be accessible to workers
+    engine="kerchunk",
+    chunks={},
+    chunked_array_type="cubed",
+)
+
+combined_ds['Time'].attrs = {}  # otherwise to_zarr complains about attrs
+
+rechunked_ds = combined_ds.chunk(
+    chunks={'Time': 5, 'south_north': 25, 'west_east': 32},
+    chunked_array_type="cubed",
+)
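+
+# Added check (sketch): cubed evaluates lazily, so nothing has executed yet.
+# Printing the planned chunking is a cheap way to confirm the dimension
+# names above match the dataset before launching the run.
+print(rechunked_ds.chunksizes)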
+
+rechunked_ds.to_zarr(
+    f"{bucket_url}/rechunked.zarr",
+    mode="w",
+    encoding={},  # TODO
+    consolidated=True,
+    safe_chunks=False,
+)
diff --git a/examples/virtual-rechunk/requirements.txt b/examples/virtual-rechunk/requirements.txt
new file mode 100644
index 000000000..368494255
--- /dev/null
+++ b/examples/virtual-rechunk/requirements.txt
@@ -0,0 +1,11 @@
+boto3
+cftime
+cubed
+cubed-xarray
+h5netcdf
+h5py
+kerchunk
+lithops
+s3fs
+virtualizarr
+xarray
diff --git a/examples/virtual-rechunk/stage-virtual-zarr.py b/examples/virtual-rechunk/stage-virtual-zarr.py
new file mode 100755
index 000000000..4af14effa
--- /dev/null
+++ b/examples/virtual-rechunk/stage-virtual-zarr.py
@@ -0,0 +1,72 @@
+# Use lithops to construct a virtual zarr from netCDF files on s3.
+
+import os
+
+import fsspec
+import lithops
+import xarray as xr
+from virtualizarr import open_virtual_dataset
+
+bucket_url = os.getenv("BUCKET_URL")
+
+fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
+file_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
+file_pattern = sorted(["s3://" + f for f in file_paths])
+
+# Truncate file_pattern while debugging
+file_pattern = file_pattern[:4]
+
+print(f"{len(file_pattern)} file paths were retrieved.")
+
+
+def map_references(fil):
+    """Map function: open one netCDF file as a virtual dataset.
+    """
+    vds = open_virtual_dataset(
+        fil,
+        indexes={},
+        loadable_variables=['Time'],
+        cftime_variables=['Time'],
+    )
+    return vds
+
+
+def reduce_references(results):
+    """Reduce function: concatenate the virtual datasets along Time.
+    """
+    combined_vds = xr.combine_nested(
+        results,
+        concat_dim=["Time"],
+        coords="minimal",
+        compat="override",
+    )
+
+    return combined_vds
+
+
+fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
+
+futures = fexec.map_reduce(
+    map_references,
+    file_pattern,
+    reduce_references,
+    spawn_reducer=100,
+)
+
+ds = futures.get_result()
+
+# Save the virtual zarr manifest
+ds.virtualize.to_kerchunk("combined.json", format="json")
+
+# Upload the manifest to s3
+fs_write = fsspec.filesystem("s3", anon=False, skip_instance_cache=True)
+fs_write.put("combined.json", f"{bucket_url}/combined.json")
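+
+# Added check (sketch): re-open the uploaded manifest to confirm it is
+# readable end-to-end, as the rechunking script will read it. Assumes the
+# kerchunk xarray engine and s3fs are available in the local environment.
+check_ds = xr.open_dataset(
+    f"{bucket_url}/combined.json",
+    engine="kerchunk",
+)
+print(check_ds)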