diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py
index 4d1e336e..4ea12fe6 100644
--- a/nowcasting_dataset/data_sources/data_source.py
+++ b/nowcasting_dataset/data_sources/data_source.py
@@ -182,6 +182,8 @@ def create_batches(
         # Split locations per example into batches:
         n_batches = len(spatial_and_temporal_locations_of_each_example) // batch_size
         locations_for_batches = []
+        logger.warning("xxxxx")
+        logger.warning(n_batches)
         for batch_idx in range(n_batches):
             start_example_idx = batch_idx * batch_size
             end_example_idx = (batch_idx + 1) * batch_size
@@ -193,7 +195,7 @@ def create_batches(
         # Loop round each batch:
         for n_batches_processed, locations_for_batch in enumerate(locations_for_batches):
             batch_idx = idx_of_first_batch + n_batches_processed
-            logger.debug(f"{self.__class__.__name__} creating batch {batch_idx}!")
+            logger.warning(f"{self.__class__.__name__} creating batch {batch_idx}!")
 
             # Generate batch.
             batch = self.get_batch(
@@ -205,6 +207,7 @@ def create_batches(
             # Save batch to disk.
             netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx)
             batch.to_netcdf(netcdf_filename)
+            logger.warning(f"Save file to {netcdf_filename}")
 
             # Upload if necessary.
             if (
diff --git a/nowcasting_dataset/dataset/split/split.py b/nowcasting_dataset/dataset/split/split.py
index 4f1e134b..f873728e 100644
--- a/nowcasting_dataset/dataset/split/split.py
+++ b/nowcasting_dataset/dataset/split/split.py
@@ -200,6 +200,10 @@ def split_data(
 
     logger.debug("Split data done!")
     for split_name, dt in split_datetimes._asdict().items():
-        logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}")
+        if len(dt) == 0:
+            # only a warning is made as this may happen during unittests
+            logger.warning(f"{split_name} has {len(dt):,d} datetimes")
+        else:
+            logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}")
 
     return split_datetimes
diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py
index 4f9feed9..dbff97d3 100644
--- a/nowcasting_dataset/filesystem/utils.py
+++ b/nowcasting_dataset/filesystem/utils.py
@@ -10,15 +10,24 @@
 _LOG = logging.getLogger("nowcasting_dataset")
 
 
-def upload_and_delete_local_files(dst_path: str, local_path: Path):
+def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[str, Path]):
     """
     Upload an entire folder and delete local files to either AWS or GCP
     """
     _LOG.info("Uploading!")
     filesystem = get_filesystem(dst_path)
-    filesystem.put(str(local_path), dst_path, recursive=True)
+
+    _LOG.warning(f"moving files from {local_path} to {dst_path}")
+
+    _LOG.warning(get_all_filenames_in_path(local_path))
+    _LOG.warning(get_all_filenames_in_path(dst_path))
+
+    filesystem.put(str(local_path) + "/", str(dst_path) + "/", recursive=True)
     delete_all_files_in_temp_path(local_path)
 
+    _LOG.warning(get_all_filenames_in_path(local_path))
+    _LOG.warning(get_all_filenames_in_path(dst_path))
+
 
 def get_filesystem(path: Union[str, Path]) -> fsspec.AbstractFileSystem:
     r"""Get the fsspect FileSystem from a path.
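
A note on the filesystem.put(str(local_path) + "/", str(dst_path) + "/", recursive=True) line above: fsspec's recursive put can treat a source path with a trailing slash differently from one without (copying the directory's contents rather than nesting the directory itself), and the exact behaviour has varied between releases, which is presumably why requirements.txt below pins fsspec==2021.7.0. The following is a minimal local-filesystem sketch of that call pattern, not part of the patch; the directories and file names are illustrative only.

import tempfile
from pathlib import Path

import fsspec

# Two throwaway directories standing in for local_path and dst_path.
with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    (Path(src) / "000000.nc").touch()
    (Path(src) / "000001.nc").touch()

    filesystem = fsspec.filesystem("file")

    # Same call shape as upload_and_delete_local_files: trailing slash on both
    # the source and the destination, recursive copy.
    filesystem.put(src + "/", dst + "/", recursive=True)

    # List what landed in dst; the intent is that the contents of src appear
    # directly under dst rather than under dst/<basename of src>.
    print(sorted(filesystem.ls(dst)))
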
diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py
index 9383a73e..f7f24795 100644
--- a/nowcasting_dataset/manager.py
+++ b/nowcasting_dataset/manager.py
@@ -369,6 +369,14 @@ def create_batches(self, overwrite_batches: bool) -> None:
                 for worker_id, (data_source_name, data_source) in enumerate(
                     self.data_sources.items()
                 ):
+
+                    if len(locations_for_split) == 0:
+                        # not raising error as this is ok for unittests
+                        logger.warning(
+                            f"Not create batches for {split_name} as there are no locations"
+                        )
+                        break
+
                     # Get indexes of first batch and example. And subset locations_for_split.
                     idx_of_first_batch = first_batches_to_create[split_name][data_source_name]
                     idx_of_first_example = idx_of_first_batch * self.config.process.batch_size
@@ -389,10 +397,17 @@ def create_batches(self, overwrite_batches: bool) -> None:
                     nd_fs_utils.makedirs(dst_path, exist_ok=True)
                     if self.save_batches_locally_and_upload:
                         nd_fs_utils.makedirs(local_temp_path, exist_ok=True)
+                    else:
+                        logger.warning(
+                            f"Not saving uploading batches so have not made {local_temp_path}"
+                        )
 
                     # Submit data_source.create_batches task to the worker process.
-                    future = executor.submit(
-                        data_source.create_batches,
+                    print(executor)
+                    # future = executor.submit(
+                    logger.warning("Making batches")
+                    logger.warning(locations)
+                    data_source.create_batches(
                         spatial_and_temporal_locations_of_each_example=locations,
                         idx_of_first_batch=idx_of_first_batch,
                         batch_size=self.config.process.batch_size,
@@ -400,7 +415,7 @@ def create_batches(self, overwrite_batches: bool) -> None:
                         local_temp_path=local_temp_path,
                         upload_every_n_batches=self.config.process.upload_every_n_batches,
                     )
-                    future_create_batches_jobs.append(future)
+                    # future_create_batches_jobs.append(future)
 
                 # Wait for all futures to finish:
                 for future, data_source_name in zip(
diff --git a/requirements.txt b/requirements.txt
index 714d4269..c76e5b65 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,5 +21,5 @@ plotly
 tqdm
 black
 pre-commit
-fsspec
+fsspec==2021.7.0
 pathy
diff --git a/tests/config/test.yaml b/tests/config/test.yaml
index 7cfc3153..37f846cc 100644
--- a/tests/config/test.yaml
+++ b/tests/config/test.yaml
@@ -30,3 +30,6 @@ process:
   local_temp_path: ~/temp/
   seed: 1234
   upload_every_n_batches: 16
+  n_train_batches: 2
+  n_validation_batches: 0
+  n_test_batches: 0
diff --git a/tests/filesystem/test_local.py b/tests/filesystem/test_local.py
index 642a5655..2c60730d 100644
--- a/tests/filesystem/test_local.py
+++ b/tests/filesystem/test_local.py
@@ -9,6 +9,7 @@
     download_to_local,
     get_all_filenames_in_path,
     makedirs,
+    upload_and_delete_local_files,
     upload_one_file,
 )
 
@@ -175,3 +176,42 @@ def test_upload():  # noqa: D103
     # check the object are not there
     filenames = get_all_filenames_in_path(local_path)
     assert len(filenames) == 3
+
+
+def test_upload_and_delete_local_files():
+    """Check 'upload_and_delete_local_files' works"""
+
+    file1 = "test_file1.txt"
+    file2 = "test_dir/test_file2.txt"
+    file3 = "test_file3.txt"
+
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        local_path = Path(tmpdirname)
+
+        # add fake file to dir
+        path_and_filename_1 = os.path.join(local_path, file1)
+        with open(path_and_filename_1, "w"):
+            pass
+
+        # add fake file to dir
+        os.mkdir(f"{tmpdirname}/test_dir")
+        _ = os.path.join(local_path, file2)
+        with open(os.path.join(local_path, file2), "w"):
+            pass
+
+        path_and_filename_3 = os.path.join(local_path, file3)
+        with open(path_and_filename_3, "w"):
+            pass
+
+        with tempfile.TemporaryDirectory() as tmpdirname2:
+            dst_path = Path(tmpdirname2)
+
+            upload_and_delete_local_files(dst_path=dst_path, local_path=local_path)
+
+            # check the object are not there,just dir is left
+            filenames = get_all_filenames_in_path(local_path)
+            assert len(filenames) == 1
+
+            # check the object are there
+            filenames = get_all_filenames_in_path(dst_path)
+            assert len(filenames) == 3
diff --git a/tests/test_manager.py b/tests/test_manager.py
index 81daf75e..4aefbc4b 100644
--- a/tests/test_manager.py
+++ b/tests/test_manager.py
@@ -1,4 +1,7 @@
 """Test Manager."""
+import logging
+import os
+import tempfile
 from datetime import datetime
 from pathlib import Path
 
@@ -10,6 +13,8 @@
 from nowcasting_dataset.data_sources.satellite.satellite_data_source import SatelliteDataSource
 from nowcasting_dataset.manager import Manager
 
+_LOG = logging.getLogger("nowcasting_dataset")
+
 
 def test_sample_spatial_and_temporal_locations_for_examples():  # noqa: D103
     local_path = Path(nowcasting_dataset.__file__).parent.parent
@@ -76,4 +81,87 @@ def test_get_daylight_datetime_index():
     np.testing.assert_array_equal(t0_datetimes, correct_t0_datetimes)
 
 
-# TODO: Issue #322: Test the other Manager methods!
+def test_batches():
+    """Test that batches can be made"""
+    filename = Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "sat_data.zarr"
+
+    sat = SatelliteDataSource(
+        zarr_path=filename,
+        history_minutes=30,
+        forecast_minutes=60,
+        image_size_pixels=64,
+        meters_per_pixel=2000,
+        channels=("HRV",),
+    )
+
+    filename = (
+        Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "gsp" / "test.zarr"
+    )
+
+    gsp = GSPDataSource(
+        zarr_path=filename,
+        start_dt=datetime(2019, 1, 1),
+        end_dt=datetime(2019, 1, 2),
+        history_minutes=30,
+        forecast_minutes=60,
+        image_size_pixels=64,
+        meters_per_pixel=2000,
+    )
+
+    manager = Manager()
+
+    # load config
+    local_path = Path(nowcasting_dataset.__file__).parent.parent
+    filename = local_path / "tests" / "config" / "test.yaml"
+    manager.load_yaml_configuration(filename=filename)
+
+    with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path:  # noqa 101
+
+        # set local temp path, and dst path
+        manager.config.output_data.filepath = Path(dst_path)
+        manager.local_temp_path = Path(local_temp_path)
+
+        # just set satellite as data source
+        manager.data_sources = {"gsp": gsp, "sat": sat}
+        manager.data_source_which_defines_geospatial_locations = gsp
+
+        # make file for locations
+        manager.create_files_specifying_spatial_and_temporal_locations_of_each_example_if_necessary()  # noqa 101
+
+        # make batches
+        manager.create_batches(overwrite_batches=True)
+
+        from nowcasting_dataset.filesystem.utils import get_all_filenames_in_path
+
+        assert os.path.exists(f"{dst_path}/train")
+        assert os.path.exists(f"{dst_path}/train/gsp")
+
+        _LOG.warning(get_all_filenames_in_path(f"{dst_path}/train/gsp"))
+        assert get_all_filenames_in_path(f"{dst_path}/train/gsp") == 0
+
+        assert os.path.exists(f"{dst_path}/train/gsp/000000.nc")
+        assert os.path.exists(f"{dst_path}/train/sat/000000.nc")
+        assert os.path.exists(f"{dst_path}/train/gsp/000001.nc")
+        assert os.path.exists(f"{dst_path}/train/sat/000001.nc")
+
+
+def test_save_config():
+    """Test that configuration file is saved"""
+
+    manager = Manager()
+
+    # load config
+    local_path = Path(nowcasting_dataset.__file__).parent.parent
+    filename = local_path / "tests" / "config" / "test.yaml"
+    manager.load_yaml_configuration(filename=filename)
+
+    with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path:  # noqa 101
+
+        # set local temp path, and dst path
+        manager.config.output_data.filepath = Path(dst_path)
+        manager.local_temp_path = Path(local_temp_path)
+
+        # save config
+        manager.save_yaml_configuration()
+
+        assert os.path.exists(f"{dst_path}/configuration.yaml")
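
A note on the manager.py hunk above: with executor.submit commented out and data_source.create_batches called synchronously, the future_create_batches_jobs list is never populated, so the later "Wait for all futures to finish" loop has nothing to wait on. For reference, a small self-contained sketch of the submit-and-wait pattern that loop expects; this is a generic concurrent.futures example with made-up data source names, not the project's exact code.

from concurrent import futures


def create_batches_for(data_source_name: str) -> str:
    # Stand-in for data_source.create_batches so the sketch runs anywhere.
    return f"made batches for {data_source_name}"


if __name__ == "__main__":
    data_source_names = ["gsp", "sat"]

    with futures.ProcessPoolExecutor(max_workers=len(data_source_names)) as executor:
        # Submit one job per data source and keep the futures for later.
        future_create_batches_jobs = [
            executor.submit(create_batches_for, name) for name in data_source_names
        ]

        # Mirrors the "Wait for all futures to finish" loop in manager.py: it only
        # does useful work if the futures list was actually populated.
        for future, name in zip(future_create_batches_jobs, data_source_names):
            print(name, future.result())  # result() re-raises worker exceptions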