Commit f58480c

DD-159 fixes for more robust cimis downloads
1 parent: 750de7f

3 files changed: +44 −69 lines

dms_datastore/download_cimis.py

+44 −9
@@ -18,6 +18,7 @@

 import logging
 import click
+from functools import lru_cache

 logging.basicConfig(level=logging.ERROR)
 VARTYPES = [
@@ -105,7 +106,11 @@ def download(self, remotefile, dir=None):
             dir = self.CIMIS_DOWNLOAD_DIR
         localfile = os.path.join(dir, str.split(remotefile, "/")[-1])
         self.ensure_dir(os.path.dirname(localfile))
-        self.sftp.get(remotefile, localfile)
+        try:
+            self.sftp.get(remotefile, localfile)
+        except Exception as ex:
+            logging.error(f"Error downloading {remotefile}: {ex}")
+            raise ex
         return localfile

     def download_zipped(self, year, hourly=True):
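With this change, download() logs the failing remote path and re-raises instead of leaving a silent, possibly partial transfer. Callers that want to keep going can layer retries on top of the re-raised error; a minimal sketch (not part of the commit; the attempt count and wait interval are illustrative), assuming an already-connected CIMIS instance cx:

    import time

    def download_with_retries(cx, remotefile, attempts=3, wait=5):
        # Retry the now-raising CIMIS.download() a few times before giving up.
        for attempt in range(1, attempts + 1):
            try:
                return cx.download(remotefile)  # re-raises on SFTP failure
            except Exception:
                if attempt == attempts:
                    raise  # all attempts exhausted; propagate the last error
                time.sleep(wait)  # brief pause before the next attempt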
@@ -152,6 +157,7 @@ def download_unzipped(self, year, stations, hourly=True):
             except Exception as ex:
                 logging.warning(f"Error downloading {interval} station {station}: {ex}")

+    @lru_cache(maxsize=128)
     def get_columns_for_year(self, y, hourly=True):
         if y >= 2014:
             units_file = self.download("/pub2/readme-ftp-Revised5units.txt")
@@ -234,6 +240,7 @@ def download_current_month(self, stations, hourly=True):
             except Exception as ex:
                 logging.warning(f"Error downloading station {station}: {ex}")

+    @lru_cache(maxsize=128)
     def get_stations_info(
         self,
         file="/pub2/CIMIS Stations List (January20).xlsx",
@@ -443,7 +450,14 @@ def cache_to_pkl(self, dfstations):
         print(dfstations[dfstations["Station Number"].isin(failed_stations)])


-def download_all_data(hourly=True):
+def download_all_data(hourly=True, partial=False):
+    """
+    Download all CIMIS data from the FTP site. Each year is a separate file for hourly data
+    Each month is a separate file for hourly data
+
+    :param hourly: download hourly data (default is True)
+    :param partial: download only partial data (default is False) (only downloads last couple of years)
+    """
     password = os.environ.get("CIMIS_PASSWORD", default="xxx")
     cx = CIMIS(password=password)
     if hourly:
@@ -456,9 +470,11 @@ def download_all_data(hourly=True):
     dfcat.to_csv("cimis_stations.csv", index="Station Number")
     current_year = pd.to_datetime("today").year
     active_stations = list(dfcat[dfcat["Status"] == "Active"]["Station Number"])
-    for year in range(min_year, current_year - 2):
-        print(f"Downloading zipped {interval} data for year", year)
-        cx.download_zipped(year, hourly)
+
+    if not partial:
+        for year in range(min_year, current_year - 2):
+            print(f"Downloading zipped {interval} data for year", year)
+            cx.download_zipped(year, hourly)

     for year in range(current_year - 2, current_year):
         print(f"Downloading unzipped {interval} data for year", year)
@@ -472,7 +488,7 @@ def download_all_data(hourly=True):
             dfs = cx.load_station(station, True, hourly)
             dfs.to_csv(f"cimis_{interval}_{station:03d}.csv", index="Date")
         except Exception as e:
-            logging.error(f"Error: {e}")
+            logging.error(f"Error loading station {station}: {e}")
             continue

@@ -492,7 +508,11 @@ def merge_with_existing(existing_dir, new_dir, hourly=True):
         dfn = pd.read_csv(file, index_col=0, parse_dates=True)
         if os.path.exists(existing_file):
             dfe = pd.read_csv(existing_file, index_col=0, parse_dates=True)
-            dfe.combine_first(dfn).to_csv(existing_file)
+            # Combine the two DataFrames and remove duplicates
+            combined = pd.concat([dfe, dfn]).drop_duplicates(
+                keep="last"
+            )  # Keeps the last occurrence
+            combined.to_csv(existing_file)
         else:
             logging.warning(f"File {existing_file} does not exist so writing new file")
             dfn.to_csv(existing_file)
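Unlike combine_first, which fills gaps value-by-value on the shared index, pd.concat followed by drop_duplicates keeps whole rows and drops only rows whose column values match exactly. A toy sketch of the merge behavior (data invented for illustration):

    import pandas as pd

    dfe = pd.DataFrame({"value": [1.0, 2.0]},
                       index=pd.to_datetime(["2024-01-01", "2024-01-02"]))
    dfn = pd.DataFrame({"value": [2.0, 3.0]},
                       index=pd.to_datetime(["2024-01-02", "2024-01-03"]))

    combined = pd.concat([dfe, dfn]).drop_duplicates(keep="last")
    # 2024-01-01  1.0  <- only in the existing file
    # 2024-01-02  2.0  <- identical overlap collapsed to the newer copy
    # 2024-01-03  3.0  <- only in the new file

Note that drop_duplicates compares column values rather than the index, so two different timestamps with identical readings would also collapse to one row; deduplicating on the index instead (for example combined[~combined.index.duplicated(keep="last")]) is an alternative if that matters.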
@@ -503,7 +523,19 @@ def merge_with_existing(existing_dir, new_dir, hourly=True):
     "--hourly", type=bool, default=True, help="Download hourly data (default is True)"
 )
 @click.option("--existing_dir", default=None, help="Directory to merge new data into")
-def main(hourly, existing_dir=None):
+@click.option(
+    "--download",
+    type=bool,
+    default=True,
+    help="Download data (default is True)",
+)
+@click.option(
+    "--partial",
+    is_flag=True,
+    default=False,
+    help="Set partial download to True if provided (default is False)",
+)
+def main(hourly, existing_dir=None, download=True, partial=False):
     """
     Download CIMIS data
     --hourly: download hourly data (default is True)
@@ -512,6 +544,9 @@ def main(hourly, existing_dir=None):
     environment variable CIMIS_PASSWORD must be set to the password for the CIMIS FTP site

     """
-    download_all_data(hourly=hourly)
+    if partial:
+        partial_only = True
+    if download:
+        download_all_data(hourly=hourly, partial=partial)
     if existing_dir is not None:
         merge_with_existing(existing_dir, ".", hourly=hourly)
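Taken together, the new options let a scheduled job do a quick refresh instead of a full re-download. An example invocation (assuming the module is run as a script from the directory that should receive the CSVs, and that /data/cimis is a hypothetical existing archive):

    export CIMIS_PASSWORD=...
    python dms_datastore/download_cimis.py --partial --existing_dir /data/cimis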

test_download/ex_cimis.py

−36: This file was deleted.

test_download/ex_ucdipm.py

−24: This file was deleted.
