Skip to content

Add rechunk ERA5 benchmark #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions tests/benchmarks/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@
import xarray as xr

import cubed
import cubed as xp
import cubed.random
from cubed.core.optimization import multiple_inputs_optimize_dag, simple_optimize_dag
from cubed.diagnostics.rich import RichProgressBar

from ..utils import run

def test_measure_reserved_mem(runtime):
    """Report the memory reserved by the executor runtime.

    The measurement is printed rather than asserted; local executors are
    skipped since the measurement is not meaningful for them.
    """
    spec = runtime
    executor_name = spec.executor.name

    # Reserved-memory measurement only applies to remote executors.
    if executor_name in ("single-threaded", "threads"):
        pytest.skip(f"Don't measure reserved memory on {executor_name}")

    reserved = cubed.measure_reserved_mem(
        executor=spec.executor, work_dir=spec.work_dir
    )
    print("reserved memory", reserved)


@pytest.mark.parametrize("optimizer", ["new-optimizer"])
@pytest.mark.parametrize("t_length", [50, 500, 5000])
Expand Down Expand Up @@ -90,3 +100,47 @@ def test_quadratic_means_xarray(tmp_path, runtime, benchmark_all, optimizer, t_l
fs.rm(path, recursive=True)
except FileNotFoundError:
pass


@pytest.mark.skip(reason="Skipping due to large computation")
def test_rechunk_era5(tmp_path, runtime, benchmark_all):
    """Benchmark rechunking an ERA5-sized array from time-contiguous to
    space-contiguous chunks.

    Source Zarr data is created first (untimed); only the rechunk
    computation itself is benchmarked, and intermediate data is cleaned
    up afterwards even if the computation fails.
    """
    spec = runtime

    # This computation is far too large for local executors.
    if spec.executor.name in ("single-threaded", "threads"):
        pytest.skip(f"Don't run large computation on {spec.executor.name}")

    # from https://github.com/pangeo-data/rechunker/pull/89
    shape = (350640, 721, 1440)
    source_chunks = (31, 721, 1440)  # time-contiguous
    target_chunks = (350640, 10, 10)  # space-contiguous

    # set the random seed to ensure deterministic results
    # NOTE(review): this seeds the stdlib `random` module; confirm that
    # cubed.random.random actually draws from it, otherwise this is a no-op.
    random.seed(42)

    # create zarr test data (not timed)
    a = cubed.random.random(shape, dtype=xp.float32, chunks=source_chunks, spec=spec)
    path = f"{spec.work_dir}/a.zarr"
    cubed.store([a], [path], compute_arrays_in_parallel=True, callbacks=[RichProgressBar()])

    a = cubed.from_zarr(path, spec=spec)
    result = a.rechunk(chunks=target_chunks, use_new_impl=True)

    # fix: was f"rechunk_era5" — an f-string with no placeholders (F541)
    result.visualize(filename=tmp_path / "rechunk_era5")

    try:
        # time only the computing of the result
        run(
            result,
            executor=spec.executor,
            benchmarks=benchmark_all,
            callbacks=[RichProgressBar()],
            _return_in_memory_array=False,  # don't load result into memory!
        )
    finally:
        # delete zarr intermediate data (not timed)
        try:
            work_dir = spec.work_dir
            fs, _, _ = fsspec.get_fs_token_paths(work_dir)
            fs.rm(work_dir, recursive=True)
        except FileNotFoundError:
            pass
7 changes: 7 additions & 0 deletions tests/configs/lithops_aws_dev.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Cubed spec for running the benchmarks on a Lithops AWS dev runtime.
spec:
  # Working directory for intermediate Zarr data — presumably a personal
  # dev S3 bucket; adjust before reuse.
  work_dir: "s3://cubed-tom-temp"
  # Maximum memory Cubed may plan for per task.
  allowed_mem: "3.5GB"
  executor_name: "lithops"
  executor_options:
    runtime: "cubed-runtime-dev"
    # Runtime memory for the Lithops function (presumably MB, i.e. 3.5GB,
    # matching allowed_mem above — confirm against Lithops config docs).
    runtime_memory: 3500