Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
run: |
source ~/.venv/bin/activate
black --check .
flake8 --max-line-length=88 .
flake8 --max-line-length=88 --extend-ignore=E501 .
isort --check-only --diff .
coverage run --include="./*" --omit="docs/","*/tests/*","_version.py" -m unittest -v
coverage json
Expand Down
5 changes: 5 additions & 0 deletions docs/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
Release notes
=============

DEV
===

Added the ``--insert-all`` command line argument.

Version 4.2 (2025-08-23)
========================

Expand Down
28 changes: 28 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ and ``Task Scheduler`` on Windows), or, if the software you use to
download the data from the meteorological station has the feature, add
``loggertodb`` as a trigger.

Command line reference
======================

::

loggertodb <configuration file> [--insert-all <station> <filepath>]

Normally ``loggertodb`` reads the last date stored in the database and
then reads the input file(s) from the end upwards until it finds the
date that is already stored in the database. Subsequently it enters the
new records from that point on. This can be changed with the
``--insert-all`` command line option. In this case, there are the
following differences from normal operation:

* ``loggertodb`` ignores all sections of the configuration file except
for <station>.
* It also ignores the ``path`` in the configuration file and instead
uses <filepath>.
* It also ignores ``max_records``.
* It uploads all records in the specified file, not only those that are
newer than the last record in the database. However, records with
these dates must not already exist in the database; if any do,
Enhydris will refuse the upload with an HTTP 400 error code.

``--insert-all`` is useful when older data are discovered later and need
to be inserted retroactively. It is available only for text storage formats; it
is not available for ``wdat5``.

Configuration file reference
============================

Expand Down
87 changes: 58 additions & 29 deletions loggertodb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import sys
import traceback
from typing import Any

import click

Expand All @@ -26,45 +27,53 @@ def __str__(self):
@click.option(
"--upgrade", is_flag=True, default=False, help="Upgrade configuration file"
)
@click.option("--insert-all", nargs=2, type=str, help="Insert old data retroactively")
@click.argument("configfile")
@click.version_option(version=__version__, prog_name="loggertodb")
def main(upgrade: bool, insert_all: tuple[str, str] | None, configfile: str):
    """Insert meteorological logger data to Enhydris"""
    # --upgrade only rewrites the configuration file; nothing is uploaded.
    if upgrade:
        ConfigFile(configfile).upgrade()
    else:
        # insert_all is either None (normal operation) or a
        # (station_section, filepath) pair from the --insert-all option.
        LoggerToDb(configfile, insert_all).run()


class LoggerToDb:
def __init__(self, configfile: str, insert_all: tuple[str, str] | None):
    """Store run parameters; no I/O happens until run() is called.

    insert_all, when given, is a (station_section, filepath) pair coming
    from the --insert-all command line option.
    """
    self.configfile = configfile
    self.logging_system = Logging()
    self.insert_all = insert_all

def run(self):
    """Read the configuration and upload new data for all selected stations.

    Any exception is logged (with the traceback at debug level) and
    re-raised as click.ClickException so the CLI exits with an error
    message rather than a traceback.
    """
    try:
        self.configuration = Configuration(
            self.configfile, self.logging_system, self.insert_all
        )
        self.configuration.read()
        self.logging_system.setup_logger(self.configuration)
        self.logging_system.log_start_of_execution()
        self.enhydris = Enhydris(
            self.configuration,
            self.logging_system.logger,
            # Enhydris only needs to know whether insert mode is on.
            self.insert_all is not None,
        )
        self._process_all_stations()
        self.logging_system.log_end_of_execution()
    except Exception as e:
        self.logging_system.logger.error(str(e))
        self.logging_system.logger.debug(traceback.format_exc())
        raise click.ClickException(str(e))

def _process_stations(self):
def _process_all_stations(self):
config = self.configuration
for i, meteologger_storage in enumerate(config.meteologger_storages):
section = config.config.sections()[i + 1]
for sectionname, meteologger_storage in config.meteologger_storages.items():
section = config.config[sectionname]
try:
self.logging_system.logger.info(f"*** Processing item {section}")
self.logging_system.logger.info(f"*** Processing item {section.name}")
self.enhydris.upload(meteologger_storage)
self.logging_system.logger.info(f"Finished item {section}")
self.logging_system.logger.info(f"Finished item {section.name}")
except LoggerToDbError as e:
msg = f"Error while processing item {section}: {str(e)}"
msg = f"Error while processing item {section.name}: {str(e)}"
sys.stderr.write(msg + "\n")
self.logging_system.logger.error(msg)
self.logging_system.logger.debug(traceback.format_exc())
Expand All @@ -76,7 +85,7 @@ def __init__(self):
self.stdout_handler = logging.StreamHandler()
self.logger.addHandler(self.stdout_handler)

def setup_logger(self, configuration):
def setup_logger(self, configuration: "Configuration"):
self.logger.setLevel(configuration.loglevel.upper())
if configuration.logfile:
self.logger.removeHandler(self.stdout_handler)
Expand All @@ -92,13 +101,19 @@ def log_end_of_execution(self):


class Configuration:
def __init__(
    self,
    configfile: str,
    logging_system: Logging,
    insert_all: tuple[str, str] | None,
):
    """Parse the configuration file.

    Only parsing happens here; interpretation of the sections is done by
    read(). insert_all is the optional (station_section, filepath) pair
    from the --insert-all command line option.
    """
    self.logging_system = logging_system
    self.configfile = configfile
    # interpolation=None so that "%" in values (e.g. strftime formats)
    # is not treated specially by configparser.
    self.config = configparser.ConfigParser(interpolation=None)
    with open(self.configfile) as f:
        self.config.read_file(f)
    self.insert_all = insert_all
    # Maps section name -> MeteologgerStorage instance; filled by read().
    self.meteologger_storages: dict[str, Any] = {}

def read(self):
self._read_general_section()
Expand All @@ -110,8 +125,9 @@ def _read_general_section(self):
self.logfile = self.config.get("General", "logfile", fallback="")
self.loglevel = self.config.get("General", "loglevel", fallback="warning")
try:
fallback = 1_000_000_000 if self.insert_all else 10_000
self.max_records = self.config.getint(
"General", "max_records", fallback=10000
"General", "max_records", fallback=fallback
)
except ValueError:
raise WrongValueError("Wrong max_records: must be an integer")
Expand All @@ -122,20 +138,33 @@ def _read_general_section(self):
def _read_station_sections(self):
    """Populate self.meteologger_storages from the station sections.

    In normal operation every non-"General" section is read. With
    --insert-all, only the named section is read, and its "path" option
    is overridden with the file path given on the command line.
    """
    station_section_names = [n for n in self.config.sections() if n != "General"]
    if not len(station_section_names):
        raise configparser.NoSectionError(
            "No stations have been specified in the configuration file"
        )
    if self.insert_all:
        insert_all_section, insert_all_filepath = self.insert_all
        if insert_all_section not in station_section_names:
            raise configparser.NoSectionError(
                f"Section '{insert_all_section}' not found in configuration file"
            )
        section = self.config[insert_all_section]
        # The command-line filepath replaces the configured "path".
        section["path"] = insert_all_filepath
        self._read_station_section(section)
    else:
        for section_name in station_section_names:
            section = self.config[section_name]
            self._read_station_section(section)

def _read_station_section(self, section: configparser.SectionProxy):
klassname = "MeteologgerStorage_" + section["storage_format"]
if not hasattr(meteologgerstorage, klassname):
raise UnsupportedFormat(section["storage_format"])
klass = getattr(meteologgerstorage, klassname)
self.meteologger_storages[section.name] = klass(
section,
max_records=self.max_records,
logger=self.logging_system.logger,
)


if __name__ == "__main__":
Expand Down
42 changes: 31 additions & 11 deletions loggertodb/enhydris.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
from __future__ import annotations

import datetime as dt
from collections import namedtuple
from logging import Logger
from typing import TYPE_CHECKING, NamedTuple

from enhydris_api_client import EnhydrisApiClient
from htimeseries import HTimeseries

CompositeTimeseriesId = namedtuple(
"CompositeTimeseriesId", ("timeseries_group_id", "timeseries_id")
)
from .meteologgerstorage import MeteologgerStorage

if TYPE_CHECKING:
from .cli import Configuration


class CompositeTimeseriesId(NamedTuple):
    """A (timeseries_group_id, timeseries_id) pair identifying a time series
    within its time series group on the Enhydris server."""

    timeseries_group_id: int
    timeseries_id: int


class Enhydris:
def __init__(
    self, configuration: Configuration, logger: Logger, use_insert_mode: bool
):
    """Prepare an Enhydris API client from the configuration.

    use_insert_mode is True when --insert-all was specified: records are
    then posted with mode="insert" instead of being appended after the
    stored end date.
    """
    self.base_url = configuration.base_url
    self.auth_token = configuration.auth_token
    self.max_records = configuration.max_records
    self.use_insert_mode = use_insert_mode
    self.client = EnhydrisApiClient(self.base_url, self.auth_token)
    self.logger = logger

def upload(self, meteologger_storage):
def upload(self, meteologger_storage: "MeteologgerStorage"):
self._meteologger_storage = meteologger_storage
self._get_composite_timeseries_ids()
self._get_ts_end_dates()
Expand All @@ -26,21 +38,21 @@ def upload(self, meteologger_storage):
def _get_composite_timeseries_ids(self):
    """Create a list of (timeseries_group_id, initial_timeseries_id) pairs."""
    station_id = self._meteologger_storage.station_id
    self._composite_timeseries_ids: list[CompositeTimeseriesId] = []
    for timeseries_group_id in self._meteologger_storage.timeseries_group_ids:
        # Resolves (or creates) the "Initial" time series of each group.
        timeseries_id = self._get_timeseries_id(station_id, timeseries_group_id)
        self._composite_timeseries_ids.append(
            CompositeTimeseriesId(timeseries_group_id, timeseries_id)
        )

def _get_timeseries_id(self, station_id: int, timeseries_group_id: int) -> int:
    """Return the id of the group's "Initial" time series, creating it if
    the group does not have one yet."""
    timeseries = self.client.list_timeseries(station_id, timeseries_group_id)
    for item in timeseries:
        if item["type"] == "Initial":
            return item["id"]
    return self._create_timeseries(station_id, timeseries_group_id)

def _create_timeseries(self, station_id, timeseries_group_id):
def _create_timeseries(self, station_id: int, timeseries_group_id: int) -> int:
return self.client.post_timeseries(
station_id,
timeseries_group_id,
Expand All @@ -55,8 +67,11 @@ def _get_ts_end_dates(self):
station_id = self._meteologger_storage.station_id
start_of_time = dt.datetime(1700, 1, 1, tzinfo=dt.timezone.utc)
utc = dt.timezone.utc
self._ts_end_dates = {}
self._ts_end_dates: dict[CompositeTimeseriesId, dt.datetime] = {}
for cts_id in self._composite_timeseries_ids:
if self.use_insert_mode:
self._ts_end_dates[cts_id] = start_of_time
continue
Comment thread
aptiko marked this conversation as resolved.
e = self.client.get_ts_end_date(station_id, *cts_id, timezone="Etc/GMT")
assert e is None or e.tzinfo is None
if e:
Expand All @@ -78,7 +93,12 @@ def _upload_all_new_data(self):
f"Timeseries group {cts_id.timeseries_group_id}: "
f"uploading {len(new_data)} new records"
)
self.client.post_tsdata(station_id, *cts_id, HTimeseries(new_data))
self.client.post_tsdata(
station_id,
*cts_id,
HTimeseries(new_data),
mode="insert" if self.use_insert_mode else "append",
)
else:
self.logger.info(
f"Timeseries group {cts_id.timeseries_group_id}: no new records"
Expand Down
Loading