-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathconftest.py
407 lines (334 loc) · 12.4 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""Pytest configuration and fixtures for virtualizarr tests."""
# Standard library imports
import itertools
from pathlib import Path
from typing import Any, Callable, Mapping, Optional
# Third-party imports
import h5py # type: ignore[import]
import numpy as np
import pytest
import xarray as xr
from xarray.core.variable import Variable
# Local imports
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.manifests.manifest import join
from virtualizarr.manifests.utils import create_v3_array_metadata
from virtualizarr.utils import ceildiv
# Pytest configuration
def pytest_addoption(parser):
    """Register the custom command-line flags used by this test suite."""
    # (flag, help text) pairs, registered in declaration order.
    opt_specs = [
        ("--run-network-tests", "runs tests requiring a network connection"),
        ("--run-minio-tests", "runs tests requiring docker and minio"),
    ]
    for flag, help_text in opt_specs:
        parser.addoption(flag, action="store_true", help=help_text)
def pytest_runtest_setup(item):
    """Skip marked tests unless their enabling command-line flag was passed."""
    # marker name -> (enabling flag, skip reason); checked in declaration order.
    marker_flags = {
        "network": (
            "--run-network-tests",
            "set --run-network-tests to run tests requiring an internet connection",
        ),
        "minio": (
            "--run-minio-tests",
            "set --run-minio-tests to run tests requiring docker and minio",
        ),
    }
    for marker, (flag, reason) in marker_flags.items():
        if marker in item.keywords and not item.config.getoption(flag):
            pytest.skip(reason)
# Common codec configurations shared by tests (Zarr v3 codec dicts).
# Delta filter (numcodecs) configured for little-endian 64-bit integers.
DELTA_CODEC = {"name": "numcodecs.delta", "configuration": {"dtype": "<i8"}}
# Zarr v3 "bytes" array-to-bytes codec, little-endian byte order.
ARRAYBYTES_CODEC = {"name": "bytes", "configuration": {"endian": "little"}}
# Blosc compressor: zstd backend, compression level 5, byte shuffle, 4-byte typesize.
BLOSC_CODEC = {
    "name": "blosc",
    "configuration": {
        "cname": "zstd",
        "clevel": 5,
        "shuffle": "shuffle",
        "typesize": 4,
    },
}
# zlib compressor (numcodecs) at compression level 1 (fastest, least compression).
ZLIB_CODEC = {"name": "numcodecs.zlib", "configuration": {"level": 1}}
# Helper functions
def _generate_chunk_entries(
    shape: tuple[int, ...],
    chunks: tuple[int, ...],
    entry_generator: Callable[[tuple[int, ...]], dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    """
    Generate chunk entries for a manifest based on shape and chunks.

    Parameters
    ----------
    shape : tuple of int
        The shape of the array
    chunks : tuple of int
        The chunk size for each dimension
    entry_generator : callable
        Function that takes chunk indices and returns an entry dict

    Returns
    -------
    dict
        Mapping of chunk keys to entry dictionaries
    """
    # Number of chunks along each axis.
    grid = tuple(
        ceildiv(axis_len, chunk_len) for axis_len, chunk_len in zip(shape, chunks)
    )

    # A zero-dimensional (scalar) array still holds exactly one chunk, keyed "0".
    if not grid:
        return {"0": entry_generator((0,))}

    entries: dict[str, dict[str, Any]] = {}
    for indices in itertools.product(*map(range, grid)):
        entries[join(indices)] = entry_generator(indices)
    return entries
def _offset_from_chunk_key(ind: tuple[int, ...]) -> int:
"""Generate an offset value from chunk indices."""
return sum(ind) * 10
def _length_from_chunk_key(ind: tuple[int, ...]) -> int:
"""Generate a length value from chunk indices."""
return sum(ind) + 5
def _entry_from_chunk_key(ind: tuple[int, ...]) -> dict[str, str | int]:
    """Generate a (somewhat) unique manifest entry from a given chunk key.

    The path embeds the joined chunk key, and the offset/length are derived
    deterministically from the indices so each chunk's entry is distinct.
    """
    # Annotating the dict directly makes the return type check cleanly,
    # removing the need for the previous `# type: ignore[return-value]`.
    # `join(ind)` already returns a string, so no extra str() call is needed.
    entry: dict[str, str | int] = {
        "path": f"/foo.{join(ind)}.nc",
        "offset": _offset_from_chunk_key(ind),
        "length": _length_from_chunk_key(ind),
    }
    return entry
def _generate_chunk_manifest(
    netcdf4_file: str,
    shape: tuple[int, ...],
    chunks: tuple[int, ...],
    offset: int = 6144,
    length: int = 48,
) -> ChunkManifest:
    """Generate a chunk manifest with sequential offsets for each chunk.

    Parameters
    ----------
    netcdf4_file : str
        Path stored in every entry's "path" field.
    shape, chunks : tuple of int
        Array shape and per-dimension chunk size.
    offset : int
        Byte offset of the first chunk; subsequent chunks follow contiguously.
    length : int
        Byte length of every chunk.
    """
    next_offset = offset

    def sequential_entry_generator(ind: tuple[int, ...]) -> dict[str, Any]:
        # `nonlocal` (rather than a mutable-list workaround) lets the closure
        # advance the running offset so each entry starts where the last ended.
        nonlocal next_offset
        entry = {
            "path": netcdf4_file,
            "offset": next_offset,
            "length": length,
        }
        next_offset += length
        return entry

    entries = _generate_chunk_entries(shape, chunks, sequential_entry_generator)
    return ChunkManifest(entries)
# NetCDF file fixtures
@pytest.fixture
def empty_netcdf4_file(tmp_path: Path) -> str:
    """Write a NetCDF4 file containing no variables and return its path."""
    target = tmp_path / "empty.nc"
    with xr.Dataset() as empty_ds:
        empty_ds.to_netcdf(target, format="NETCDF4")
    return str(target)
@pytest.fixture
def netcdf4_file(tmp_path: Path) -> str:
    """Write the air_temperature tutorial dataset to a NetCDF4 file; return its path."""
    target = tmp_path / "air.nc"
    with xr.tutorial.open_dataset("air_temperature") as tutorial_ds:
        tutorial_ds.to_netcdf(target, format="NETCDF4")
    return str(target)
@pytest.fixture
def netcdf4_file_with_data_in_multiple_groups(tmp_path: Path) -> str:
    """Write a NetCDF4 file whose root group and 'subgroup' each hold a variable."""
    target = tmp_path / "test.nc"
    # Root group gets "foo"; "subgroup" is appended afterwards with "bar".
    root_ds = xr.DataArray([1, 2, 3], name="foo").to_dataset()
    root_ds.to_netcdf(target)
    sub_ds = xr.DataArray([4, 5], name="bar").to_dataset()
    sub_ds.to_netcdf(target, group="subgroup", mode="a")
    return str(target)
@pytest.fixture
def netcdf4_files_factory(tmp_path: Path) -> Callable[..., tuple[str, str]]:
    """Factory fixture that splits the air_temperature dataset into two files.

    The returned callable accepts an optional ``encoding`` mapping (forwarded
    to ``Dataset.to_netcdf``) and returns the paths of the two written files.
    The previous annotation ``Callable[[], tuple[str, str]]`` wrongly claimed
    the callable took no arguments.
    """

    def create_netcdf4_files(
        encoding: Optional[Mapping[str, Mapping[str, Any]]] = None,
    ) -> tuple[str, str]:
        filepath1 = tmp_path / "air1.nc"
        filepath2 = tmp_path / "air2.nc"
        with xr.tutorial.open_dataset("air_temperature") as ds:
            # First 1460 time steps go to one file, the remainder to the other.
            ds1 = ds.isel(time=slice(None, 1460))
            ds2 = ds.isel(time=slice(1460, None))
            ds1.to_netcdf(filepath1, encoding=encoding)
            ds2.to_netcdf(filepath2, encoding=encoding)
        return str(filepath1), str(filepath2)

    return create_netcdf4_files
@pytest.fixture
def netcdf4_file_with_2d_coords(tmp_path: Path) -> str:
    """Write the ROMS_example tutorial dataset (which has 2D coords) to NetCDF4."""
    target = tmp_path / "ROMS_example.nc"
    with xr.tutorial.open_dataset("ROMS_example") as roms_ds:
        roms_ds.to_netcdf(target, format="NETCDF4")
    return str(target)
@pytest.fixture
def netcdf4_virtual_dataset(netcdf4_file):
    """Yield a virtual dataset opened from the ``netcdf4_file`` fixture."""
    # Imported lazily so merely collecting tests doesn't require virtualizarr
    # to be importable at conftest load time.
    from virtualizarr import open_virtual_dataset

    with open_virtual_dataset(netcdf4_file, loadable_variables=[]) as vds:
        yield vds
@pytest.fixture
def netcdf4_inlined_ref(netcdf4_file):
    """Return kerchunk references for ``netcdf4_file``, inlining chunks under 1000 bytes."""
    from kerchunk.hdf import SingleHdf5ToZarr

    translator = SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000)
    return translator.translate()
# HDF5 file fixtures
@pytest.fixture
def hdf5_groups_file(tmp_path: Path) -> str:
    """Write the air_temperature dataset into a nested HDF5 group ('test/group')."""
    target = tmp_path / "air.nc"
    with xr.tutorial.open_dataset("air_temperature") as tutorial_ds:
        tutorial_ds.to_netcdf(target, format="NETCDF4", group="test/group")
    return str(target)
@pytest.fixture
def hdf5_empty(tmp_path: Path) -> str:
    """Create an HDF5 file holding one empty-shaped float32 dataset named 'empty'."""
    target = tmp_path / "empty.nc"
    with h5py.File(target, "w") as handle:
        empty_ds = handle.create_dataset("empty", shape=(), dtype="float32")
        empty_ds.attrs["empty"] = "true"
    return str(target)
@pytest.fixture
def hdf5_scalar(tmp_path: Path) -> str:
    """Create an HDF5 file holding one scalar float32 dataset named 'scalar'."""
    target = tmp_path / "scalar.nc"
    with h5py.File(target, "w") as handle:
        scalar_ds = handle.create_dataset("scalar", data=0.1, dtype="float32")
        scalar_ds.attrs["scalar"] = "true"
    return str(target)
@pytest.fixture
def simple_netcdf4(tmp_path: Path) -> str:
    """Create a small NetCDF4 file with one 3x4 int32 variable named 'foo'."""
    target = tmp_path / "simple.nc"
    data = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4)
    simple_ds = xr.Dataset({"foo": Variable(data=data, dims=["x", "y"])})
    simple_ds.to_netcdf(target)
    return str(target)
# Zarr ArrayV3Metadata, ManifestArray, virtual xr.Variable and virtual xr.Dataset fixtures
@pytest.fixture
def array_v3_metadata():
    """Factory fixture creating Zarr V3 array metadata with sensible defaults."""

    def _create_metadata(
        shape: tuple = (5, 5),
        chunks: tuple = (5, 5),
        data_type: np.dtype = np.dtype("int32"),
        codecs: list[dict] | None = None,
        fill_value: int | float | None = None,
    ):
        # Default to the plain little-endian bytes codec when none is given.
        codecs = codecs or [{"configuration": {"endian": "little"}, "name": "bytes"}]
        return create_v3_array_metadata(
            shape=shape,
            chunk_shape=chunks,
            data_type=data_type,
            codecs=codecs,
            # Use an explicit `is None` check rather than `fill_value or 0`:
            # the truthiness test silently replaced an explicit 0.0 (float)
            # fill value with the int default 0.
            fill_value=0 if fill_value is None else fill_value,
        )

    return _create_metadata
@pytest.fixture
def manifest_array(array_v3_metadata):
    """
    Create an example ManifestArray with sensible defaults.

    Each chunk key maps to a (somewhat) unique path, offset, and length.
    """

    def _manifest_array(
        shape: tuple = (5, 5),
        chunks: tuple = (5, 5),
        codecs: list[dict] | None = [ARRAYBYTES_CODEC, ZLIB_CODEC],
    ):
        chunk_entries = _generate_chunk_entries(shape, chunks, _entry_from_chunk_key)
        return ManifestArray(
            chunkmanifest=ChunkManifest(entries=chunk_entries),
            metadata=array_v3_metadata(shape=shape, chunks=chunks, codecs=codecs),
        )

    return _manifest_array
@pytest.fixture
def virtual_variable(array_v3_metadata: Callable) -> Callable:
    """Generate a virtual variable with configurable parameters.

    The returned callable builds an ``xr.Variable`` whose data is a
    ``ManifestArray`` pointing at ``file_uri``.
    """

    def _virtual_variable(
        file_uri: str,
        shape: tuple[int, ...] = (3, 4),
        chunk_shape: tuple[int, ...] = (3, 4),
        dtype: np.dtype = np.dtype("int32"),
        codecs: Optional[list[dict[Any, Any]]] = None,
        fill_value: Optional[str] = None,
        encoding: Optional[dict] = None,
        offset: int = 6144,
        length: int = 48,
        dims: Optional[list[str]] = None,
        attrs: Optional[dict[str, Any]] = None,
    ) -> xr.Variable:
        # `dims`/`attrs` previously used mutable default arguments ([] / {});
        # None sentinels avoid that anti-pattern while keeping the same
        # effective defaults for callers.
        manifest = _generate_chunk_manifest(
            file_uri,
            shape=shape,
            chunks=chunk_shape,
            offset=offset,
            length=length,
        )
        metadata = array_v3_metadata(
            shape=shape,
            chunks=chunk_shape,
            codecs=codecs,
            data_type=dtype,
            fill_value=fill_value,
        )
        ma = ManifestArray(chunkmanifest=manifest, metadata=metadata)
        return xr.Variable(
            data=ma,
            dims=dims if dims is not None else [],
            encoding=encoding,
            attrs=attrs if attrs is not None else {},
        )

    return _virtual_variable
@pytest.fixture
def virtual_dataset(virtual_variable: Callable) -> Callable:
    """Generate a one-variable virtual dataset with configurable parameters."""

    def _virtual_dataset(
        file_uri: str,
        shape: tuple[int, ...] = (3, 4),
        chunk_shape: tuple[int, ...] = (3, 4),
        dtype: np.dtype = np.dtype("int32"),
        codecs: Optional[list[dict[Any, Any]]] = None,
        fill_value: Optional[str] = None,
        encoding: Optional[dict] = None,
        variable_name: str = "foo",
        offset: int = 6144,
        length: int = 48,
        dims: Optional[list[str]] = None,
        coords: Optional[xr.Coordinates] = None,
    ) -> xr.Dataset:
        # Open the real file to borrow its dimension names and attributes.
        with xr.open_dataset(file_uri) as source_ds:
            variable = virtual_variable(
                file_uri=file_uri,
                shape=shape,
                chunk_shape=chunk_shape,
                dtype=dtype,
                codecs=codecs,
                fill_value=fill_value,
                encoding=encoding,
                offset=offset,
                length=length,
                # Fall back to the source file's dims when none (or an empty
                # list) is supplied.
                dims=dims or [str(name) for name in source_ds.dims],
                attrs=source_ds[variable_name].attrs,
            )
            return xr.Dataset(
                {variable_name: variable},
                coords=coords,
                attrs=source_ds.attrs,
            )

    return _virtual_dataset
# Zarr fixtures
@pytest.fixture
def zarr_array():
    """Factory fixture building an in-memory Zarr array filled with test data."""

    def create_zarr_array(codecs=None, zarr_format=3):
        """Create a test Zarr array with the specified codecs."""
        import zarr

        # In-memory (store=None) 1000x1000 int32 array chunked 100x100.
        arr = zarr.create(
            shape=(1000, 1000),
            chunks=(100, 100),
            dtype="int32",
            store=None,
            zarr_format=zarr_format,
            codecs=codecs,
        )
        # Fill with a deterministic ramp so contents are predictable in tests.
        arr[:] = np.arange(1000 * 1000).reshape(1000, 1000)
        return arr

    return create_zarr_array