Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion nowcasting_dataset/data_sources/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ def create_batches(
# Split locations per example into batches:
n_batches = len(spatial_and_temporal_locations_of_each_example) // batch_size
locations_for_batches = []
logger.warning("xxxxx")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this warning still required? 🙂

logger.warning(n_batches)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this warning a bit more verbose?

for batch_idx in range(n_batches):
start_example_idx = batch_idx * batch_size
end_example_idx = (batch_idx + 1) * batch_size
Expand All @@ -193,7 +195,7 @@ def create_batches(
# Loop round each batch:
for n_batches_processed, locations_for_batch in enumerate(locations_for_batches):
batch_idx = idx_of_first_batch + n_batches_processed
logger.debug(f"{self.__class__.__name__} creating batch {batch_idx}!")
logger.warning(f"{self.__class__.__name__} creating batch {batch_idx}!")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if I've misunderstood but why does this message need to be a warning?


# Generate batch.
batch = self.get_batch(
Expand All @@ -205,6 +207,7 @@ def create_batches(
# Save batch to disk.
netcdf_filename = path_to_write_to / nd_utils.get_netcdf_filename(batch_idx)
batch.to_netcdf(netcdf_filename)
logger.warning(f"Save file to {netcdf_filename}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: Does this message really need to be a warning?


# Upload if necessary.
if (
Expand Down
6 changes: 5 additions & 1 deletion nowcasting_dataset/dataset/split/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ def split_data(

logger.debug("Split data done!")
for split_name, dt in split_datetimes._asdict().items():
logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}")
if len(dt) == 0:
# only a warning is made as this may happen during unittests
logger.warning(f"{split_name} has {len(dt):,d} datetimes")
else:
logger.debug(f"{split_name} has {len(dt):,d} datetimes, from {dt[0]} to {dt[-1]}")

return split_datetimes
13 changes: 11 additions & 2 deletions nowcasting_dataset/filesystem/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,24 @@
_LOG = logging.getLogger("nowcasting_dataset")


def upload_and_delete_local_files(dst_path: str, local_path: Path):
def upload_and_delete_local_files(dst_path: Union[str, Path], local_path: Union[str, Path]):
"""
Upload an entire folder and delete local files to either AWS or GCP
"""
_LOG.info("Uploading!")
filesystem = get_filesystem(dst_path)
filesystem.put(str(local_path), dst_path, recursive=True)

_LOG.warning(f"moving files from {local_path} to {dst_path}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be a warning?


_LOG.warning(get_all_filenames_in_path(local_path))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make these two messages a bit more verbose (or maybe remove these two messages?) Also, do these need to be warnings?

_LOG.warning(get_all_filenames_in_path(dst_path))

filesystem.put(str(local_path) + "/", str(dst_path) + "/", recursive=True)
delete_all_files_in_temp_path(local_path)

_LOG.warning(get_all_filenames_in_path(local_path))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above :)

_LOG.warning(get_all_filenames_in_path(dst_path))


def get_filesystem(path: Union[str, Path]) -> fsspec.AbstractFileSystem:
r"""Get the fsspect FileSystem from a path.
Expand Down
21 changes: 18 additions & 3 deletions nowcasting_dataset/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ def create_batches(self, overwrite_batches: bool) -> None:
for worker_id, (data_source_name, data_source) in enumerate(
self.data_sources.items()
):

if len(locations_for_split) == 0:
# not raising error as this is ok for unittests
logger.warning(
f"Not create batches for {split_name} as there are no locations"
)
break

# Get indexes of first batch and example. And subset locations_for_split.
idx_of_first_batch = first_batches_to_create[split_name][data_source_name]
idx_of_first_example = idx_of_first_batch * self.config.process.batch_size
Expand All @@ -389,18 +397,25 @@ def create_batches(self, overwrite_batches: bool) -> None:
nd_fs_utils.makedirs(dst_path, exist_ok=True)
if self.save_batches_locally_and_upload:
nd_fs_utils.makedirs(local_temp_path, exist_ok=True)
else:
logger.warning(
f"Not saving uploading batches so have not made {local_temp_path}"
)

# Submit data_source.create_batches task to the worker process.
future = executor.submit(
data_source.create_batches,
print(executor)
# future = executor.submit(
logger.warning("Making batches")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these need to be warnings?

logger.warning(locations)
data_source.create_batches(
spatial_and_temporal_locations_of_each_example=locations,
idx_of_first_batch=idx_of_first_batch,
batch_size=self.config.process.batch_size,
dst_path=dst_path,
local_temp_path=local_temp_path,
upload_every_n_batches=self.config.process.upload_every_n_batches,
)
future_create_batches_jobs.append(future)
# future_create_batches_jobs.append(future)

# Wait for all futures to finish:
for future, data_source_name in zip(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ plotly
tqdm
black
pre-commit
fsspec
fsspec==2021.7.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh, interesting — are we sure we can't use more recent versions of fsspec? A few days ago, there was a bug (I think) in fsspec which forced pip to install an ancient version of gcsfs, but that appears fixed now (if that's what made you pin fsspec to version 2021.7.0?)

pathy
3 changes: 3 additions & 0 deletions tests/config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ process:
local_temp_path: ~/temp/
seed: 1234
upload_every_n_batches: 16
n_train_batches: 2
n_validation_batches: 0
n_test_batches: 0
40 changes: 40 additions & 0 deletions tests/filesystem/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
download_to_local,
get_all_filenames_in_path,
makedirs,
upload_and_delete_local_files,
upload_one_file,
)

Expand Down Expand Up @@ -175,3 +176,42 @@ def test_upload(): # noqa: D103
# check the object are not there
filenames = get_all_filenames_in_path(local_path)
assert len(filenames) == 3


def test_upload_and_delete_local_files():
    """Check 'upload_and_delete_local_files' copies a directory tree and deletes the source.

    Local layout created before the call:
        test_file1.txt
        test_dir/test_file2.txt
        test_file3.txt
    """
    file1 = "test_file1.txt"
    file2 = "test_dir/test_file2.txt"
    file3 = "test_file3.txt"

    with tempfile.TemporaryDirectory() as tmpdirname:
        local_path = Path(tmpdirname)

        # Create two empty files at the top level and one inside a subdirectory.
        # (The original built the joined path twice and discarded one result;
        # pathlib + touch() does the same job without the dead assignment.)
        (local_path / file1).touch()
        os.mkdir(local_path / "test_dir")
        (local_path / file2).touch()
        (local_path / file3).touch()

        with tempfile.TemporaryDirectory() as tmpdirname2:
            dst_path = Path(tmpdirname2)

            upload_and_delete_local_files(dst_path=dst_path, local_path=local_path)

            # Check the local files are gone; only the (now empty) 'test_dir'
            # directory entry is left behind.
            filenames = get_all_filenames_in_path(local_path)
            assert len(filenames) == 1

            # Check all three files arrived at the destination.
            filenames = get_all_filenames_in_path(dst_path)
            assert len(filenames) == 3
90 changes: 89 additions & 1 deletion tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Test Manager."""
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path

Expand All @@ -10,6 +13,8 @@
from nowcasting_dataset.data_sources.satellite.satellite_data_source import SatelliteDataSource
from nowcasting_dataset.manager import Manager

_LOG = logging.getLogger("nowcasting_dataset")


def test_sample_spatial_and_temporal_locations_for_examples(): # noqa: D103
local_path = Path(nowcasting_dataset.__file__).parent.parent
Expand Down Expand Up @@ -76,4 +81,87 @@ def test_get_daylight_datetime_index():
np.testing.assert_array_equal(t0_datetimes, correct_t0_datetimes)


# TODO: Issue #322: Test the other Manager methods!
def test_batches():
    """Test that the Manager can create train batches for multiple data sources."""
    # Satellite data source backed by the test Zarr fixture.
    filename = Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "sat_data.zarr"

    sat = SatelliteDataSource(
        zarr_path=filename,
        history_minutes=30,
        forecast_minutes=60,
        image_size_pixels=64,
        meters_per_pixel=2000,
        channels=("HRV",),
    )

    # GSP data source backed by the test Zarr fixture.
    filename = (
        Path(nowcasting_dataset.__file__).parent.parent / "tests" / "data" / "gsp" / "test.zarr"
    )

    gsp = GSPDataSource(
        zarr_path=filename,
        start_dt=datetime(2019, 1, 1),
        end_dt=datetime(2019, 1, 2),
        history_minutes=30,
        forecast_minutes=60,
        image_size_pixels=64,
        meters_per_pixel=2000,
    )

    manager = Manager()

    # Load the test configuration (requests 2 train batches, 0 validation/test).
    local_path = Path(nowcasting_dataset.__file__).parent.parent
    filename = local_path / "tests" / "config" / "test.yaml"
    manager.load_yaml_configuration(filename=filename)

    with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path:  # noqa 101

        # Write batches to temporary directories.
        manager.config.output_data.filepath = Path(dst_path)
        manager.local_temp_path = Path(local_temp_path)

        # Use both GSP and satellite data sources; GSP defines the geospatial locations.
        manager.data_sources = {"gsp": gsp, "sat": sat}
        manager.data_source_which_defines_geospatial_locations = gsp

        # Make the file specifying the locations of each example.
        manager.create_files_specifying_spatial_and_temporal_locations_of_each_example_if_necessary()  # noqa 101

        # Make the batches.
        manager.create_batches(overwrite_batches=True)

        from nowcasting_dataset.filesystem.utils import get_all_filenames_in_path

        assert os.path.exists(f"{dst_path}/train")
        assert os.path.exists(f"{dst_path}/train/gsp")

        # The config requests 2 train batches, so the gsp dir should hold exactly
        # two netCDF files (000000.nc and 000001.nc).
        # NOTE(review): the original compared the filename *list* to the int 0,
        # which can never hold and contradicted the existence checks below; it
        # also left a debug `_LOG.warning` in place — both removed here.
        assert len(get_all_filenames_in_path(f"{dst_path}/train/gsp")) == 2

        assert os.path.exists(f"{dst_path}/train/gsp/000000.nc")
        assert os.path.exists(f"{dst_path}/train/sat/000000.nc")
        assert os.path.exists(f"{dst_path}/train/gsp/000001.nc")
        assert os.path.exists(f"{dst_path}/train/sat/000001.nc")


def test_save_config():
    """Check that the Manager writes its configuration file to the output path."""
    manager = Manager()

    # Load the test configuration shipped with the repository.
    repo_root = Path(nowcasting_dataset.__file__).parent.parent
    config_filename = repo_root / "tests" / "config" / "test.yaml"
    manager.load_yaml_configuration(filename=config_filename)

    with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path:  # noqa 101

        # Point the manager at temporary output locations.
        manager.config.output_data.filepath = Path(dst_path)
        manager.local_temp_path = Path(local_temp_path)

        # Write the configuration out and check it landed where expected.
        manager.save_yaml_configuration()

        assert os.path.exists(f"{dst_path}/configuration.yaml")