Skip to content

Commit

Permalink
Implement file locking for Nexrad Level2 and IRIS/Sigmet backends (#269)
Browse files Browse the repository at this point in the history
* introduce file lock for nexrad level2 backend

* add regression test

* add history.md entry

* add iris file lock

* fix history.md
  • Loading branch information
kmuehlbauer authored Feb 7, 2025
1 parent fc43e60 commit 3d7a2ad
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/history.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* FIX: Correct retrieval of intermediate records in nexrad level2 reader ({issue}`259`) ({pull}`261`) by [@kmuehlbauer](https://github.com/kmuehlbauer).
* FIX: Test for magic number BZhX1AY&SY (where X is any number between 0..9) when retrieving BZ2 record indices in nexrad level2 reader ({issue}`264`) ({pull}`266`) by [@kmuehlbauer](https://github.com/kmuehlbauer).
* ENH: Add message type 1 decoding to nexrad level 2 reader ({issue}`256`) ({pull}`267`) by [@kmuehlbauer](https://github.com/kmuehlbauer).
* ENH: Introduce file locks for nexrad level2 and iris backend ({issue}`207`) ({pull}`268`) by [@kmuehlbauer](https://github.com/kmuehlbauer).

## 0.8.0 (2024-11-04)

Expand Down
14 changes: 14 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,17 @@
# Distributed under the MIT License. See LICENSE for more info.

"""Unit test package for xradar."""

import importlib

import pytest


def skip_import(name):
try:
importlib.import_module(name)
found = True
except ImportError:
found = False

return pytest.mark.skipif(not found, reason=f"requires {name}")
18 changes: 18 additions & 0 deletions tests/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import xarray as xr

import xradar.io
from tests import skip_import
from xradar.io import (
open_cfradial1_datatree,
open_datamet_datatree,
Expand Down Expand Up @@ -1101,3 +1102,20 @@ def test_open_nexradlevel2_datatree(nexradlevel2_files):
}
assert np.round(ds.elevation.mean().values.item(), 1) == elevations[i]
assert ds.sweep_number.values == int(grp[6:])


@skip_import("dask")
@pytest.mark.parametrize(
"nexradlevel2_files", ["nexradlevel2_gzfile", "nexradlevel2_bzfile"], indirect=True
)
def test_nexradlevel2_dask_load(nexradlevel2_files):
ds = xr.open_dataset(nexradlevel2_files, group="sweep_0", engine="nexradlevel2")
dsc = ds.chunk()
dsc.load()


@skip_import("dask")
def test_iris_dask_load(iris0_file):
ds = xr.open_dataset(iris0_file, group="sweep_0", engine="iris")
dsc = ds.chunk()
dsc.load()
22 changes: 16 additions & 6 deletions xradar/io/backends/iris.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from xarray import DataTree
from xarray.backends.common import AbstractDataStore, BackendArray, BackendEntrypoint
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.locks import SerializableLock, ensure_lock
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict
Expand All @@ -72,6 +73,9 @@
_get_subgroup,
)

IRIS_LOCK = SerializableLock()


#: mapping from IRIS names to CfRadial2/ODIM
iris_mapping = {
"DB_DBT": "DBTH",
Expand Down Expand Up @@ -3786,9 +3790,10 @@ def __init__(self, datastore, name, var):
self.shape = (nrays, nbins)

def _getitem(self, key):
# read the data and put it into dict
self.datastore.root.get_moment(self.group, self.name)
return self.datastore.ds["sweep_data"][self.name][key]
with self.datastore.lock:
# read the data and put it into dict
self.datastore.root.get_moment(self.group, self.name)
return self.datastore.ds["sweep_data"][self.name][key]

def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
Expand All @@ -3805,16 +3810,19 @@ class IrisStore(AbstractDataStore):
Ported from wradlib.
"""

def __init__(self, manager, group=None):
def __init__(self, manager, group=None, lock=IRIS_LOCK):
self._manager = manager
self._group = int(group[6:]) + 1
self._filename = self.filename
self._need_time_recalc = False
self.lock = ensure_lock(lock)

@classmethod
def open(cls, filename, mode="r", group=None, **kwargs):
def open(cls, filename, mode="r", group=None, lock=None, **kwargs):
if lock is None:
lock = IRIS_LOCK
manager = CachingFileManager(IrisRawFile, filename, mode=mode, kwargs=kwargs)
return cls(manager, group=group)
return cls(manager, group=group, lock=lock)

@property
def filename(self):
Expand Down Expand Up @@ -3991,6 +3999,7 @@ def open_dataset(
use_cftime=None,
decode_timedelta=None,
group=None,
lock=None,
first_dim="auto",
reindex_angle=False,
fix_second_angle=False,
Expand All @@ -4000,6 +4009,7 @@ def open_dataset(
store = IrisStore.open(
filename_or_obj,
group=group,
lock=lock,
loaddata=False,
)

Expand Down
81 changes: 46 additions & 35 deletions xradar/io/backends/nexrad_level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from xarray import DataTree
from xarray.backends.common import AbstractDataStore, BackendArray, BackendEntrypoint
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.locks import SerializableLock, ensure_lock
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import FrozenDict, close_on_error
Expand Down Expand Up @@ -78,6 +79,9 @@
string_dict,
)

NEXRADL2_LOCK = SerializableLock()


#: mapping from NEXRAD names to CfRadial2/ODIM
nexrad_mapping = {
"REF": "DBZH",
Expand Down Expand Up @@ -1324,36 +1328,32 @@ def __init__(self, datastore, name, var):
self.shape = (nrays, nbins)

def _getitem(self, key):
# read the data if not available
try:
data = self.datastore.ds["sweep_data"][self.name]["data"]
print("ZZZZZAA:", self.name, data, key)
except KeyError:
print("XXXXX:", self.group, self.name)
self.datastore.root.get_data(self.group, self.name)
data = self.datastore.ds["sweep_data"][self.name]["data"]
print("ZZZZZBB:", self.name, data, key)
print("YY0:", self.name, len(data), len(data[0]))
# see 3.2.4.17.6 Table XVII-I Data Moment Characteristics and Conversion for Data Names
word_size = self.datastore.ds["sweep_data"][self.name]["word_size"]
if self.name == "PHI" and word_size == 16:
# 10 bit mask, but only for 2 byte data
x = np.uint16(0x3FF)
elif self.name == "ZDR" and word_size == 16:
# 11 bit mask, but only for 2 byte data
x = np.uint16(0x7FF)
else:
x = np.uint8(0xFF)
print("YY1:", self.name, len(data[0]), self.shape)
if len(data[0]) < self.shape[1]:
return np.pad(
np.vstack(data) & x,
((0, 0), (0, self.shape[1] - len(data[0]))),
mode="constant",
constant_values=0,
)[key]
else:
return (np.vstack(data) & x)[key]
with self.datastore.lock:
# read the data if not available
try:
data = self.datastore.ds["sweep_data"][self.name]["data"]
except KeyError:
self.datastore.root.get_data(self.group, self.name)
data = self.datastore.ds["sweep_data"][self.name]["data"]
# see 3.2.4.17.6 Table XVII-I Data Moment Characteristics and Conversion for Data Names
word_size = self.datastore.ds["sweep_data"][self.name]["word_size"]
if self.name == "PHI" and word_size == 16:
# 10 bit mask, but only for 2 byte data
x = np.uint16(0x3FF)
elif self.name == "ZDR" and word_size == 16:
# 11 bit mask, but only for 2 byte data
x = np.uint16(0x7FF)
else:
x = np.uint8(0xFF)
if len(data[0]) < self.shape[1]:
return np.pad(
np.vstack(data) & x,
((0, 0), (0, self.shape[1] - len(data[0]))),
mode="constant",
constant_values=0,
)[key]
else:
return (np.vstack(data) & x)[key]

def __getitem__(self, key):
return indexing.explicit_indexing_adapter(
Expand All @@ -1365,24 +1365,29 @@ def __getitem__(self, key):


class NexradLevel2Store(AbstractDataStore):
def __init__(self, manager, group=None):
def __init__(self, manager, group=None, lock=NEXRADL2_LOCK):
self._manager = manager
self._group = int(group[6:])
self._filename = self.filename
self.lock = ensure_lock(lock)

@classmethod
def open(cls, filename, mode="r", group=None, **kwargs):
def open(cls, filename, mode="r", group=None, lock=None, **kwargs):
if lock is None:
lock = NEXRADL2_LOCK
manager = CachingFileManager(
NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
)
return cls(manager, group=group)
return cls(manager, group=group, lock=lock)

@classmethod
def open_groups(cls, filename, groups, mode="r", **kwargs):
def open_groups(cls, filename, groups, mode="r", lock=None, **kwargs):
if lock is None:
lock = NEXRADL2_LOCK
manager = CachingFileManager(
NEXRADLevel2File, filename, mode=mode, kwargs=kwargs
)
return {group: cls(manager, group=group) for group in groups}
return {group: cls(manager, group=group, lock=lock) for group in groups}

@property
def filename(self):
Expand Down Expand Up @@ -1534,6 +1539,7 @@ def open_dataset(
use_cftime=None,
decode_timedelta=None,
group=None,
lock=None,
first_dim="auto",
reindex_angle=False,
fix_second_angle=False,
Expand All @@ -1543,6 +1549,7 @@ def open_dataset(
store = NexradLevel2Store.open(
filename_or_obj,
group=group,
lock=lock,
loaddata=False,
)

Expand Down Expand Up @@ -1610,6 +1617,7 @@ def open_nexradlevel2_datatree(
fix_second_angle=False,
site_coords=True,
optional=True,
lock=None,
**kwargs,
):
"""Open a NEXRAD Level2 dataset as an `xarray.DataTree`.
Expand Down Expand Up @@ -1732,6 +1740,7 @@ def open_nexradlevel2_datatree(
fix_second_angle=fix_second_angle,
site_coords=site_coords,
optional=optional,
lock=lock,
**kwargs,
)
ls_ds: list[xr.Dataset] = [sweep_dict[sweep] for sweep in sweep_dict.keys()]
Expand Down Expand Up @@ -1771,10 +1780,12 @@ def open_sweeps_as_dict(
fix_second_angle=False,
site_coords=True,
optional=True,
lock=None,
**kwargs,
):
stores = NexradLevel2Store.open_groups(
filename=filename_or_obj,
lock=lock,
groups=sweeps,
)
groups_dict = {}
Expand Down

0 comments on commit 3d7a2ad

Please sign in to comment.