Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions doc/dev/database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,14 @@ they result in the "checked" time series.)

The time series group to which this auto-process applies.

.. method:: execute()

Performs the auto-processing. It retrieves the new part of the
source time series (i.e. the part that starts after the last date
of the target time series) and calls the
:meth:`process_timeseries` method.
.. method:: execute(recalculate: bool)

Performs the auto-processing. If ``recalculate`` is ``False``, it
retrieves the new part of the source time series (i.e. the part that
starts after the last date of the target time series) and calls the
:meth:`process_timeseries` method. If ``recalculate`` is ``True``, it
first deletes all records of the target time series and therefore
recalculates it in its entirety.

.. attribute:: source_timeseries

Expand Down
9 changes: 8 additions & 1 deletion enhydris/autoprocess/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@


def enqueue_auto_process(sender, *, instance, **kwargs):
def enqueue_on_commit(auto_process_id: int, has_non_append_modifications: bool):
transaction.on_commit(
lambda: execute_auto_process.delay(
auto_process_id, has_non_append_modifications
Comment thread
aptiko marked this conversation as resolved.
)
)

auto_processes = [
auto_process
for auto_process in instance.timeseries_group.autoprocess_set.all()
if auto_process.as_specific_instance.source_timeseries == instance
]
for auto_process in auto_processes:
transaction.on_commit(lambda: execute_auto_process.delay(auto_process.id))
enqueue_on_commit(auto_process.id, instance.has_non_append_modifications)
Comment thread
aptiko marked this conversation as resolved.


class AutoprocessConfig(AppConfig):
Expand Down
35 changes: 23 additions & 12 deletions enhydris/autoprocess/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from django.db import DataError, IntegrityError, models, transaction
from django.db.models.signals import post_delete
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _

import numpy as np
Expand All @@ -27,10 +28,16 @@ class Meta:
db_table = "enhydris_autoprocess_autoprocess"
verbose_name_plural = _("Auto processes")

def execute(self):
def execute(self, recalculate: bool):
try:
result = self.process_timeseries()
self.target_timeseries.insert_or_append_data(result)
with transaction.atomic():
target_timeseries = self.target_timeseries
if recalculate:
target_timeseries.timeseriesrecord_set.all().delete()
target_timeseries.has_non_append_modifications = True
target_timeseries._invalidate_cached_data()
Comment thread
aptiko marked this conversation as resolved.
result = self.process_timeseries()
target_timeseries.insert_or_append_data(result)
except Exception as e:
msg = (
f"{e.__class__.__name__} while executing AutoProcess with "
Expand Down Expand Up @@ -74,14 +81,18 @@ def _get_start_date(self):

def save(self, *args, **kwargs):
result = super().save(*args, **kwargs)
transaction.on_commit(lambda: tasks.execute_auto_process.delay(self.id))
transaction.on_commit(
lambda: tasks.execute_auto_process.delay(
self.id, has_non_append_modifications=True
)
)
return result

@property
@cached_property
def source_timeseries(self):
raise NotImplementedError("This property is available only in subclasses")

@property
@cached_property
def target_timeseries(self):
raise NotImplementedError("This property is available only in subclasses")

Expand All @@ -108,14 +119,14 @@ class Meta:
def __str__(self):
return _("Checks for {}").format(str(self.timeseries_group))

@property
@cached_property
def source_timeseries(self):
obj, created = self.timeseries_group.timeseries_set.get_or_create(
type=Timeseries.INITIAL
)
return obj

@property
@cached_property
def target_timeseries(self):
obj, created = self.timeseries_group.timeseries_set.get_or_create(
type=Timeseries.CHECKED,
Expand Down Expand Up @@ -319,7 +330,7 @@ class Meta:
def __str__(self):
return f"=> {self.target_timeseries_group}"

@property
@cached_property
def source_timeseries(self):
try:
return self.timeseries_group.timeseries_set.get(type=Timeseries.CHECKED)
Expand All @@ -330,7 +341,7 @@ def source_timeseries(self):
)
return obj

@property
@cached_property
def target_timeseries(self):
obj, created = self.target_timeseries_group.timeseries_set.get_or_create(
type=Timeseries.INITIAL
Expand Down Expand Up @@ -460,7 +471,7 @@ class Meta:
def __str__(self):
return _("Aggregation for {}").format(str(self.timeseries_group))

@property
@cached_property
def source_timeseries(self):
try:
return self.timeseries_group.timeseries_set.get(type=Timeseries.CHECKED)
Expand All @@ -470,7 +481,7 @@ def source_timeseries(self):
)
return obj

@property
@cached_property
def target_timeseries(self):
obj, created = self.timeseries_group.timeseries_set.get_or_create(
type=Timeseries.AGGREGATED,
Expand Down
5 changes: 3 additions & 2 deletions enhydris/autoprocess/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@


@app.task
def execute_auto_process(auto_process_id):
def execute_auto_process(auto_process_id, has_non_append_modifications: bool):
from .models import AutoProcess

Comment thread
aptiko marked this conversation as resolved.
AutoProcess.objects.get(id=auto_process_id).as_specific_instance.execute()
auto_process = AutoProcess.objects.get(id=auto_process_id).as_specific_instance
auto_process.execute(recalculate=has_non_append_modifications)
21 changes: 20 additions & 1 deletion enhydris/autoprocess/tests/test_apps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from io import StringIO
from unittest import mock

from django.contrib.gis.geos import Point
Expand Down Expand Up @@ -38,10 +39,28 @@ def setUp(self, m1, m2):
def test_enqueues_auto_process(self, m):
with transaction.atomic():
self.timeseries.save()
m.delay.assert_any_call(self.auto_process.id)
m.delay.assert_called_once_with(self.auto_process.id, False)

@mock.patch("enhydris.autoprocess.apps.execute_auto_process")
def test_auto_process_is_not_triggered_before_commit(self, m):
with transaction.atomic():
self.timeseries.save()
m.delay.assert_not_called()

@mock.patch("enhydris.autoprocess.apps.execute_auto_process")
def test_inserting_data_enqueues_task_with_non_append_modifications_true(self, m):
self.timeseries.insert_or_append_data(
StringIO("2020-01-01 00:10,15,\n"),
default_timezone="UTC",
append_only=False,
)
m.delay.assert_called_once_with(self.auto_process.id, True)

@mock.patch("enhydris.autoprocess.apps.execute_auto_process")
def test_appending_data_enqueues_task_with_non_append_modifications_false(self, m):
self.timeseries.insert_or_append_data(
StringIO("2020-01-01 00:10,15,\n"),
default_timezone="UTC",
append_only=True,
)
m.delay.assert_called_once_with(self.auto_process.id, False)
10 changes: 5 additions & 5 deletions enhydris/autoprocess/tests/test_models/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def setUp(self):

def test_initial_target_timeseries(self):
aggregation = Aggregation.objects.get(id=self.aggregation_id)
aggregation.execute()
aggregation.execute(recalculate=False)
actual_data = aggregation.target_timeseries.get_data().data
expected_data = pd.DataFrame(
data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]},
Expand All @@ -438,11 +438,11 @@ def test_initial_target_timeseries(self):
pd.testing.assert_frame_equal(actual_data, expected_data)

def test_updated_target_timeseries(self):
Aggregation.objects.get(id=self.aggregation_id).execute()
Aggregation.objects.get(id=self.aggregation_id).execute(recalculate=False)

self._extend_source_timeseries()
aggregation = Aggregation.objects.get(id=self.aggregation_id)
aggregation.execute()
aggregation.execute(recalculate=False)

ahtimeseries = aggregation.target_timeseries.get_data()
expected_data = pd.DataFrame(
Expand Down Expand Up @@ -514,13 +514,13 @@ def setUp(self):

def test_result(self, m):
aggregation = Aggregation.objects.get(id=self.aggregation_id)
aggregation.execute()
aggregation.execute(recalculate=False)
actual_data = aggregation.target_timeseries.get_data().data
self.assertEqual(len(actual_data), 0)

def test_log(self, logging_mock):
aggregation = Aggregation.objects.get(id=self.aggregation_id)
aggregation.execute()
aggregation.execute(recalculate=False)
logging_mock.getLogger.return_value.error.assert_called_once_with(
"Need at least 3 dates to infer frequency"
)
38 changes: 31 additions & 7 deletions enhydris/autoprocess/tests/test_models/test_autoprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def test_save_triggers_auto_process(self):
with transaction.atomic():
auto_process = baker.make(Checks, timeseries_group=self.timeseries_group)
auto_process.save()
tasks.execute_auto_process.delay.assert_any_call(auto_process.id)
tasks.execute_auto_process.delay.assert_any_call(
auto_process.id, has_non_append_modifications=True
)

def test_auto_process_is_not_triggered_before_commit(self):
with transaction.atomic():
Expand All @@ -88,11 +90,11 @@ def setUp(self):
self.range_check = baker.make(RangeCheck, checks=self.checks)

def test_called_once(self, m):
self.checks.execute()
self.checks.execute(recalculate=False)
self.assertEqual(len(m.mock_calls), 1)

def test_called_with_empty_content(self, m):
self.checks.execute()
self.checks.execute(recalculate=False)
self.assertEqual(len(self.checks.htimeseries.data), 0)

def test_critical_error(self, m):
Expand All @@ -101,16 +103,16 @@ def test_critical_error(self, m):
f"^ValueError while executing AutoProcess with id={self.checks.id}: hello$"
)
with self.assertRaisesRegex(RuntimeError, msg):
self.checks.execute()
self.checks.execute(recalculate=False)


class AutoProcessExecuteDealsOnlyWithNewerTimeseriesPartTestCase(TestCase):
class AutoProcessRecalculateTestCaseBase(TestCase):
@mock.patch(
"enhydris.autoprocess.models.Checks.process_timeseries",
side_effect=lambda self: self.htimeseries,
autospec=True,
)
def setUp(self, m):
def setUp(self, m: mock.MagicMock):
self.mock_process_timeseries = m
station = baker.make(Station, display_timezone="Etc/GMT-2")
self.timeseries_group = baker.make(
Expand Down Expand Up @@ -149,7 +151,11 @@ def setUp(self, m):
)
self.checks = baker.make(Checks, timeseries_group=self.timeseries_group)
self.range_check = baker.make(RangeCheck, checks=self.checks)
self.checks.execute()
self.checks.execute(recalculate=self.recalculate)


class AutoProcessExecuteRecalculateFalseTestCase(AutoProcessRecalculateTestCaseBase):
recalculate = False

def test_called_once(self):
self.assertEqual(len(self.mock_process_timeseries.mock_calls), 1)
Expand Down Expand Up @@ -181,3 +187,21 @@ def test_appended_the_data(self):
pd.testing.assert_frame_equal(
self.target_timeseries.get_data().data, expected_result
)


class AutoProcessExecuteRecalculateTestCase(AutoProcessRecalculateTestCaseBase):
recalculate = True

def test_called_with_entire_source_timeseries(self):
expected_arg = pd.DataFrame(
data={"value": [1.0, 2.0, 3.0, 4.0], "flags": ["", "", "", ""]},
columns=["value", "flags"],
index=[
dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2")),
dt.datetime(2019, 5, 21, 17, 10, tzinfo=get_tzinfo("Etc/GMT-2")),
dt.datetime(2019, 5, 21, 17, 20, tzinfo=get_tzinfo("Etc/GMT-2")),
dt.datetime(2019, 5, 21, 17, 30, tzinfo=get_tzinfo("Etc/GMT-2")),
],
)
expected_arg.index.name = "date"
pd.testing.assert_frame_equal(self.checks.htimeseries.data, expected_arg)
2 changes: 1 addition & 1 deletion enhydris/autoprocess/tests/test_models/test_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_runs_range_check(self, m1, m2):
checks__timeseries_group__gentity=station,
checks__timeseries_group__variable__descr="Temperature",
)
range_check.checks.execute()
range_check.checks.execute(recalculate=False)
m2.assert_called_once()

def test_no_extra_queries_for_str(self):
Expand Down
29 changes: 29 additions & 0 deletions enhydris/autoprocess/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from io import StringIO
from unittest import mock

from django.test import TestCase

from model_bakery import baker

from enhydris.autoprocess.models import Checks
from enhydris.autoprocess.tasks import execute_auto_process


class ExecuteAutoProcessTaskTestCase(TestCase):
@mock.patch("enhydris.autoprocess.models.tasks.execute_auto_process")
def setUp(self, m):
self.auto_process = baker.make(Checks)
self.target_timeseries = self.auto_process.target_timeseries
self.target_timeseries.insert_or_append_data(
StringIO("2020-01-01 00:00,10,\n"), default_timezone="UTC"
)

@mock.patch("enhydris.autoprocess.models.Checks.execute")
def test_calls_execute_with_recalculate_true(self, m):
execute_auto_process(self.auto_process.id, has_non_append_modifications=True)
m.assert_called_once_with(recalculate=True)

@mock.patch("enhydris.autoprocess.models.Checks.execute")
def test_calls_execute_with_recalculate_false(self, m):
execute_auto_process(self.auto_process.id, has_non_append_modifications=False)
m.assert_called_once_with(recalculate=False)
13 changes: 12 additions & 1 deletion enhydris/models/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def get_default_publicly_available():
class Timeseries(models.Model):
timeseriesrecord_set: models.Manager["TimeseriesRecord"]

# When the calculation of derivative time series (e.g. checked, regularized,
# aggregated) is triggered, it is performed on the newer data only; the older data
# is left as is. This does not work if the modification of the source time series
# (i.e. this time series) is anything other than an append (e.g. if data is
# inserted in the middle of the time series, or if some records are deleted). In
# this case, the derivative time series should be fully recalculated. The
# has_non_append_modifications attribute is used to signal this case.
has_non_append_modifications: bool = False
Comment thread
aptiko marked this conversation as resolved.

INITIAL = 100
CHECKED = 200
REGULARIZED = 300
Expand Down Expand Up @@ -290,7 +299,7 @@ def set_data(
default_timezone: str | None = None,
):
self.timeseriesrecord_set.all().delete()
return self.insert_or_append_data(data, default_timezone)
return self.insert_or_append_data(data, default_timezone, append_only=False)

def insert_or_append_data(
self,
Expand All @@ -299,6 +308,8 @@ def insert_or_append_data(
append_only: bool = True,
):
ahtimeseries = self._get_htimeseries_from_data(data, default_timezone)
if not append_only:
self.has_non_append_modifications = True
Comment thread
aptiko marked this conversation as resolved.
if append_only:
self._check_new_data_is_newer(ahtimeseries)
Comment thread
aptiko marked this conversation as resolved.
result = TimeseriesRecord.bulk_insert(self, ahtimeseries)
Expand Down
Loading