diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..16fede8 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[run] +concurrency = multiprocessing diff --git a/bin/test b/bin/test index a9195dc..458108a 100755 --- a/bin/test +++ b/bin/test @@ -4,5 +4,6 @@ set -ex coverage erase coverage run --source harmony_netcdf_to_zarr -m unittest discover -s tests -t .. +coverage combine coverage report coverage xml diff --git a/harmony_netcdf_to_zarr/adapter.py b/harmony_netcdf_to_zarr/adapter.py index 09ee66a..0aa33ff 100644 --- a/harmony_netcdf_to_zarr/adapter.py +++ b/harmony_netcdf_to_zarr/adapter.py @@ -13,7 +13,7 @@ from harmony import BaseHarmonyAdapter from harmony.util import generate_output_filename, HarmonyException -from .convert import netcdf_to_zarr, make_localstack_s3fs, make_s3fs +from .convert import make_localstack_s3fs, make_s3fs, mosaic_to_zarr from .download_utilities import download_granules from .stac_utilities import get_netcdf_urls, get_output_catalog @@ -22,18 +22,14 @@ class ZarrException(HarmonyException): - """ - Exception thrown during Zarr conversion - """ + """ Exception thrown during Zarr conversion """ def __init__(self, message=None): super().__init__(message, 'harmonyservices/netcdf-to-zarr') class NetCDFToZarrAdapter(BaseHarmonyAdapter): - """ - Translates NetCDF4 to Zarr - """ + """ Translates NetCDF4 to Zarr """ def __init__(self, message, catalog=None, config=None): """ @@ -96,9 +92,6 @@ def process_items_many_to_one(self): self.message.accessToken, self.config, self.logger) - # Enable during DAS-1379: - # dimensions_mapping = DimensionsMapping(local_file_paths) - if len(local_file_paths) == 1: output_name = generate_output_filename(netcdf_urls[0], ext='.zarr') @@ -112,13 +105,7 @@ def process_items_many_to_one(self): zarr_store = self.s3.get_mapper(root=zarr_root, check=False, create=True) - if len(local_file_paths) == 1: - # Temporarily retain old version of NetCDF-to-Zarr service - netcdf_to_zarr(local_file_paths[0], zarr_store) - else: - # TODO: DAS-1379 - implement many-to-one Zarr output and - # enable concatenation. - raise NotImplementedError('Concatenation is not yet supported.') + mosaic_to_zarr(local_file_paths, zarr_store) return get_output_catalog(self.catalog, zarr_root) except Exception as service_exception: diff --git a/harmony_netcdf_to_zarr/convert.py b/harmony_netcdf_to_zarr/convert.py index 20be654..fcb725e 100644 --- a/harmony_netcdf_to_zarr/convert.py +++ b/harmony_netcdf_to_zarr/convert.py @@ -1,24 +1,40 @@ -import collections -import os +from collections import Sequence +from multiprocessing import Manager, Process, Queue +from multiprocessing.managers import Namespace +from os import cpu_count, environ +from os.path import splitext +from queue import Empty as QueueEmpty +from re import findall +from typing import Any, List, Set, Tuple, Union import sys -import multiprocessing -from multiprocessing import Semaphore -from typing import Union -import re +from fsspec.mapping import FSMap +from netCDF4 import Dataset, Group as NetCDFGroup, Variable as NetCDFVariable from s3fs import S3FileSystem +from zarr import (DirectoryStore, group as create_zarr_group, + Group as ZarrGroup, ProcessSynchronizer) +from zarr.core import Array as ZarrArray +from zarr.convenience import consolidate_metadata import numpy as np -import zarr -from netCDF4 import Dataset -region = os.environ.get('AWS_DEFAULT_REGION') or 'us-west-2' +from .mosaic_utilities import DimensionsMapping, resolve_reference_path -# Some global that may be shared by different methods -binary_prefix_conversion_map = {"Ki": 1024, "Mi": 1048576, "Gi": 1073741824} +# Types for function signatures +Number = Union[np.integer, np.floating, int, float] +ZarrStore = Union[DirectoryStore, FSMap] -def make_localstack_s3fs(): - host = os.environ.get('LOCALSTACK_HOST') or 'host.docker.internal' +# Some global variables that may be shared by different methods +region = environ.get('AWS_DEFAULT_REGION') or 'us-west-2' +# This dictionary converts from a string representation of units, such as +# kibibytes, mebibytes or gibibytes, to a raw number of bytes. This is used +# when a compressed chunk size is expressed as a string. See the NIST standard +# for binary prefix: https://physics.nist.gov/cuu/Units/binary.html. +binary_prefix_conversion_map = {'Ki': 1024, 'Mi': 1048576, 'Gi': 1073741824} + + +def make_localstack_s3fs() -> S3FileSystem: + host = environ.get('LOCALSTACK_HOST') or 'host.docker.internal' return S3FileSystem( use_ssl=False, key='ACCESS_KEY', @@ -28,49 +44,136 @@ def make_localstack_s3fs(): endpoint_url=f'http://{host}:4572')) -def make_s3fs(): +def make_s3fs() -> S3FileSystem: return S3FileSystem(client_kwargs=dict(region_name=region)) -def netcdf_to_zarr(src, dst): - """ - Convert the NetCDF file at src to the zarr file at dst, preserving data, metadata, and - group hierarchy +def mosaic_to_zarr(input_granules: List[str], zarr_store: Union[FSMap, str], + process_count: int = None): + """ Convert input NetCDF files to a Zarr store, preserving data, metadata + and group hierarchy. + This function makes use of multiple processes, to parallelise the + output of multiple granules. The number of processes will be the + smallest option of: + + * The number available CPUs. + * The number of input granules. + * The number of requested processes. + + Parameters: + ___________ + input_granules: a list of file paths for local NetCDF-4 files to + convert. + zarr_store: a location on disc into which a zarr.DirectoryStore will + be written or a MutableMapping into which Zarr data can be written. + This MutableMapping object points at the S3 object that represents + the root of the Zarr store, which is essentially a directory in + the S3 bucket. - Parameters - ---------- - src : string | netCDF4.Dataset - The file to convert, either a location on disk or an already-opened dataset - dst : string | collections.MutableMapping - The output zarr file. Either a location on disk into which a zarr.DirectoryStore - will be written or a MutableMapping into which zarr data can be written. """ - managed_resources = [] + if isinstance(zarr_store, str): + zarr_store = DirectoryStore(zarr_store) + + # Write dimension information from DimensionsMapping (including bounds) + # Store list of aggregated dimensions/variables to avoid writing them again + dim_mapping = DimensionsMapping(input_granules) + aggregated_dimensions = __copy_aggregated_dimensions(dim_mapping, + zarr_store) + + if process_count is None: + process_count = min(cpu_count(), len(input_granules)) + else: + process_count = min(process_count, cpu_count(), len(input_granules)) + + with Manager() as manager: + output_queue = manager.Queue(len(input_granules)) + shared_namespace = manager.Namespace() + + if isinstance(zarr_store, DirectoryStore): + shared_namespace.store_type = 'DirectoryStore' + shared_namespace.zarr_root = zarr_store.dir_path() + else: + shared_namespace.store_type = 'S3FileSystem' + shared_namespace.zarr_root = zarr_store.root + + for input_granule in input_granules: + output_queue.put(input_granule) + + processes = [Process(target=_output_worker, + args=(output_queue, shared_namespace, + aggregated_dimensions, input_granules)) + for _ in range(process_count)] + + for output_process in processes: + output_process.start() + + for output_process in processes: + output_process.join() + output_process.close() + + if hasattr(shared_namespace, 'exception'): + raise RuntimeError('Problem writing output data to Zarr store: ' + f'{shared_namespace.exception}') + + consolidate_metadata(zarr_store) + try: - # Allow passing in a path to a store or a file - if isinstance(src, str): - src = Dataset(src, 'r') - managed_resources.append(src) + zarr_store.close() + except AttributeError: + pass - if isinstance(dst, str): - dst = zarr.DirectoryStore(dst) - managed_resources.append(dst) - src.set_auto_mask(False) - src.set_auto_scale(True) - __copy_group(src, zarr.group(dst, overwrite=True)) - zarr.convenience.consolidate_metadata(dst) +def _output_worker(output_queue: Queue, shared_namespace: Namespace, + aggregated_dimensions: Set[str], input_granules: List[str]): + """ This worker function is executed in a spawned process. It checks for + items in the main queue, which correspond to local file paths for input + NetCDF-4 files. If there is at least one URL left for writing, then the + groups, variables and attributes from that NetCDF-4 file are written to + the output Zarr store, in the related slice of the aggregated output + array. - finally: - for resource in managed_resources: - try: - resource.close() - except BaseException: - pass + """ + if shared_namespace.store_type == 'S3FileSystem': + if environ.get('USE_LOCALSTACK') == 'true': + s3 = make_localstack_s3fs() + else: + s3 = make_s3fs() + zarr_store = s3.get_mapper(root=shared_namespace.zarr_root, + check=False, create=True) + else: + zarr_store = DirectoryStore(shared_namespace.zarr_root) + + zarr_synchronizer = ProcessSynchronizer( + f'{splitext(shared_namespace.zarr_root[0])}.sync' + ) + dim_mapping = DimensionsMapping(input_granules) -def scale_attribute(src, attr, scale_factor, add_offset): + while not hasattr(shared_namespace, 'exception') and not output_queue.empty(): + try: + input_granule = output_queue.get_nowait() + except QueueEmpty: + break + + try: + with Dataset(input_granule, 'r') as input_dataset: + input_dataset.set_auto_mask(False) + input_dataset.set_auto_scale(True) + __copy_group(input_dataset, + create_zarr_group(zarr_store, overwrite=False), + zarr_synchronizer, dim_mapping, + aggregated_dimensions) + except Exception as exception: + # If there was an issue, save a string message from the raised + # exception. This will cause the other processes to stop processing + # input NetCDF-4 files. + shared_namespace.exception = str(exception) + raise exception + + +def scale_attribute(src: NetCDFVariable, attr: Any, scale_factor: Number, + add_offset: Number): """ Scales an unscaled NetCDF attribute @@ -95,12 +198,346 @@ def scale_fn(x): return float(x * scale_factor + add_offset) unscaled = getattr(src, attr) - if isinstance(unscaled, collections.Sequence) or isinstance(unscaled, np.ndarray): + if isinstance(unscaled, Sequence) or isinstance(unscaled, np.ndarray): return [scale_fn(u) for u in unscaled] else: return scale_fn(unscaled) +def __copy_aggregated_dimensions(dim_mapping: DimensionsMapping, + zarr_store: ZarrStore) -> Set[str]: + """ Iterate through all aggregated dimensions, and their associated bounds, + and write these dimensions to the output Zarr store. A list of + aggregated dimensions are retained, so that the data values are not + overwritten when writing all other variables. + + To limit this function to the scope of TRT-121, only aggregated + temporal dimensions are considered. The aggregated spacial information + exists in the `DimensionsMapping` instance, so the temporal check could + be removed to allow all aggregated, 1-D grid dimensions to be inserted + into the Zarr store. + + """ + root_group = create_zarr_group(zarr_store, overwrite=True) + aggregated_dimensions = set() + + for output_dimension in dim_mapping.output_dimensions.values(): + if output_dimension.is_temporal(): + __copy_aggregated_dimension(output_dimension.dimension_path, + output_dimension.values, + output_dimension.units, root_group) + aggregated_dimensions.add(output_dimension.dimension_path) + + if output_dimension.bounds_path is not None: + __copy_aggregated_dimension(output_dimension.bounds_path, + output_dimension.bounds_values, + output_dimension.units, root_group) + aggregated_dimensions.add(output_dimension.bounds_path) + + return aggregated_dimensions + + +def __copy_aggregated_dimension(variable_path: str, variable_data: np.ndarray, + variable_units: str, root_group: ZarrGroup): + """ This function will copy variable data, but not metadata, from the + supplied variable array. The units metadata are added, as these are + contained in the `DimensionInformation` class. Other metadata attributes + will be later added when all variables are iterated through within each + granule. + + Technically, this function is used for both aggregated dimensions and + any associated bounds variables. + + """ + variable_path_pieces = variable_path.lstrip('/').split('/') + variable_basename = variable_path_pieces.pop() + parent_group = root_group + + for nested_group in variable_path_pieces: + parent_group = parent_group.require_group(nested_group) + + new_chunks = compute_chunksize(variable_data.shape, variable_data.dtype) + + zarr_variable = parent_group.require_dataset(variable_basename, + data=variable_data, + shape=variable_data.size, + chunks=tuple(new_chunks), + dtype=variable_data.dtype) + + zarr_variable.attrs.put({'units': variable_units}) + + +def __copy_group(netcdf_group: NetCDFGroup, zarr_group: ZarrGroup, + zarr_synchronizer: ProcessSynchronizer, + dim_mapping: DimensionsMapping, + aggregated_dimensions: Set[str] = set()): + """ Recursively copies the source netCDF4 group into the destination Zarr + group, along with all sub-groups, variables, and attributes + + Parameters + ---------- + netcdf_group : netCDF4.Group + the NetCDF group to copy from + zarr_group : zarr.hierarchy.Group + the existing Zarr group to copy into + zarr_syncronizer: zarr.ProcessSynchronizer + ensures that multiple processes can safely access the same Zarr + store. + dim_mapping: mosaic_utilities.DimensionsMapping + contains aggregated dimension values and units metadata + aggregated_dimensions: Set[str] + a set of full string paths of aggregated dimensions. As of TRT-121 + these are only temporal dimensions (and associated bounds variables) + + """ + __copy_attrs(netcdf_group, zarr_group) + + for child_group_name, child_netcdf_group in netcdf_group.groups.items(): + __copy_group(child_netcdf_group, + zarr_group.require_group(child_group_name.split('/').pop()), + zarr_synchronizer, dim_mapping, aggregated_dimensions) + + for variable_name, netcdf_variable in netcdf_group.variables.items(): + __copy_variable(netcdf_variable, zarr_group, zarr_synchronizer, + variable_name, dim_mapping, aggregated_dimensions) + + +def __copy_variable(netcdf_variable: NetCDFVariable, zarr_group: ZarrGroup, + zarr_synchronizer: ProcessSynchronizer, variable_name: str, + dim_mapping: DimensionsMapping, + aggregated_dimensions: Set[str] = set()): + """ Copies the variable from the NetCDF variable into the Zarr group, + giving it the provided variable_name. + + Parameters + ---------- + netcdf_variable : netCDF4.Variable + the source variable to copy + zarr_group : zarr.hierarchy.Group + the group into which to copy the variable + zarr_synchronizer: zarr.ProcessSynchronizer + ensures safe writing to the same Zarr store from multiple processes + variable_name : string + the variable_name of the variable in the destination group + dim_mapping: DimensionsMapping + an object containing aggregated dimension information. + aggregated_dimensions: Set[str] + a set of all dimension variable names that have been aggregated. + This ensures that the aggregated data will not be overwritten by + the input source data. + + """ + # create zarr group/dataset + chunks = netcdf_variable.chunking() + if chunks == 'contiguous' or chunks is None: + chunks = netcdf_variable.shape + + # Apply scale factor and offset to attributes that are not automatically + # scaled by NetCDF + scaled = {} + scale_factor = getattr(netcdf_variable, 'scale_factor', 1.0) + add_offset = getattr(netcdf_variable, 'add_offset', 0.0) + + if scale_factor != 1.0 or add_offset != 0.0: + unscaled_attributes = ['valid_range', 'valid_min', 'valid_max', + '_FillValue', 'missing_value'] + present_attributes = [attr for attr in unscaled_attributes + if hasattr(netcdf_variable, attr)] + scaled = {attr: scale_attribute(netcdf_variable, attr, scale_factor, add_offset) + for attr in present_attributes} + + if not chunks and len(netcdf_variable.dimensions) == 0: + # Treat a 0-dimensional NetCDF variable as a zarr group + zarr_variable = zarr_group.require_group(variable_name) + else: + if hasattr(netcdf_variable, 'add_offset'): + dtype = netcdf_variable.add_offset.dtype + elif hasattr(netcdf_variable, 'scale_factor'): + dtype = netcdf_variable.scale_factor.dtype + else: + dtype = netcdf_variable.dtype + + # Derive the aggregated shape, used for both the chunk size calculation + # and as the shape of the output Zarr variable. + aggregated_shape = __get_aggregated_shape(netcdf_variable, dim_mapping, + aggregated_dimensions) + new_chunks = compute_chunksize(aggregated_shape, dtype) + + fill_value = scaled.get('_FillValue', + getattr(netcdf_variable, '_FillValue', 0)) + + zarr_variable = zarr_group.require_dataset(variable_name, + shape=aggregated_shape, + chunks=tuple(new_chunks), + dtype=dtype, + fill_value=fill_value, + synchronizer=zarr_synchronizer) + + resolved_variable_name = resolve_reference_path(netcdf_variable, + variable_name) + if resolved_variable_name not in aggregated_dimensions: + # For a non-aggregated dimension, insert input granule data + __insert_data_slice(netcdf_variable, zarr_variable, + resolved_variable_name, dim_mapping) + + # xarray requires the _ARRAY_DIMENSIONS metadata to know how to label axes + __copy_attrs(netcdf_variable, zarr_variable, scaled, + _ARRAY_DIMENSIONS=list(netcdf_variable.dimensions)) + + +def __get_aggregated_shape(netcdf_variable: NetCDFVariable, + dim_mapping: DimensionsMapping, + aggregated_dimensions) -> Tuple[int]: + """ Derive the output array shape for a given input NetCDF-4 variable. + There are several possible use-cases: + + * An input variable matches an aggregated dimension, so the output + dimension shape should be used instead of the input dimension shape. + * An input variable matches an aggregated bounds variable, so the + output bounds shape should be used instead of the input variable + shape. + * An input variable has a dimension that has been aggregated (e.g., a + temporal dimension). For this dimension, the array size should match + the length of the aggregated temporal dimension. Unaggregated + dimension sizes should still match the input. + * An input variable has no aggregated dimensions. The output variable + shape should match in the input variable shape. + + """ + variable_path = resolve_reference_path(netcdf_variable, + netcdf_variable.name) + + if variable_path in aggregated_dimensions: + if variable_path.endswith('_bnds'): + dim_path = variable_path[:-5] + aggregated_shape = dim_mapping.output_dimensions[dim_path].bounds_values.shape + else: + aggregated_shape = dim_mapping.output_dimensions[variable_path].values.shape + else: + aggregated_shape = [] + for dim_index, dim_name in enumerate(netcdf_variable.dimensions): + dimension_path = resolve_reference_path(netcdf_variable, dim_name) + + if dimension_path in aggregated_dimensions: + aggregated_shape.append( + dim_mapping.output_dimensions[dimension_path].values.size + ) + else: + aggregated_shape.append(netcdf_variable.shape[dim_index]) + + return tuple(aggregated_shape) + + +def __insert_data_slice(netcdf_variable: NetCDFVariable, zarr_variable: ZarrArray, + variable_name: str, dim_mapping: DimensionsMapping): + """ A helper function that identifies the index ranges in the aggregated + output dimension into which the input values from the NetCDF-4 + variable should be inserted, and then updates the output Zarr store + with these data. + + """ + netcdf_file_path = netcdf_variable.group().filepath() + + dimension_indices = [] + + for dimension in netcdf_variable.dimensions: + dimension_path = resolve_reference_path(netcdf_variable, dimension) + + if ( + dimension_path in dim_mapping.output_dimensions + and dim_mapping.output_dimensions[dimension_path].is_temporal() + ): + output_dimension = dim_mapping.output_dimensions[dimension_path] + input_dimension = (dim_mapping.input_dimensions[dimension_path] + [netcdf_file_path]) + input_values = input_dimension.get_values(output_dimension.units) + output_indices = np.where(np.in1d(output_dimension.values, + input_values))[0] + + # This assumes that all input grid values from a single granule + # represent a continuous segment of the output dimension. Also, + # `slice(m, n)` will address from element `m`, up to (but not + # including) `n`, so need to extend the upper end of the slice by 1 + # to include all input data. + dimension_indices.append(slice(output_indices.min(), + output_indices.max() + 1)) + else: + # Currently only supporting temporal aggregation. Other dimensions, + # such as spatial dimensions, are assumed to be identical in all + # input granules. + dimension_indices.append(slice(None)) + + zarr_variable[tuple(dimension_indices)] = netcdf_variable[:] + + +def __copy_attrs(netcdf_input: Union[NetCDFVariable, NetCDFGroup], + zarr_output: Union[ZarrGroup, ZarrArray], scaled={}, **kwargs): + """ Copies all attributes from the source group or variable into the + destination group or variable. If the Zarr store already has that + attribute, it is not overwritten. For example, the units attribute of + aggregated temporal dimensions. + + Converts netCDF4 variable values from their native type (typically + Numpy dtypes) into JSON-serializable values that Zarr can store + + Parameters + ---------- + netcdf_input : netCDF4.Group | netCDF4.Variable + The source from which to copy attributes + zarr_output : zarr.hierarchy.Group | zarr.core.Array + The destination into which to copy attributes. + scaled : dict + attributes that require, and have been, scaled by the scale_factor + and add_offset attributes + **kwargs : dict + Additional attributes to add to the destination + + """ + existing_attributes = set(zarr_output.attrs.keys()) + new_attributes = {key: __netcdf_attr_to_python(getattr(netcdf_input, key)) + for key in netcdf_input.ncattrs()} + + new_attributes.update(kwargs) + new_attributes.update(scaled) + new_attributes.pop('scale_factor', None) + new_attributes.pop('add_offset', None) + + for existing_attribute in existing_attributes: + new_attributes.pop(existing_attribute, None) + + zarr_output.attrs.update(new_attributes) + + +def __netcdf_attr_to_python(val: Any) -> Any: + """ + Given an attribute value read from a NetCDF file (typically a numpy type), + returns the value as a Python primitive type, e.g. np.integer -> int. + + Returns the value unaltered if it does not need conversion or is unrecognized + + Parameters + ---------- + val : any + An attribute value read from a NetCDF file + + Returns + ------- + any + The converted value + """ + if isinstance(val, np.integer): + return int(val) + elif isinstance(val, np.floating): + return float(val) + elif isinstance(val, np.ndarray): + return [__netcdf_attr_to_python(v) for v in val.tolist()] + elif isinstance(val, bytes): + # Assumes bytes are UTF-8 strings. This holds for attributes. + return val.decode('utf-8') + else: + return val + + def compute_chunksize(shape: Union[tuple, list], datatype: str, compression_ratio: float = 1.5, @@ -138,20 +575,21 @@ def compute_chunksize(shape: Union[tuple, list], # convert compressed_chunksize_byte to integer if it's a str if type(compressed_chunksize_byte) == str: try: - (value, unit) = re.findall( - r"^\s*([\d.]+)\s*(Ki|Mi|Gi)\s*$", compressed_chunksize_byte + (value, unit) = findall( + r'^\s*([\d.]+)\s*(Ki|Mi|Gi)\s*$', compressed_chunksize_byte )[0] except IndexError: - err_message = """Chunksize needs to be either an integer or string. -If it's a string, assuming it follows NIST standard for binary prefix - (https://physics.nist.gov/cuu/Units/binary.html) -except that only Ki, Mi, and Gi are allowed.""" - raise ValueError(err_message) + raise ValueError('Chunksize needs to be either an integer or ' + 'string. If it\'s a string, assuming it follows ' + 'NIST standard for binary prefix ' + '(https://physics.nist.gov/cuu/Units/binary.html)' + ' except that only Ki, Mi, and Gi are allowed.') + compressed_chunksize_byte = int(float(value)) * int(binary_prefix_conversion_map[unit]) # get product of chunksize along different dimensions before compression if compression_ratio < 1.: - raise ValueError("Compression ratio < 1 found when estimating chunk size.") + raise ValueError('Compression ratio < 1 found when estimating chunk size.') chunksize_unrolled = int( compressed_chunksize_byte * compression_ratio / np.dtype(datatype).itemsize ) @@ -177,161 +615,5 @@ def compute_chunksize(shape: Union[tuple, list], return suggested_chunksize -def __copy_variable(src, dst_group, name, sema=Semaphore(20)): - """ - Copies the variable from the NetCDF src variable into the Zarr group dst_group, giving - it the provided name - - Parameters - ---------- - src : netCDF4.Variable - the source variable to copy - dst_group : zarr.hierarchy.Group - the group into which to copy the variable - name : string - the name of the variable in the destination group - sema: multiprocessing.synchronize.Semaphore - Semaphore used to limit concurrent processes - NOTE: the default value 20 is empirical - - Returns - ------- - zarr.core.Array - the copied variable - """ - # acquire Semaphore - sema.acquire() - - # connect to s3 - if os.environ.get('USE_LOCALSTACK') == 'true': - s3 = make_localstack_s3fs() - else: - s3 = make_s3fs() - - group_name = os.path.join(dst_group.store.root, dst_group.path) - dst = s3.get_mapper(root=group_name, check=False, create=True) - dst_group = zarr.group(dst) - - # create zarr group/dataset - chunks = src.chunking() - if chunks == 'contiguous' or chunks is None: - chunks = src.shape - if not chunks and len(src.dimensions) == 0: - # Treat a 0-dimensional NetCDF variable as a zarr group - dst = dst_group.create_group(name) - else: - dtype = src.dtype - dtype = src.scale_factor.dtype if hasattr(src, 'scale_factor') else dtype - dtype = src.add_offset.dtype if hasattr(src, 'add_offset') else dtype - new_chunks = compute_chunksize(src.shape, dtype) - dst = dst_group.create_dataset(name, - data=src, - shape=src.shape, - chunks=tuple(new_chunks), - dtype=dtype) - - # Apply scale factor and offset to attributes that are not automatically scaled by NetCDF - scaled = {} - scale_factor = getattr(src, 'scale_factor', 1.0) - add_offset = getattr(src, 'add_offset', 0.0) - if scale_factor != 1.0 or add_offset != 0.0: - unscaled_attributes = ['valid_range', 'valid_min', 'valid_max', '_FillValue', 'missing_value'] - present_attributes = [attr for attr in unscaled_attributes if hasattr(src, attr)] - scaled = {attr: scale_attribute(src, attr, scale_factor, add_offset) for attr in present_attributes} - - # xarray requires the _ARRAY_DIMENSIONS metadata to know how to label axes - __copy_attrs(src, dst, scaled, _ARRAY_DIMENSIONS=list(src.dimensions)) - - # release Semaphore - sema.release() - - return dst - - -def __copy_attrs(src, dst, scaled={}, **kwargs): - """ - Copies all attributes from the source group or variable into the destination group or variable. - Converts netCDF4 variable values from their native type (typically Numpy dtypes) into - JSON-serializable values that Zarr can store - - Parameters - ---------- - src : netCDF4.Group | netCDF4.Variable - The source from which to copy attributes - dst : zarr.hierarchy.Group | zarr.core.Array - The destination into which to copy attributes. - **kwargs : dict - Additional attributes to add to the destination - """ - attrs = {key: __netcdf_attr_to_python(getattr(src, key)) for key in src.ncattrs()} - attrs.update(kwargs) - attrs.update(scaled) - attrs.pop('scale_factor', None) - attrs.pop('add_offset', None) - dst.attrs.put(attrs) - - -def __copy_group(src, dst): - """ - Recursively copies the source netCDF4 group into the destination Zarr group, along with - all sub-groups, variables, and attributes - NOTE: the variables will be copied in parallel processes via multiprocessing; - 'fork' is used as the start-method because OSX/Windows is using 'spawn' by default, - which will introduce overhead and difficulties pickling data objects (and to the test); - Semaphore is used to limit the number of concurrent processes, - which is set to double the number of cpu-s found on the host - - Parameters - ---------- - src : netCDF4.Group - the NetCDF group to copy from - dst : zarr.hierarchy.Group - the existing Zarr group to copy into - """ - __copy_attrs(src, dst) - - for name, item in src.groups.items(): - __copy_group(item, dst.create_group(name.split('/').pop())) - - procs = [] - fork_ctx = multiprocessing.get_context('fork') - sema = Semaphore(multiprocessing.cpu_count() * 2) - for name, item in src.variables.items(): - proc = fork_ctx.Process(target=__copy_variable, args=(item, dst, name, sema)) - proc.start() - procs.append(proc) - for proc in procs: - proc.join() - - -def __netcdf_attr_to_python(val): - """ - Given an attribute value read from a NetCDF file (typically a numpy type), - returns the value as a Python primitive type, e.g. np.integer -> int. - - Returns the value unaltered if it does not need conversion or is unrecognized - - Parameters - ---------- - val : any - An attribute value read from a NetCDF file - - Returns - ------- - any - The converted value - """ - if isinstance(val, np.integer): - return int(val) - elif isinstance(val, np.floating): - return float(val) - elif isinstance(val, np.ndarray): - return [__netcdf_attr_to_python(v) for v in val.tolist()] - elif isinstance(val, bytes): - # Assumes bytes are UTF-8 strings. This holds for attributes. - return val.decode("utf-8") - return val - - if __name__ == '__main__': - netcdf_to_zarr(sys.argv[1], sys.argv[2]) + mosaic_to_zarr(*sys.argv[1:]) diff --git a/harmony_netcdf_to_zarr/download_utilities.py b/harmony_netcdf_to_zarr/download_utilities.py index abbf75b..dcd6464 100644 --- a/harmony_netcdf_to_zarr/download_utilities.py +++ b/harmony_netcdf_to_zarr/download_utilities.py @@ -7,6 +7,7 @@ from copy import deepcopy from logging import Logger from multiprocessing import Manager, Process, Queue +from multiprocessing.managers import Namespace from os import cpu_count from queue import Empty as QueueEmpty from typing import List @@ -21,16 +22,23 @@ def download_granules(netcdf_urls: List[str], destination_directory: str, CPU cores. For further explanation, see documentation on "multi-track drifting" + The number of processes is limited to the minimum of: + + * The number of available CPUs. + * The number of requested NetCDF-4 files. + * An user-supplied number of processes (if specified). + """ logger.info('Beginning granule downloads.') if process_count is None: - process_count = cpu_count() + process_count = min(cpu_count(), len(netcdf_urls)) else: - process_count = min(process_count, cpu_count()) + process_count = min(process_count, cpu_count(), netcdf_urls) with Manager() as manager: download_queue = manager.Queue(len(netcdf_urls)) + shared_namespace = manager.Namespace() local_paths = manager.list() for netcdf_url in netcdf_urls: @@ -38,9 +46,9 @@ def download_granules(netcdf_urls: List[str], destination_directory: str, # Spawn a worker process for each CPU being used processes = [Process(target=_download_worker, - args=(download_queue, local_paths, - destination_directory, access_token, - harmony_config, logger)) + args=(download_queue, shared_namespace, + local_paths, destination_directory, + access_token, harmony_config, logger)) for _ in range(process_count)] for download_process in processes: @@ -49,12 +57,12 @@ def download_granules(netcdf_urls: List[str], destination_directory: str, # Ensure worker processes exit successfully for download_process in processes: download_process.join() - if download_process.exitcode != 0: - raise RuntimeError('Download failed - exit code: ' - f'{download_process.exitcode}') - download_process.close() + if hasattr(shared_namespace, 'exception'): + raise RuntimeError('Download failed: ' + f'{shared_namespace.exception}') + # Copy paths so they persist outside of the Manager context. download_paths = deepcopy(local_paths) @@ -63,9 +71,9 @@ def download_granules(netcdf_urls: List[str], destination_directory: str, return download_paths -def _download_worker(download_queue: Queue, local_paths: List, - destination_dir: str, access_token: str, - harmony_config: Config, logger: Logger): +def _download_worker(download_queue: Queue, shared_namespace: Namespace, + local_paths: List, destination_dir: str, + access_token: str, harmony_config: Config, logger: Logger): """ A method to be executed in a separate process. This will check for items in the queue, which correspond to URLs for NetCDF-4 files to download. If there is at least one URL left for download, then it is @@ -75,13 +83,20 @@ def _download_worker(download_queue: Queue, local_paths: List, administered by the process manager instance. """ - while not download_queue.empty(): + while not hasattr(shared_namespace, 'exception') and not download_queue.empty(): try: netcdf_url = download_queue.get_nowait() except QueueEmpty: break - local_path = download(netcdf_url, destination_dir, logger=logger, - access_token=access_token, cfg=harmony_config) - - local_paths.append(local_path) + try: + local_path = download(netcdf_url, destination_dir, logger=logger, + access_token=access_token, + cfg=harmony_config) + + local_paths.append(local_path) + except Exception as exception: + # If there was an issue, save a string message from the raised + # exception. This will cause other processes to stop downloads. + shared_namespace.exception = str(exception) + raise exception diff --git a/harmony_netcdf_to_zarr/mosaic_utilities.py b/harmony_netcdf_to_zarr/mosaic_utilities.py index 2e418df..c7d08b7 100644 --- a/harmony_netcdf_to_zarr/mosaic_utilities.py +++ b/harmony_netcdf_to_zarr/mosaic_utilities.py @@ -261,8 +261,11 @@ def _get_temporal_output_dimension(self, output_dimension_units = dimension_units[np.argmin(dimension_epochs)] all_input_values = np.unique( - np.concatenate([dimension_input.get_values(output_dimension_units) - for dimension_input in dimension_inputs.values()]) + np.concatenate( + [dimension_input.get_values(output_dimension_units) + for dimension_input in dimension_inputs.values()], + dtype=list(dimension_inputs.values())[0].get_values().dtype + ) ) return self._get_output_dimension(dimension_name, all_input_values, diff --git a/requirements/core.txt b/requirements/core.txt index 0c1f27d..86277a4 100644 --- a/requirements/core.txt +++ b/requirements/core.txt @@ -1,7 +1,7 @@ boto3 ~= 1.14 git+https://github.com/nasa/harmony-service-lib-py@main#egg=harmony-service-lib netCDF4 ~= 1.5 -numpy ~= 1.18.0 +numpy ~= 1.21.0 python-dateutil ~= 2.8.2 s3fs ~= 0.4.0 zarr ~= 2.4.0 diff --git a/tests/test_adapter.py b/tests/test_adapter.py index 7d42e90..24fd60d 100644 --- a/tests/test_adapter.py +++ b/tests/test_adapter.py @@ -7,19 +7,15 @@ import os import tempfile import textwrap -import unittest from shutil import rmtree from tempfile import mkdtemp +from unittest import TestCase from unittest.mock import patch -import boto3 -import s3fs -import zarr from harmony.util import config as harmony_config -from moto import mock_s3 -from multiprocessing.popen_fork import Popen as mp_Popen -from multiprocessing import util as mp_util +from netCDF4 import Dataset from pystac import Catalog +from zarr import DirectoryStore, open_consolidated from harmony.message import Message from harmony_netcdf_to_zarr.__main__ import main @@ -32,74 +28,36 @@ logger = logging.getLogger() -def mock_mp_fork_popen_launch(class_object, process_obj): - """ `moto` is used to mock S3, however, `moto` does not work with - multiprocessing, because it holds objects in memory, and then the - objects are gone when the child processes quit. This method mocks - `multiprocessing.popen_fork.Popen._launch` to do the work in the - parent process. - - Parameters - ---------- - class_object: object - class object for multiprocessing.popen_fork.Popen - process_obj: object - task object to be processed by multiprocessing.popen_fork.Popen._launch - """ - - def attempt_to_close(*fds): - """ Mimic `multiprocessing.util.close_fds`, but wrap the `os.close` - statement with a test for `None`, to avoid issues with tests. - - """ - for fd in fds: - if fd is not None: - os.close(fd) - - code = 1 - parent_r, child_w = os.pipe() - child_r, parent_w = os.pipe() - class_object.pid = os.fork() - if class_object.pid == 0: - code = 0 - os._exit(code) - else: - code = process_obj._bootstrap() - os.close(child_w) - os.close(child_r) - class_object.finalizer = mp_util.Finalize(class_object, attempt_to_close, - (parent_r, parent_w,)) - class_object.sentinel = parent_r - - -class TestAdapter(unittest.TestCase): +class TestAdapter(TestCase): """ Tests the Harmony adapter """ def setUp(self): self.maxdiff = None self.config = harmony_config(validate=False) self.metadata_dir = mkdtemp() + self.temp_dir = mkdtemp() def tearDown(self): rmtree(self.metadata_dir) + rmtree(self.temp_dir) + @patch('harmony_netcdf_to_zarr.adapter.make_s3fs') + @patch('harmony_netcdf_to_zarr.convert.make_s3fs') @patch('harmony_netcdf_to_zarr.adapter.download_granules') @patch.dict(os.environ, MOCK_ENV) - @patch.object(mp_Popen, '_launch', new=mock_mp_fork_popen_launch) - @mock_s3 - def test_end_to_end_file_conversion(self, mock_download): + def test_end_to_end_file_conversion(self, mock_download, mock_make_s3fs, + mock_make_s3fs_adapter): """ Full end-to-end test of the adapter from call to `main` to Harmony STAC catalog output, including ensuring the contents of the file are correct. - Mocks S3 interactions using @mock_s3. - Also mocks download of granules due to `moto` and `multiprocessing` - incompatibility issues. + Mocks S3 interactions with a local Zarr file store and download of + granules due to `moto` and `multiprocessing` incompatibility + issues. """ - conn = boto3.resource('s3') - conn.create_bucket( - Bucket='example-bucket', - CreateBucketConfiguration={'LocationConstraint': os.environ['AWS_DEFAULT_REGION']}) + local_zarr = DirectoryStore(os.path.join(self.temp_dir, 'test.zarr')) + mock_make_s3fs_adapter.return_value.get_mapper.return_value = local_zarr + mock_make_s3fs.return_value.get_mapper.return_value = local_zarr netcdf_file = create_full_dataset() stac_catalog_path = create_input_catalog([netcdf_file]) @@ -136,10 +94,7 @@ def test_end_to_end_file_conversion(self, mock_download): self.assertEqual(output_item.common_metadata.end_datetime, input_item.common_metadata.end_datetime) - # Open the Zarr file using the URL from the output STAC item: - zarr_location = output_item.assets['data'].href - store = s3fs.S3FileSystem().get_mapper(root=zarr_location, check=False) - out = zarr.open_consolidated(store) + out = open_consolidated(local_zarr) # -- Hierarchical Structure Assertions -- contents = textwrap.dedent(""" @@ -197,25 +152,23 @@ def test_end_to_end_file_conversion(self, mock_download): # 1D Root-Level Float Array sharing its name with a dimension self.assertEqual(out['time'][0], 166536) + @patch('harmony_netcdf_to_zarr.adapter.make_s3fs') + @patch('harmony_netcdf_to_zarr.convert.make_s3fs') @patch('harmony_netcdf_to_zarr.adapter.download_granules') @patch.dict(os.environ, MOCK_ENV) - @patch.object(mp_Popen, '_launch', new=mock_mp_fork_popen_launch) - @mock_s3 - def test_end_to_end_large_file_conversion(self, mock_download): + def test_end_to_end_large_file_conversion(self, mock_download, + mock_make_s3fs, + mock_make_s3fs_adapter): """ Full end-to-end test of the adapter to make sure rechunk is working. Mocks S3 interactions using @mock_s3. - Currently only uses a single NetCDF-4 input, as concatenation will - be supported only starting with DAS-1379. - Mocks download of granules due to `moto` and `multiprocessing` incompatibility issues. """ - conn = boto3.resource('s3') - conn.create_bucket( - Bucket='example-bucket', - CreateBucketConfiguration={'LocationConstraint': os.environ['AWS_DEFAULT_REGION']}) + local_zarr = DirectoryStore(os.path.join(self.temp_dir, 'test.zarr')) + mock_make_s3fs_adapter.return_value.get_mapper.return_value = local_zarr + mock_make_s3fs.return_value.get_mapper.return_value = local_zarr netcdf_file = create_large_dataset() stac_catalog_path = create_input_catalog([netcdf_file]) @@ -238,27 +191,147 @@ def test_end_to_end_large_file_conversion(self, mock_download): output_items = list(output_catalog.get_items()) self.assertEqual(len(output_items), 2) + out = open_consolidated(local_zarr) + + # -- Hierarchical Structure Assertions -- + contents = textwrap.dedent(""" + / + ├── data + │ └── var (10000,) int32 + └── dummy_dim (10000,) int32 + """).strip() + self.assertEqual(str(out.tree()), contents) + + # -- Data Assertions -- + self.assertEqual(out['data/var'].chunks, (10000,)) + + @patch('harmony_netcdf_to_zarr.adapter.make_s3fs') + @patch('harmony_netcdf_to_zarr.convert.make_s3fs') + @patch('harmony_netcdf_to_zarr.adapter.download_granules') + @patch.dict(os.environ, MOCK_ENV) + def test_end_to_end_mosaic_conversion(self, mock_download, mock_make_s3fs, + mock_make_s3fs_adapter): + """ Full end-to-end test of the adapter from call to `main` to Harmony + STAC catalog output for multiple input granules, including ensuring + the contents of the file are correct. This should produce a single + Zarr output. + + Mocks S3 interactions with a local Zarr file store and download of + granules due to `moto` and `multiprocessing` incompatibility + issues. + + """ + local_zarr = DirectoryStore(os.path.join(self.temp_dir, 'test.zarr')) + mock_make_s3fs_adapter.return_value.get_mapper.return_value = local_zarr + mock_make_s3fs.return_value.get_mapper.return_value = local_zarr + + # Create mock data. Science variable and time for second NetCDF-4 must + # be different to first to allow mosaic testing. + first_file = create_full_dataset() + second_file = create_full_dataset() + + with Dataset(second_file, 'r+') as dataset: + dataset['time'][:] += 1800 + dataset['/data/vertical/north'][:] += 1 + dataset['/data/vertical/south'][:] += 1 + + stac_catalog_path = create_input_catalog([first_file, second_file]) + mock_download.return_value = [first_file, second_file] + + try: + message = mock_message() + main(['harmony_netcdf_to_zarr', '--harmony-action', 'invoke', + '--harmony-input', message, '--harmony-sources', + stac_catalog_path, '--harmony-metadata-dir', + self.metadata_dir], + config=self.config) + finally: + os.remove(first_file) + os.remove(second_file) + + # Assertions to ensure STAC output contains correct items, and the + # new output item has the correct temporal and spatial extents + output_catalog = Catalog.from_file(os.path.join(self.metadata_dir, + 'catalog.json')) + # There should be two items in the output catalog - the input and + # the output Zarr store + output_items = list(output_catalog.get_items()) + self.assertEqual(len(output_items), 3) + # The Zarr STAC item will not start with 'id', unlike the input items. # The first input Zarr item will have 'id0': + input_item = output_catalog.get_item('id0') output_item = next(item for item in output_items if not item.id.startswith('id')) - # Open the Zarr file using the URL from the output STAC item: - zarr_location = output_item.assets['data'].href - store = s3fs.S3FileSystem().get_mapper(root=zarr_location, check=False) - out = zarr.open_consolidated(store) + self.assertListEqual(output_item.bbox, input_item.bbox) + self.assertEqual(output_item.common_metadata.start_datetime, + input_item.common_metadata.start_datetime) + self.assertEqual(output_item.common_metadata.end_datetime, + input_item.common_metadata.end_datetime) + + out = open_consolidated(local_zarr) # -- Hierarchical Structure Assertions -- contents = textwrap.dedent(""" / ├── data - │ └── var (10000,) int32 - └── dummy_dim (10000,) int32 + │ ├── horizontal + │ │ ├── east (2, 3, 3) int64 + │ │ └── west (2, 3, 3) uint8 + │ └── vertical + │ ├── north (2, 3, 3) uint8 + │ └── south (2, 3, 3) uint8 + ├── location + │ ├── lat (3, 3) float32 + │ └── lon (3, 3) float32 + └── time (2,) int32 """).strip() self.assertEqual(str(out.tree()), contents) + # -- Metadata Assertions -- + # Root level values + self.assertEqual(dict(out.attrs), ROOT_METADATA_VALUES) + + # Group metadata + self.assertEqual(out['data'].attrs['description'], 'Group to hold the data') + + # Variable metadata + var = out['data/vertical/north'] + self.assertEqual(var.attrs['coordinates'], 'lon lat') + # -- Data Assertions -- - self.assertEqual(out['data/var'].chunks, (10000,)) + # Nested Byte Arrays + self.assertEqual(out['data/vertical/north'][0, 0, 2], 16) + self.assertEqual(out['data/vertical/north'][0, 2, 0], 0) + self.assertEqual(out['data/vertical/south'][0, 2, 0], 16) + self.assertEqual(out['data/vertical/south'][0, 0, 2], 0) + self.assertEqual(out['data/horizontal/east'][0, 2, 2], 16) # scale_factor = 2 + self.assertEqual(out['data/horizontal/east'][0, 0, 0], 0) + self.assertEqual(out['data/horizontal/west'][0, 0, 0], 16) + self.assertEqual(out['data/horizontal/west'][0, 2, 2], 0) + self.assertEqual(out['data/vertical/north'][1, 0, 2], 17) + self.assertEqual(out['data/vertical/north'][1, 2, 0], 1) + self.assertEqual(out['data/vertical/south'][1, 2, 0], 17) + self.assertEqual(out['data/vertical/south'][1, 0, 2], 1) + + # 'east' attributes scale_factor removed + self.assertFalse(hasattr(out['data/horizontal/east'], 'scale_factor')) + + # 'east' attributes present and scaled + self.assertEqual(out['data/horizontal/east'].attrs['valid_range'], [0.0, 50.0]) + self.assertEqual(out['data/horizontal/east'].attrs['valid_min'], 0.0) + self.assertEqual(out['data/horizontal/east'].attrs['valid_max'], 50.0) + self.assertEqual(out['data/horizontal/east'].attrs['_FillValue'], 254.0) + self.assertFalse(hasattr(out['data/horizontal/east'], 'missing_value')) + + # 2D Nested Float Arrays + self.assertEqual(out['location/lat'][0, 1], 5.5) + self.assertEqual(out['location/lon'][0, 1], -5.5) + + # 1D Root-Level Float Array sharing its name with a dimension + self.assertEqual(out['time'][0], 166536) + self.assertEqual(out['time'][1], 168336) @patch.object(argparse.ArgumentParser, 'error', return_value=None) def test_does_not_accept_non_harmony_clis(self, argparse_error): @@ -270,14 +343,15 @@ def test_does_not_accept_non_harmony_clis(self, argparse_error): @patch.dict(os.environ, dict(USE_LOCALSTACK='true', LOCALSTACK_HOST='fake-host')) def test_localstack_client(self): - """ - Tests that when USE_LOCALSTACK and LOCALSTACK_HOST are supplied the adapter uses localstack + """ Tests that when USE_LOCALSTACK and LOCALSTACK_HOST are supplied the + adapter uses localstack + """ adapter = NetCDFToZarrAdapter(Message(mock_message())) - self.assertEqual(adapter.s3.client_kwargs['endpoint_url'], 'http://fake-host:4572') + self.assertEqual(adapter.s3.client_kwargs['endpoint_url'], + 'http://fake-host:4572') @patch.dict(os.environ, MOCK_ENV) - @mock_s3 def test_conversion_failure(self): """ Tests that when file conversion fails, e.g. due to a corrupted input file, Harmony catches an exception and them the exception diff --git a/tests/test_convert.py b/tests/test_convert.py deleted file mode 100644 index 072112f..0000000 --- a/tests/test_convert.py +++ /dev/null @@ -1,66 +0,0 @@ -""" Tests the Harmony convert module """ -from unittest import TestCase -from pytest import raises - -from harmony_netcdf_to_zarr.convert import compute_chunksize - - -class TestConvert(TestCase): - """ - Tests the Harmony adapter - """ - def setUp(self): - pass - - def test_compute_chunksize_small(self): - """ - Test of compute_chunksize method for a small input shape - """ - chunksize_expected = (100, 100, 100) - chunksize_result = compute_chunksize(shape=(100, 100, 100), datatype='f8') - self.assertTupleEqual(chunksize_expected, chunksize_result) - - def test_compute_chunksize_medium(self): - """ - Test of compute_chunksize method for a medium input shape - """ - chunksize_expected = (100, 140, 140) - chunksize_result = compute_chunksize(shape=(100, 1000, 1000), datatype='f8') - self.assertTupleEqual(chunksize_expected, chunksize_result) - - def test_compute_chunksize_large(self): - """ - Test of compute_chunksize method for a large input shape - """ - chunksize_expected = (125, 125, 125) - chunksize_result = compute_chunksize(shape=(1000, 1000, 1000), datatype='f8') - self.assertTupleEqual(chunksize_expected, chunksize_result) - - def test_compute_chunksize_with_compression_args(self): - """ - Test of compute_chunksize method with non-default compression args - """ - chunksize_expected = (100, 680, 680) - chunksize_result = compute_chunksize(shape=(100, 1000, 1000), - datatype='i4', - compression_ratio=6.8, - compressed_chunksize_byte='26.8 Mi') - self.assertTupleEqual(chunksize_expected, chunksize_result) - - def test_compute_chunksize_wrong_arguments(self): - """ - Test of compute_chunksize method for a large input shape - """ - with raises(ValueError) as execinfo: - compute_chunksize(shape=(100, 1000, 1000), - datatype='i4', - compression_ratio=6.8, - compressed_chunksize_byte='26.8 MB') - err_message_expected = """Chunksize needs to be either an integer or string. -If it's a string, assuming it follows NIST standard for binary prefix - (https://physics.nist.gov/cuu/Units/binary.html) -except that only Ki, Mi, and Gi are allowed.""" - self.assertEqual(str(execinfo.value), err_message_expected) - - def tearDown(self): - pass diff --git a/tests/unit/test_convert.py b/tests/unit/test_convert.py new file mode 100644 index 0000000..c3c8481 --- /dev/null +++ b/tests/unit/test_convert.py @@ -0,0 +1,251 @@ +""" Tests the Harmony convert module """ +from datetime import datetime +from os.path import join as path_join +from pytest import raises +from shutil import rmtree +from tempfile import mkdtemp +from unittest import TestCase +from unittest.mock import Mock, patch + +from netCDF4 import Dataset +from numpy.testing import assert_array_equal +from zarr import DirectoryStore, group as create_zarr_group +import numpy as np + +from harmony_netcdf_to_zarr.convert import (__copy_attrs as copy_attrs, + __copy_group as copy_group, + compute_chunksize, + __get_aggregated_shape as get_aggregated_shape, + __insert_data_slice as insert_data_slice, + scale_attribute) +from harmony_netcdf_to_zarr.mosaic_utilities import DimensionsMapping +from tests.util.file_creation import create_gpm_dataset + + +class TestConvert(TestCase): + """ Tests the functions in `harmony_netcdf_to_zarr.convert.py`. """ + def setUp(self): + self.temp_dir = mkdtemp() + + def tearDown(self): + rmtree(self.temp_dir) + + def test_compute_chunksize_small(self): + """ Test of compute_chunksize method for a small input shape """ + chunksize_expected = (100, 100, 100) + chunksize_result = compute_chunksize(shape=(100, 100, 100), datatype='f8') + self.assertTupleEqual(chunksize_expected, chunksize_result) + + def test_compute_chunksize_medium(self): + """ Test of compute_chunksize method for a medium input shape """ + chunksize_expected = (100, 140, 140) + chunksize_result = compute_chunksize(shape=(100, 1000, 1000), datatype='f8') + self.assertTupleEqual(chunksize_expected, chunksize_result) + + def test_compute_chunksize_large(self): + """ Test of compute_chunksize method for a large input shape """ + chunksize_expected = (125, 125, 125) + chunksize_result = compute_chunksize(shape=(1000, 1000, 1000), datatype='f8') + self.assertTupleEqual(chunksize_expected, chunksize_result) + + def test_compute_chunksize_with_compression_args(self): + """ Test of compute_chunksize method with non-default compression + args + + """ + chunksize_expected = (100, 680, 680) + chunksize_result = compute_chunksize(shape=(100, 1000, 1000), + datatype='i4', + compression_ratio=6.8, + compressed_chunksize_byte='26.8 Mi') + self.assertTupleEqual(chunksize_expected, chunksize_result) + + def test_compute_chunksize_wrong_arguments(self): + """ Test of compute_chunksize method for a large input shape """ + with raises(ValueError) as execinfo: + compute_chunksize(shape=(100, 1000, 1000), + datatype='i4', + compression_ratio=6.8, + compressed_chunksize_byte='26.8 MB') + err_message_expected = ( + 'Chunksize needs to be either an integer or string. If it\'s a ' + 'string, assuming it follows NIST standard for binary prefix ' + '(https://physics.nist.gov/cuu/Units/binary.html) except that ' + 'only Ki, Mi, and Gi are allowed.' + ) + self.assertEqual(str(execinfo.value), err_message_expected) + + def test_scale_attribute(self): + """ Ensure an attribute is correctly retrieved and scaled from a + NetCDF-4 variable. + + This should include either a single value attribute, or an + attribute with a list type value. + + """ + with self.subTest('Single value attribute'): + with Dataset('test.nc4', diskless=True, mode='w') as dataset: + test_variable = dataset.createVariable('var_name', np.float64) + test_variable.setncattr('float_value', 2.0) + + self.assertEqual( + scale_attribute(test_variable, 'float_value', 3.0, 4.0), + 10.0 + ) + + with self.subTest('List value attribute'): + with Dataset('test2.nc4', diskless=True, mode='w') as dataset: + test_variable = dataset.createVariable('list_var', np.float64) + test_variable.setncattr('list_value', [1.0, 2.0, 3.0]) + + self.assertListEqual( + scale_attribute(test_variable, 'list_value', 3.0, 4.0), + [7.0, 10.0, 13.0] + ) + + def test_copy_attrs(self): + """ Ensure that attributes are copied to a Zarr store, and that any + pre-existing attributes are not removed or overwritten. + + """ + + with self.subTest('Attributes only updated.'): + zarr_store = DirectoryStore(path_join(self.temp_dir, 'test.zarr')) + zarr_group = create_zarr_group(zarr_store) + zarr_group.attrs.put({'attr_one': 'val_one', 'attr_two': 'val_two'}) + + with Dataset('test.nc4', diskless=True, mode='w') as dataset: + dataset.setncattr('attr_three', 'val_three') + dataset.setncattr('attr_two', 'val_four') + + copy_attrs(dataset, zarr_group, {'scaled': 1.0}) + + self.assertDictEqual(zarr_group.attrs.asdict(), + {'attr_one': 'val_one', + 'attr_two': 'val_two', + 'attr_three': 'val_three', + 'scaled': 1.0}) + + with self.subTest('scaled_factor and add_offset omitted.'): + zarr_store = DirectoryStore(path_join(self.temp_dir, 'test_two.zarr')) + zarr_group = create_zarr_group(zarr_store) + + with Dataset('test.nc4', diskless=True, mode='w') as dataset: + dataset.setncattr('scale_factor', 1.0) + dataset.setncattr('add_offset', 0.0) + dataset.setncattr('units', 'm') + + copy_attrs(dataset, zarr_group) + + self.assertDictEqual(zarr_group.attrs.asdict(), {'units': 'm'}) + + def test_get_aggregated_shape(self): + """ Ensure that correct shape is retrieved in each of the four + possible situations: + + * An aggregated dimension should return new 1-D shape. + * An aggregated bounds variable should return new 2-D shape. + * A variable with an aggregated dimension should return new shape. + * A variable with no aggregated dimensions should return old shape. + + """ + local_file_one = create_gpm_dataset(self.temp_dir, + datetime(2021, 2, 28, 3, 30)) + + local_file_two = create_gpm_dataset(self.temp_dir, + datetime(2021, 2, 28, 4, 00)) + dim_mapping = DimensionsMapping([local_file_one, local_file_two]) + aggregated_dimensions = ['/Grid/time', '/Grid/time_bnds'] + + with Dataset(local_file_one, 'r') as dataset: + with self.subTest('Aggregated dimension'): + self.assertTupleEqual( + get_aggregated_shape(dataset['/Grid/time'], dim_mapping, + aggregated_dimensions), + (2, ) + ) + + with self.subTest('Aggregated bounds variable'): + self.assertTupleEqual( + get_aggregated_shape(dataset['/Grid/time_bnds'], + dim_mapping, aggregated_dimensions), + (2, 2) + ) + + with self.subTest('Science variable with aggregated dimension'): + self.assertTupleEqual( + get_aggregated_shape(dataset['/Grid/precipitationCal'], + dim_mapping, aggregated_dimensions), + (2, 3600, 1800) + ) + + with self.subTest('Variable with non aggregated dimension'): + self.assertTupleEqual( + get_aggregated_shape(dataset['/Grid/lon'], + dim_mapping, aggregated_dimensions), + dataset['/Grid/lon'].shape + ) + + def test_insert_dataset_slice(self): + """ Ensure that an input granule is inserted into the correct region + of the output Zarr store dataset. + + """ + local_file_one = create_gpm_dataset(self.temp_dir, + datetime(2021, 2, 28, 3, 30)) + + local_file_two = create_gpm_dataset(self.temp_dir, + datetime(2021, 2, 28, 4, 00)) + dim_mapping = DimensionsMapping([local_file_one, local_file_two]) + output_shape = (2, 3600, 1800) + output_chunks = compute_chunksize(output_shape, np.float32) + + zarr_store = DirectoryStore(path_join(self.temp_dir, 'test.zarr')) + zarr_group = create_zarr_group(zarr_store) + zarr_variable = zarr_group.create_dataset('precipitationCal', + shape=output_shape, + chunks=output_chunks, + dtype=np.float64, + fill_value=-9999.0) + + with Dataset(local_file_one, 'r') as dataset: + insert_data_slice(dataset['/Grid/precipitationCal'], zarr_variable, + '/Grid/precipitationCal', dim_mapping) + + assert_array_equal(zarr_variable[0][:], + dataset['/Grid/precipitationCal'][0]) + + # Second time slice in the lon/lat plane should still be all fill values: + assert_array_equal(zarr_variable[1][:], np.ones((3600, 1800)) * -9999.0) + + @patch('harmony_netcdf_to_zarr.convert.__copy_variable') + def test_copy_group(self, mock_copy_variable): + """ Ensure that the copy_group function recurses to the point where + the `__copy_variable` function is called for each variable. + + """ + test_granule = create_gpm_dataset(self.temp_dir, + datetime(2021, 2, 28, 3, 30)) + + dim_mapping = DimensionsMapping([test_granule]) + aggregated_dims = ['/Grid/time', '/Grid/time_bnds'] + zarr_store = DirectoryStore(path_join(self.temp_dir, 'test.zarr')) + zarr_group = create_zarr_group(zarr_store) + zarr_synchronizer = Mock() + + with Dataset(test_granule, 'r') as dataset: + copy_group(dataset, zarr_group, zarr_synchronizer, dim_mapping, + aggregated_dims) + all_input_variables = set(dataset['/Grid'].variables.keys()) + + # There are 3 dimension variables, 3 bounds variables and 10 other + # gridded variables. + self.assertEqual(mock_copy_variable.call_count, 16) + + # The fourth argument in the call to `__copy_variable` is the variable + # name. + all_output_variables = {call[0][3] + for call + in mock_copy_variable.call_args_list} + + self.assertSetEqual(all_input_variables, all_output_variables) diff --git a/tests/unit/test_download_utilities.py b/tests/unit/test_download_utilities.py index 3c98ac0..08d068b 100644 --- a/tests/unit/test_download_utilities.py +++ b/tests/unit/test_download_utilities.py @@ -1,11 +1,14 @@ """ Unit tests for the `harmony_netcdf_to_zarr.download_utilities` module. """ from logging import getLogger +from multiprocessing import Queue +from multiprocessing.managers import Namespace from os import remove as remove_file from os.path import dirname, exists as file_exists, join as join_path from pathlib import Path from shutil import rmtree from tempfile import mkdtemp from unittest import TestCase +from unittest.mock import patch from harmony.util import config @@ -62,8 +65,15 @@ def test_download_granules_failure(self): error downloading one of the granules. In this test, the error will be caused by requesting a file with an unknown protocol. + The `RuntimeError` that is raised should also preserve the message + from the exception that was raised within the child process. + """ - with self.assertRaises(RuntimeError): + with self.assertRaises(RuntimeError) as context_manager: download_granules(['unknown_protocol_file.nc4'], self.temp_dir, self.access_token, self.harmony_config, self.logger) + + self.assertTrue(str(context_manager.exception).startswith( + 'Unable to download a url of unknown type' + )) diff --git a/tests/util/file_creation.py b/tests/util/file_creation.py index 0248394..9fa1d2d 100644 --- a/tests/util/file_creation.py +++ b/tests/util/file_creation.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +from datetime import datetime from os.path import join as path_join from tempfile import mkdtemp, mkstemp from typing import List @@ -152,3 +153,83 @@ def create_large_dataset(filename=None): data_var[:] = np.arange(num_points) return filename + + +def create_gpm_dataset(test_dir: str, file_datetime: datetime) -> str: + """ Create a granule representative of GPM data. """ + epoch_datetime = datetime(1970, 1, 1) + filename = path_join(test_dir, f'{uuid4()}.nc4') + + science_variables = ['HQObservationTime', 'HQPrecipitation', + 'HQPrecipSource', 'IRkalmanFilterWeight', + 'IRPrecipitation', 'precipitationCal', + 'precipitationQualityIndex', 'precipitationUncal', + 'probabilityLiquidPrecipitation', 'randomError'] + + with Dataset(filename, 'w') as dataset: + grid_group = dataset.createGroup('Grid') + grid_group.createDimension('lat', 1800) + lat = grid_group.createVariable('lat', np.float32, ('lat',)) + lat[:] = np.linspace(-89.95, 89.95, 1800, dtype=np.float32) + lat.setncattr('units', 'degrees_north') + lat.setncattr('bounds', 'lat_bnds') + lat.setncattr('standard_name', 'latitude') + lat.setncattr('axis', 'Y') + + grid_group.createDimension('latv', 2) + lat_bounds = grid_group.createVariable('lat_bnds', np.float32, + ('lat', 'latv')) + lat_bounds_values = np.array([ + np.linspace(-90.0, 89.9, 1800, dtype=np.float32), + np.linspace(-89.9, 90.0, 1800, dtype=np.float32) + ], dtype=np.float32) + + lat_bounds[:] = lat_bounds_values.T + lat_bounds.setncattr('units', 'degrees_north') + + grid_group.createDimension('lon', 3600) + lon = grid_group.createVariable('lon', np.float32, ('lon',)) + lon[:] = np.linspace(-179.95, 179.95, 3600, dtype=np.float32) + lon.setncattr('units', 'degrees_east') + lon.setncattr('bounds', 'lon_bnds') + lon.setncattr('standard_name', 'longitude') + lon.setncattr('axis', 'X') + + grid_group.createDimension('lonv', 2) + lon_bounds = grid_group.createVariable('lon_bnds', np.float32, + ('lon', 'lonv')) + + lon_bounds_values = np.array([ + np.linspace(-180.0, 179.9, 3600, dtype=np.float32), + np.linspace(-179.9, 180.0, 3600, dtype=np.float32) + ], dtype=np.float32) + + lon_bounds[:] = lon_bounds_values.T + lon_bounds.setncattr('units', 'degrees_east') + + grid_group.createDimension('time', 1) + time_variable = grid_group.createVariable('time', np.float64, + ('time',)) + time_variable[0] = (file_datetime - epoch_datetime).total_seconds() + time_variable.setncattr('units', 'seconds since 1970-01-01T00:00:00') + time_variable.setncattr('bounds', 'time_bnds') + time_variable.setncattr('standard_name', 'time') + time_variable.setncattr('calendar', 'julian') + time_variable.setncattr('axis', 'T') + + grid_group.createDimension('nv', 2) + time_bounds = grid_group.createVariable('time_bnds', np.float64, + ('time', 'nv')) + time_bounds[0][0] = time_variable[0] + time_bounds[0][1] = time_variable[0] + 1800.0 + time_bounds.setncattr('units', 'seconds since 1970-01-01T00:00:00') + + for variable_name in science_variables: + variable = grid_group.createVariable(variable_name, np.float64, + ('time', 'lon', 'lat'), + fill_value=-9999) + variable[:] = np.random.rand(1, 3600, 1800) + variable.setncattr('coordinates', 'time lon lat') + variable.setncattr('DimensionNames', 'time,lon,lat') + + return filename