From 689ddff24bc7bdb421259e8e9dfa9e66846a915a Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:17:43 +0000 Subject: [PATCH 01/20] add make batches test for manager --- nowcasting_dataset/dataset/split/split.py | 5 +- nowcasting_dataset/filesystem/utils.py | 2 +- nowcasting_dataset/manager.py | 4 ++ tests/config/test.yaml | 3 ++ tests/test_manager.py | 62 +++++++++++++++++++++++ 5 files changed, 74 insertions(+), 2 deletions(-) diff --git a/nowcasting_dataset/dataset/split/split.py b/nowcasting_dataset/dataset/split/split.py index 4f1e134b..1a6f1eb4 100644 --- a/nowcasting_dataset/dataset/split/split.py +++ b/nowcasting_dataset/dataset/split/split.py @@ -200,6 +200,9 @@ def split_data( logger.debug("Split data done!") for split_name, dt in split_datetimes._asdict().items(): - logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}") + if len(dt) == 0: + logger.warning(f"{split_name} has {len(dt):,d} datetimes") + else: + logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}") return split_datetimes diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index 4f9feed9..12bb27f7 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,7 +16,7 @@ def upload_and_delete_local_files(dst_path: str, local_path: Path): """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) - filesystem.put(str(local_path), dst_path, recursive=True) + filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py index 9383a73e..5a180fed 100644 --- a/nowcasting_dataset/manager.py +++ b/nowcasting_dataset/manager.py @@ -369,6 +369,10 @@ def create_batches(self, overwrite_batches: bool) -> None: for worker_id, (data_source_name, data_source) in enumerate( self.data_sources.items() ): + + if len(locations_for_split) == 0: + break + # Get indexes of first batch and example. And subset locations_for_split. idx_of_first_batch = first_batches_to_create[split_name][data_source_name] idx_of_first_example = idx_of_first_batch * self.config.process.batch_size diff --git a/tests/config/test.yaml b/tests/config/test.yaml index 7cfc3153..37f846cc 100644 --- a/tests/config/test.yaml +++ b/tests/config/test.yaml @@ -30,3 +30,6 @@ process: local_temp_path: ~/temp/ seed: 1234 upload_every_n_batches: 16 + n_train_batches: 2 + n_validation_batches: 0 + n_test_batches: 0 diff --git a/tests/test_manager.py b/tests/test_manager.py index 81daf75e..1dc00174 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,4 +1,6 @@ """Test Manager.""" +import os +import tempfile from datetime import datetime from pathlib import Path @@ -77,3 +79,63 @@ def test_get_daylight_datetime_index(): # TODO: Issue #322: Test the other Manager methods! + + +def test_batches(): + """Test that batches can be made""" + filename = Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "sat_data.zarr" + + sat = SatelliteDataSource( + zarr_path=filename, + history_minutes=30, + forecast_minutes=60, + image_size_pixels=64, + meters_per_pixel=2000, + channels=("HRV",), + ) + + filename = ( + Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "gsp" / "test.zarr" + ) + + gsp = GSPDataSource( + zarr_path=filename, + start_dt=datetime(2019, 1, 1), + end_dt=datetime(2019, 1, 2), + history_minutes=30, + forecast_minutes=60, + image_size_pixels=64, + meters_per_pixel=2000, + ) + + manager = Manager() + + # load config + local_path = Path(nowcasting_dataset.__file__).parent.parent + filename = local_path / "tests" / "config" / "test.yaml" + manager.load_yaml_configuration(filename=filename) + + with tempfile.TemporaryDirectory(dir="./") as local_temp_path, tempfile.TemporaryDirectory( + dir="./" + ) as dst_path: + + # set local temp path, and dst path + manager.config.output_data.filepath = Path(dst_path) + manager.local_temp_path = Path(local_temp_path) + + # just set satellite as data source + manager.data_sources = {"gsp": gsp, "sat": sat} + manager.data_source_which_defines_geospatial_locations = gsp + + # make file for locations + manager.create_files_specifying_spatial_and_temporal_locations_of_each_example_if_necessary() # noqa 101 + + # make batches + manager.create_batches(overwrite_batches=True) + + assert os.path.exists(f"{dst_path}/train") + assert os.path.exists(f"{dst_path}/train/gsp") + assert os.path.exists(f"{dst_path}/train/gsp/000000.nc") + assert os.path.exists(f"{dst_path}/train/sat/000000.nc") + assert os.path.exists(f"{dst_path}/train/gsp/000001.nc") + assert os.path.exists(f"{dst_path}/train/sat/000001.nc") From 4489256188377325a984cb49df02cfa1e0590fab Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:19:53 +0000 Subject: [PATCH 02/20] tidy --- tests/test_manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_manager.py b/tests/test_manager.py index 1dc00174..00d07eea 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -115,9 +115,7 @@ def test_batches(): filename = local_path / "tests" / "config" / "test.yaml" manager.load_yaml_configuration(filename=filename) - with tempfile.TemporaryDirectory(dir="./") as local_temp_path, tempfile.TemporaryDirectory( - dir="./" - ) as dst_path: + with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path: # noqa 101 # set local temp path, and dst path manager.config.output_data.filepath = Path(dst_path) From 7ce94fb1af811202ef90f70f54ed3cc3a411b83c Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:23:34 +0000 Subject: [PATCH 03/20] add save config test --- nowcasting_dataset/filesystem/utils.py | 2 +- tests/test_manager.py | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index 12bb27f7..d16e5d5a 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -10,7 +10,7 @@ _LOG = logging.getLogger("nowcasting_dataset") -def upload_and_delete_local_files(dst_path: str, local_path: Path): +def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[str, Path]): """ Upload an entire folder and delete local files to either AWS or GCP """ diff --git a/tests/test_manager.py b/tests/test_manager.py index 00d07eea..3b6a8d1b 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -137,3 +137,25 @@ def test_batches(): assert os.path.exists(f"{dst_path}/train/sat/000000.nc") assert os.path.exists(f"{dst_path}/train/gsp/000001.nc") assert os.path.exists(f"{dst_path}/train/sat/000001.nc") + + +def test_save_config(): + """Test that configuration file is saved""" + + manager = Manager() + + # load config + local_path = Path(nowcasting_dataset.__file__).parent.parent + filename = local_path / "tests" / "config" / "test.yaml" + manager.load_yaml_configuration(filename=filename) + + with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path: # noqa 101 + + # set local temp path, and dst path + manager.config.output_data.filepath = Path(dst_path) + manager.local_temp_path = Path(local_temp_path) + + # save config + manager.save_yaml_configuration() + + assert os.path.exists(f"{dst_path}/configuration.yaml") From 71d2228758652c4b264109f1481d8b8c675234d8 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:27:11 +0000 Subject: [PATCH 04/20] self PR review --- nowcasting_dataset/dataset/split/split.py | 1 + tests/test_manager.py | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/nowcasting_dataset/dataset/split/split.py b/nowcasting_dataset/dataset/split/split.py index 1a6f1eb4..f873728e 100644 --- a/nowcasting_dataset/dataset/split/split.py +++ b/nowcasting_dataset/dataset/split/split.py @@ -201,6 +201,7 @@ def split_data( logger.debug("Split data done!") for split_name, dt in split_datetimes._asdict().items(): if len(dt) == 0: + # only a warning is made as this may happen during unittests logger.warning(f"{split_name} has {len(dt):,d} datetimes") else: logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}") diff --git a/tests/test_manager.py b/tests/test_manager.py index 3b6a8d1b..0056ee2e 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -78,9 +78,6 @@ def test_get_daylight_datetime_index(): np.testing.assert_array_equal(t0_datetimes, correct_t0_datetimes) -# TODO: Issue #322: Test the other Manager methods! - - def test_batches(): """Test that batches can be made""" filename = Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "sat_data.zarr" From 6886b37ac537ab8019faebd9f0a650314417e3fd Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:35:33 +0000 Subject: [PATCH 05/20] add warning logs to debug --- nowcasting_dataset/data_sources/data_source.py | 1 + nowcasting_dataset/filesystem/utils.py | 1 + 2 files changed, 2 insertions(+) diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py index 4d1e336e..ded1d639 100644 --- a/nowcasting_dataset/data_sources/data_source.py +++ b/nowcasting_dataset/data_sources/data_source.py @@ -205,6 +205,7 @@ def create_batches( # Save batch to disk. netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx) batch.to_netcdf(netcdf_filename) + logger.warning(f"Saved file to {netcdf_filename}") # Upload if necessary. if ( diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index d16e5d5a..7afd9cc9 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,6 +16,7 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) + _LOG.warning(f"moving files from {local_path} to {dst_path}!") filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) From ffd94c1e904f7ae955b28bb058de6a7b7620502a Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:40:13 +0000 Subject: [PATCH 06/20] change warnings to prints --- nowcasting_dataset/data_sources/data_source.py | 2 +- nowcasting_dataset/filesystem/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py index ded1d639..0f0cf0e2 100644 --- a/nowcasting_dataset/data_sources/data_source.py +++ b/nowcasting_dataset/data_sources/data_source.py @@ -205,7 +205,7 @@ def create_batches( # Save batch to disk. netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx) batch.to_netcdf(netcdf_filename) - logger.warning(f"Saved file to {netcdf_filename}") + print(f"Saved file to {netcdf_filename}") # Upload if necessary. if ( diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index 7afd9cc9..5aecd7fa 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,7 +16,7 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) - _LOG.warning(f"moving files from {local_path} to {dst_path}!") + print(f"moving files from {local_path} to {dst_path}!") filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) From b0f20c01dbb4f0c22e0537a345b9790184f1ff42 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:45:03 +0000 Subject: [PATCH 07/20] add warnings --- nowcasting_dataset/manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py index 5a180fed..8c025105 100644 --- a/nowcasting_dataset/manager.py +++ b/nowcasting_dataset/manager.py @@ -371,6 +371,10 @@ def create_batches(self, overwrite_batches: bool) -> None: ): if len(locations_for_split) == 0: + # not raising error as this is ok for unittests + logger.warning( + f"Not create batches for {split_name} " f"as there are no locations" + ) break # Get indexes of first batch and example. And subset locations_for_split. From d7b3414f55c92f599fcefe312659c6f09949d3a7 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:45:25 +0000 Subject: [PATCH 08/20] tidy --- nowcasting_dataset/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py index 8c025105..8218a0c6 100644 --- a/nowcasting_dataset/manager.py +++ b/nowcasting_dataset/manager.py @@ -373,7 +373,7 @@ def create_batches(self, overwrite_batches: bool) -> None: if len(locations_for_split) == 0: # not raising error as this is ok for unittests logger.warning( - f"Not create batches for {split_name} " f"as there are no locations" + f"Not create batches for {split_name} as there are no locations" ) break From c22a0780e841f620ade9b2faec16d549b47fd74f Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:51:14 +0000 Subject: [PATCH 09/20] debug --- nowcasting_dataset/filesystem/utils.py | 2 +- tests/test_manager.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index 5aecd7fa..cee928af 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,7 +16,7 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) - print(f"moving files from {local_path} to {dst_path}!") + print(f"moving files from {local_path} to {dst_path}") filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) diff --git a/tests/test_manager.py b/tests/test_manager.py index 0056ee2e..1fd25884 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -128,8 +128,14 @@ def test_batches(): # make batches manager.create_batches(overwrite_batches=True) + from nowcasting_dataset.filesystem.utils import get_all_filenames_in_path + assert os.path.exists(f"{dst_path}/train") assert os.path.exists(f"{dst_path}/train/gsp") + + print(get_all_filenames_in_path(f"{dst_path}/train/gsp")) + assert get_all_filenames_in_path(f"{dst_path}/train/gsp") + assert os.path.exists(f"{dst_path}/train/gsp/000000.nc") assert os.path.exists(f"{dst_path}/train/sat/000000.nc") assert os.path.exists(f"{dst_path}/train/gsp/000001.nc") From 94c52cdb6711f183186c96c32c14444ce14afcb8 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 12:52:09 +0000 Subject: [PATCH 10/20] force assert --- tests/test_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_manager.py b/tests/test_manager.py index 1fd25884..a171de03 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -134,7 +134,7 @@ def test_batches(): assert os.path.exists(f"{dst_path}/train/gsp") print(get_all_filenames_in_path(f"{dst_path}/train/gsp")) - assert get_all_filenames_in_path(f"{dst_path}/train/gsp") + assert get_all_filenames_in_path(f"{dst_path}/train/gsp") == 0 assert os.path.exists(f"{dst_path}/train/gsp/000000.nc") assert os.path.exists(f"{dst_path}/train/sat/000000.nc") From 984bd71f82ac5c213717e533309ed6a7dae6b497 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:02:30 +0000 Subject: [PATCH 11/20] dont use futures --- nowcasting_dataset/data_sources/data_source.py | 2 ++ nowcasting_dataset/manager.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py index 0f0cf0e2..857551a6 100644 --- a/nowcasting_dataset/data_sources/data_source.py +++ b/nowcasting_dataset/data_sources/data_source.py @@ -182,6 +182,8 @@ def create_batches( # Split locations per example into batches: n_batches = len(spatial_and_temporal_locations_of_each_example) // batch_size locations_for_batches = [] + print("xxxxx") + print(n_batches) for batch_idx in range(n_batches): start_example_idx = batch_idx * batch_size end_example_idx = (batch_idx + 1) * batch_size diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py index 8218a0c6..d631f974 100644 --- a/nowcasting_dataset/manager.py +++ b/nowcasting_dataset/manager.py @@ -397,10 +397,15 @@ def create_batches(self, overwrite_batches: bool) -> None: nd_fs_utils.makedirs(dst_path, exist_ok=True) if self.save_batches_locally_and_upload: nd_fs_utils.makedirs(local_temp_path, exist_ok=True) + else: + logger.warning( + f"Not saving uploading batches so have not made {local_temp_path}" + ) # Submit data_source.create_batches task to the worker process. - future = executor.submit( - data_source.create_batches, + print(executor) + # future = executor.submit( + data_source.create_batches( spatial_and_temporal_locations_of_each_example=locations, idx_of_first_batch=idx_of_first_batch, batch_size=self.config.process.batch_size, @@ -408,7 +413,7 @@ def create_batches(self, overwrite_batches: bool) -> None: local_temp_path=local_temp_path, upload_every_n_batches=self.config.process.upload_every_n_batches, ) - future_create_batches_jobs.append(future) + # future_create_batches_jobs.append(future) # Wait for all futures to finish: for future, data_source_name in zip( From daf3ef68eaa2ce10d96b98397c3bf42e9d3eb415 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:06:31 +0000 Subject: [PATCH 12/20] add more warnings --- nowcasting_dataset/manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nowcasting_dataset/manager.py b/nowcasting_dataset/manager.py index d631f974..f7f24795 100644 --- a/nowcasting_dataset/manager.py +++ b/nowcasting_dataset/manager.py @@ -405,6 +405,8 @@ def create_batches(self, overwrite_batches: bool) -> None: # Submit data_source.create_batches task to the worker process. print(executor) # future = executor.submit( + logger.warning("Making batches") + logger.warning(locations) data_source.create_batches( spatial_and_temporal_locations_of_each_example=locations, idx_of_first_batch=idx_of_first_batch, From 82d850cb22609411b7d1836003aa2a7f91913828 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:08:28 +0000 Subject: [PATCH 13/20] more warnings --- nowcasting_dataset/data_sources/data_source.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py index 857551a6..ec4e36bc 100644 --- a/nowcasting_dataset/data_sources/data_source.py +++ b/nowcasting_dataset/data_sources/data_source.py @@ -182,8 +182,8 @@ def create_batches( # Split locations per example into batches: n_batches = len(spatial_and_temporal_locations_of_each_example) // batch_size locations_for_batches = [] - print("xxxxx") - print(n_batches) + logger.warning("xxxxx") + logger.warning(n_batches) for batch_idx in range(n_batches): start_example_idx = batch_idx * batch_size end_example_idx = (batch_idx + 1) * batch_size @@ -195,7 +195,7 @@ def create_batches( # Loop round each batch: for n_batches_processed, locations_for_batch in enumerate(locations_for_batches): batch_idx = idx_of_first_batch + n_batches_processed - logger.debug(f"{self.__class__.__name__} creating batch {batch_idx}!") + logger.warning(f"{self.__class__.__name__} creating batch {batch_idx}!") # Generate batch. batch = self.get_batch( @@ -207,7 +207,6 @@ def create_batches( # Save batch to disk. netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx) batch.to_netcdf(netcdf_filename) - print(f"Saved file to {netcdf_filename}") # Upload if necessary. if ( From 95c1a62733969bdf2ce6edb9c8117f7b925de5e2 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:09:13 +0000 Subject: [PATCH 14/20] more warnings --- nowcasting_dataset/filesystem/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index cee928af..c9b6801f 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,7 +16,7 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) - print(f"moving files from {local_path} to {dst_path}") + _LOG.warning(f"moving files from {local_path} to {dst_path}") filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) From a3624faad701cde6b4abaa4d2c57a49d0a2b353c Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:14:06 +0000 Subject: [PATCH 15/20] add warning in test --- tests/test_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_manager.py b/tests/test_manager.py index a171de03..4aefbc4b 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,4 +1,5 @@ """Test Manager.""" +import logging import os import tempfile from datetime import datetime @@ -12,6 +13,8 @@ from nowcasting_dataset.data_sources.satellite.satellite_data_source import SatelliteDataSource from nowcasting_dataset.manager import Manager +_LOG = logging.getLogger("nowcasting_dataset") + def test_sample_spatial_and_temporal_locations_for_examples(): # noqa: D103 local_path = Path(nowcasting_dataset.__file__).parent.parent @@ -133,7 +136,7 @@ def test_batches(): assert os.path.exists(f"{dst_path}/train") assert os.path.exists(f"{dst_path}/train/gsp") - print(get_all_filenames_in_path(f"{dst_path}/train/gsp")) + _LOG.warning(get_all_filenames_in_path(f"{dst_path}/train/gsp")) assert get_all_filenames_in_path(f"{dst_path}/train/gsp") == 0 assert os.path.exists(f"{dst_path}/train/gsp/000000.nc") From 1d4aef9a320662a85bca859e7e1f587f6361bef7 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:15:41 +0000 Subject: [PATCH 16/20] add warnings --- nowcasting_dataset/data_sources/data_source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nowcasting_dataset/data_sources/data_source.py b/nowcasting_dataset/data_sources/data_source.py index ec4e36bc..4ea12fe6 100644 --- a/nowcasting_dataset/data_sources/data_source.py +++ b/nowcasting_dataset/data_sources/data_source.py @@ -207,6 +207,7 @@ def create_batches( # Save batch to disk. netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx) batch.to_netcdf(netcdf_filename) + logger.warning(f"Save file to {netcdf_filename}") # Upload if necessary. if ( From 03a2f16fb336a809cd68eb547f26fbdf83a782aa Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:24:26 +0000 Subject: [PATCH 17/20] add test for upload_and_delete_local_files --- tests/filesystem/test_local.py | 40 ++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/filesystem/test_local.py b/tests/filesystem/test_local.py index 642a5655..2c60730d 100644 --- a/tests/filesystem/test_local.py +++ b/tests/filesystem/test_local.py @@ -9,6 +9,7 @@ download_to_local, get_all_filenames_in_path, makedirs, + upload_and_delete_local_files, upload_one_file, ) @@ -175,3 +176,42 @@ def test_upload(): # noqa: D103 # check the object are not there filenames = get_all_filenames_in_path(local_path) assert len(filenames) == 3 + + +def test_upload_and_delete_local_files(): + """Check 'upload_and_delete_local_files' works""" + + file1 = "test_file1.txt" + file2 = "test_dir/test_file2.txt" + file3 = "test_file3.txt" + + with tempfile.TemporaryDirectory() as tmpdirname: + local_path = Path(tmpdirname) + + # add fake file to dir + path_and_filename_1 = os.path.join(local_path, file1) + with open(path_and_filename_1, "w"): + pass + + # add fake file to dir + os.mkdir(f"{tmpdirname}/test_dir") + _ = os.path.join(local_path, file2) + with open(os.path.join(local_path, file2), "w"): + pass + + path_and_filename_3 = os.path.join(local_path, file3) + with open(path_and_filename_3, "w"): + pass + + with tempfile.TemporaryDirectory() as tmpdirname2: + dst_path = Path(tmpdirname2) + + upload_and_delete_local_files(dst_path=dst_path, local_path=local_path) + + # check the object are not there,just dir is left + filenames = get_all_filenames_in_path(local_path) + assert len(filenames) == 1 + + # check the object are there + filenames = get_all_filenames_in_path(dst_path) + assert len(filenames) == 3 From 0f2bef80c0c37babf1dfa1f99d56627a15c33db1 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:26:10 +0000 Subject: [PATCH 18/20] add more warnings --- nowcasting_dataset/filesystem/utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index c9b6801f..b34b2ea1 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -16,10 +16,18 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ """ _LOG.info("Uploading!") filesystem = get_filesystem(dst_path) + _LOG.warning(f"moving files from {local_path} to {dst_path}") + + _LOG.warning(get_all_filenames_in_path(local_path)) + _LOG.warning(get_all_filenames_in_path(dst_path)) + filesystem.put(str(local_path), str(dst_path), recursive=True) delete_all_files_in_temp_path(local_path) + _LOG.warning(get_all_filenames_in_path(local_path)) + _LOG.warning(get_all_filenames_in_path(dst_path)) + def get_filesystem(path: Union[str, Path]) -> fsspec.AbstractFileSystem: r"""Get the fsspect FileSystem from a path. From 0545c52e7b5348bd98365632b639d2d41af10edb Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:32:25 +0000 Subject: [PATCH 19/20] make sure paths have '/' at end for 'put' function --- nowcasting_dataset/filesystem/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nowcasting_dataset/filesystem/utils.py b/nowcasting_dataset/filesystem/utils.py index b34b2ea1..dbff97d3 100644 --- a/nowcasting_dataset/filesystem/utils.py +++ b/nowcasting_dataset/filesystem/utils.py @@ -22,7 +22,7 @@ def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[ _LOG.warning(get_all_filenames_in_path(local_path)) _LOG.warning(get_all_filenames_in_path(dst_path)) - filesystem.put(str(local_path), str(dst_path), recursive=True) + filesystem.put(str(local_path) + "/", str(dst_path) + "/", recursive=True) delete_all_files_in_temp_path(local_path) _LOG.warning(get_all_filenames_in_path(local_path)) From 7fc646cad73abca21847191cb33d2c9dbb329cd0 Mon Sep 17 00:00:00 2001 From: peterdudfield Date: Wed, 10 Nov 2021 13:42:57 +0000 Subject: [PATCH 20/20] fix fsspec to 2021.7.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 714d4269..c76e5b65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,5 +21,5 @@ plotly tqdm black pre-commit -fsspec +fsspec==2021.7.0 pathy