diff --git a/doc/dev/database.rst b/doc/dev/database.rst index c914b771..703c117a 100644 --- a/doc/dev/database.rst +++ b/doc/dev/database.rst @@ -534,12 +534,14 @@ they result in the "checked" time series.) The time series group to which this auto-process applies. - .. method:: execute() - - Performs the auto-processing. It retrieves the new part of the - source time series (i.e. the part that starts after the last date - of the target time series) and calls the - :meth:`process_timeseries` method. + .. method:: execute(recalculate: bool) + + Performs the auto-processing. If ``recalculate`` is ``False``, it + retrieves the new part of the source time series (i.e. the part that + starts after the last date of the target time series) and calls the + :meth:`process_timeseries` method. If ``recalculate`` is ``True``, it + deletes all records of the target time series first and therefore + recalculates it in its entirety. .. attribute:: source_timeseries diff --git a/enhydris/autoprocess/apps.py b/enhydris/autoprocess/apps.py index dba92c36..3afb4dba 100644 --- a/enhydris/autoprocess/apps.py +++ b/enhydris/autoprocess/apps.py @@ -6,13 +6,20 @@ def enqueue_auto_process(sender, *, instance, **kwargs): + def enqueue_on_commit(auto_process_id: int, has_non_append_modifications: bool): + transaction.on_commit( + lambda: execute_auto_process.delay( + auto_process_id, has_non_append_modifications + ) + ) + auto_processes = [ auto_process for auto_process in instance.timeseries_group.autoprocess_set.all() if auto_process.as_specific_instance.source_timeseries == instance ] for auto_process in auto_processes: - transaction.on_commit(lambda: execute_auto_process.delay(auto_process.id)) + enqueue_on_commit(auto_process.id, instance.has_non_append_modifications) class AutoprocessConfig(AppConfig): diff --git a/enhydris/autoprocess/models.py b/enhydris/autoprocess/models.py index e593f313..76a6bd70 100644 --- a/enhydris/autoprocess/models.py +++ b/enhydris/autoprocess/models.py @@ -6,6 
+6,7 @@ from django.db import DataError, IntegrityError, models, transaction from django.db.models.signals import post_delete +from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ import numpy as np @@ -27,10 +28,16 @@ class Meta: db_table = "enhydris_autoprocess_autoprocess" verbose_name_plural = _("Auto processes") - def execute(self): + def execute(self, recalculate: bool): try: - result = self.process_timeseries() - self.target_timeseries.insert_or_append_data(result) + with transaction.atomic(): + target_timeseries = self.target_timeseries + if recalculate: + target_timeseries.timeseriesrecord_set.all().delete() + target_timeseries.has_non_append_modifications = True + target_timeseries._invalidate_cached_data() + result = self.process_timeseries() + target_timeseries.insert_or_append_data(result) except Exception as e: msg = ( f"{e.__class__.__name__} while executing AutoProcess with " @@ -74,14 +81,18 @@ def _get_start_date(self): def save(self, *args, **kwargs): result = super().save(*args, **kwargs) - transaction.on_commit(lambda: tasks.execute_auto_process.delay(self.id)) + transaction.on_commit( + lambda: tasks.execute_auto_process.delay( + self.id, has_non_append_modifications=True + ) + ) return result - @property + @cached_property def source_timeseries(self): raise NotImplementedError("This property is available only in subclasses") - @property + @cached_property def target_timeseries(self): raise NotImplementedError("This property is available only in subclasses") @@ -108,14 +119,14 @@ class Meta: def __str__(self): return _("Checks for {}").format(str(self.timeseries_group)) - @property + @cached_property def source_timeseries(self): obj, created = self.timeseries_group.timeseries_set.get_or_create( type=Timeseries.INITIAL ) return obj - @property + @cached_property def target_timeseries(self): obj, created = self.timeseries_group.timeseries_set.get_or_create( type=Timeseries.CHECKED, @@ -319,7 
+330,7 @@ class Meta: def __str__(self): return f"=> {self.target_timeseries_group}" - @property + @cached_property def source_timeseries(self): try: return self.timeseries_group.timeseries_set.get(type=Timeseries.CHECKED) @@ -330,7 +341,7 @@ def source_timeseries(self): ) return obj - @property + @cached_property def target_timeseries(self): obj, created = self.target_timeseries_group.timeseries_set.get_or_create( type=Timeseries.INITIAL @@ -460,7 +471,7 @@ class Meta: def __str__(self): return _("Aggregation for {}").format(str(self.timeseries_group)) - @property + @cached_property def source_timeseries(self): try: return self.timeseries_group.timeseries_set.get(type=Timeseries.CHECKED) @@ -470,7 +481,7 @@ def source_timeseries(self): ) return obj - @property + @cached_property def target_timeseries(self): obj, created = self.timeseries_group.timeseries_set.get_or_create( type=Timeseries.AGGREGATED, diff --git a/enhydris/autoprocess/tasks.py b/enhydris/autoprocess/tasks.py index 1d6e991e..91d3606f 100644 --- a/enhydris/autoprocess/tasks.py +++ b/enhydris/autoprocess/tasks.py @@ -2,7 +2,8 @@ @app.task -def execute_auto_process(auto_process_id): +def execute_auto_process(auto_process_id, has_non_append_modifications: bool): from .models import AutoProcess - AutoProcess.objects.get(id=auto_process_id).as_specific_instance.execute() + auto_process = AutoProcess.objects.get(id=auto_process_id).as_specific_instance + auto_process.execute(recalculate=has_non_append_modifications) diff --git a/enhydris/autoprocess/tests/test_apps.py b/enhydris/autoprocess/tests/test_apps.py index 17c73eaf..5f4fb388 100644 --- a/enhydris/autoprocess/tests/test_apps.py +++ b/enhydris/autoprocess/tests/test_apps.py @@ -1,3 +1,4 @@ +from io import StringIO from unittest import mock from django.contrib.gis.geos import Point @@ -38,10 +39,28 @@ def setUp(self, m1, m2): def test_enqueues_auto_process(self, m): with transaction.atomic(): self.timeseries.save() - 
m.delay.assert_any_call(self.auto_process.id) + m.delay.assert_called_once_with(self.auto_process.id, False) @mock.patch("enhydris.autoprocess.apps.execute_auto_process") def test_auto_process_is_not_triggered_before_commit(self, m): with transaction.atomic(): self.timeseries.save() m.delay.assert_not_called() + + @mock.patch("enhydris.autoprocess.apps.execute_auto_process") + def test_inserting_data_enqueues_task_with_non_append_modifications_true(self, m): + self.timeseries.insert_or_append_data( + StringIO("2020-01-01 00:10,15,\n"), + default_timezone="UTC", + append_only=False, + ) + m.delay.assert_called_once_with(self.auto_process.id, True) + + @mock.patch("enhydris.autoprocess.apps.execute_auto_process") + def test_appending_data_enqueues_task_with_non_append_modifications_false(self, m): + self.timeseries.insert_or_append_data( + StringIO("2020-01-01 00:10,15,\n"), + default_timezone="UTC", + append_only=True, + ) + m.delay.assert_called_once_with(self.auto_process.id, False) diff --git a/enhydris/autoprocess/tests/test_models/test_aggregation.py b/enhydris/autoprocess/tests/test_models/test_aggregation.py index a5fbd481..0fa675da 100644 --- a/enhydris/autoprocess/tests/test_models/test_aggregation.py +++ b/enhydris/autoprocess/tests/test_models/test_aggregation.py @@ -424,7 +424,7 @@ def setUp(self): def test_initial_target_timeseries(self): aggregation = Aggregation.objects.get(id=self.aggregation_id) - aggregation.execute() + aggregation.execute(recalculate=False) actual_data = aggregation.target_timeseries.get_data().data expected_data = pd.DataFrame( data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]}, @@ -438,11 +438,11 @@ def test_initial_target_timeseries(self): pd.testing.assert_frame_equal(actual_data, expected_data) def test_updated_target_timeseries(self): - Aggregation.objects.get(id=self.aggregation_id).execute() + Aggregation.objects.get(id=self.aggregation_id).execute(recalculate=False) self._extend_source_timeseries() aggregation = 
Aggregation.objects.get(id=self.aggregation_id) - aggregation.execute() + aggregation.execute(recalculate=False) ahtimeseries = aggregation.target_timeseries.get_data() expected_data = pd.DataFrame( @@ -514,13 +514,13 @@ def setUp(self): def test_result(self, m): aggregation = Aggregation.objects.get(id=self.aggregation_id) - aggregation.execute() + aggregation.execute(recalculate=False) actual_data = aggregation.target_timeseries.get_data().data self.assertEqual(len(actual_data), 0) def test_log(self, logging_mock): aggregation = Aggregation.objects.get(id=self.aggregation_id) - aggregation.execute() + aggregation.execute(recalculate=False) logging_mock.getLogger.return_value.error.assert_called_once_with( "Need at least 3 dates to infer frequency" ) diff --git a/enhydris/autoprocess/tests/test_models/test_autoprocess.py b/enhydris/autoprocess/tests/test_models/test_autoprocess.py index e3c9c58a..15987bf2 100644 --- a/enhydris/autoprocess/tests/test_models/test_autoprocess.py +++ b/enhydris/autoprocess/tests/test_models/test_autoprocess.py @@ -62,7 +62,9 @@ def test_save_triggers_auto_process(self): with transaction.atomic(): auto_process = baker.make(Checks, timeseries_group=self.timeseries_group) auto_process.save() - tasks.execute_auto_process.delay.assert_any_call(auto_process.id) + tasks.execute_auto_process.delay.assert_any_call( + auto_process.id, has_non_append_modifications=True + ) def test_auto_process_is_not_triggered_before_commit(self): with transaction.atomic(): @@ -88,11 +90,11 @@ def setUp(self): self.range_check = baker.make(RangeCheck, checks=self.checks) def test_called_once(self, m): - self.checks.execute() + self.checks.execute(recalculate=False) self.assertEqual(len(m.mock_calls), 1) def test_called_with_empty_content(self, m): - self.checks.execute() + self.checks.execute(recalculate=False) self.assertEqual(len(self.checks.htimeseries.data), 0) def test_critical_error(self, m): @@ -101,16 +103,16 @@ def test_critical_error(self, m): 
f"^ValueError while executing AutoProcess with id={self.checks.id}: hello$" ) with self.assertRaisesRegex(RuntimeError, msg): - self.checks.execute() + self.checks.execute(recalculate=False) -class AutoProcessExecuteDealsOnlyWithNewerTimeseriesPartTestCase(TestCase): +class AutoProcessRecalculateTestCaseBase(TestCase): @mock.patch( "enhydris.autoprocess.models.Checks.process_timeseries", side_effect=lambda self: self.htimeseries, autospec=True, ) - def setUp(self, m): + def setUp(self, m: mock.MagicMock): self.mock_process_timeseries = m station = baker.make(Station, display_timezone="Etc/GMT-2") self.timeseries_group = baker.make( @@ -149,7 +151,11 @@ def setUp(self, m): ) self.checks = baker.make(Checks, timeseries_group=self.timeseries_group) self.range_check = baker.make(RangeCheck, checks=self.checks) - self.checks.execute() + self.checks.execute(recalculate=self.recalculate) + + +class AutoProcessExecuteRecalculateFalseTestCase(AutoProcessRecalculateTestCaseBase): + recalculate = False def test_called_once(self): self.assertEqual(len(self.mock_process_timeseries.mock_calls), 1) @@ -181,3 +187,21 @@ def test_appended_the_data(self): pd.testing.assert_frame_equal( self.target_timeseries.get_data().data, expected_result ) + + +class AutoProcessExecuteRecalculateTestCase(AutoProcessRecalculateTestCaseBase): + recalculate = True + + def test_called_with_entire_source_timeseries(self): + expected_arg = pd.DataFrame( + data={"value": [1.0, 2.0, 3.0, 4.0], "flags": ["", "", "", ""]}, + columns=["value", "flags"], + index=[ + dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 17, 10, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 17, 20, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 17, 30, tzinfo=get_tzinfo("Etc/GMT-2")), + ], + ) + expected_arg.index.name = "date" + pd.testing.assert_frame_equal(self.checks.htimeseries.data, expected_arg) diff --git 
a/enhydris/autoprocess/tests/test_models/test_checks.py b/enhydris/autoprocess/tests/test_models/test_checks.py index db03e16d..60e332ab 100644 --- a/enhydris/autoprocess/tests/test_models/test_checks.py +++ b/enhydris/autoprocess/tests/test_models/test_checks.py @@ -97,7 +97,7 @@ def test_runs_range_check(self, m1, m2): checks__timeseries_group__gentity=station, checks__timeseries_group__variable__descr="Temperature", ) - range_check.checks.execute() + range_check.checks.execute(recalculate=False) m2.assert_called_once() def test_no_extra_queries_for_str(self): diff --git a/enhydris/autoprocess/tests/test_tasks.py b/enhydris/autoprocess/tests/test_tasks.py new file mode 100644 index 00000000..dbe6611d --- /dev/null +++ b/enhydris/autoprocess/tests/test_tasks.py @@ -0,0 +1,29 @@ +from io import StringIO +from unittest import mock + +from django.test import TestCase + +from model_bakery import baker + +from enhydris.autoprocess.models import Checks +from enhydris.autoprocess.tasks import execute_auto_process + + +class ExecuteAutoProcessTaskTestCase(TestCase): + @mock.patch("enhydris.autoprocess.models.tasks.execute_auto_process") + def setUp(self, m): + self.auto_process = baker.make(Checks) + self.target_timeseries = self.auto_process.target_timeseries + self.target_timeseries.insert_or_append_data( + StringIO("2020-01-01 00:00,10,\n"), default_timezone="UTC" + ) + + @mock.patch("enhydris.autoprocess.models.Checks.execute") + def test_calls_execute_with_recalculate_true(self, m): + execute_auto_process(self.auto_process.id, has_non_append_modifications=True) + m.assert_called_once_with(recalculate=True) + + @mock.patch("enhydris.autoprocess.models.Checks.execute") + def test_calls_execute_with_recalculate_false(self, m): + execute_auto_process(self.auto_process.id, has_non_append_modifications=False) + m.assert_called_once_with(recalculate=False) diff --git a/enhydris/models/timeseries.py b/enhydris/models/timeseries.py index 0e9a1f9d..65bf65ae 100644 --- 
a/enhydris/models/timeseries.py +++ b/enhydris/models/timeseries.py @@ -76,6 +76,15 @@ def get_default_publicly_available(): class Timeseries(models.Model): timeseriesrecord_set: models.Manager["TimeseriesRecord"] + # When the calculation of derived time series (e.g. checked, regularized, + # aggregated) is triggered, it is performed on the newer data only; the older data + # is left as is. This does not work if the modification of the source time series + # (i.e. this time series) is anything other than an append (e.g. if data is inserted in + # the middle of the time series, or if some records are deleted). In this case, the + # derived time series should be fully recalculated. The + # has_non_append_modifications attribute is used to signal this case. + has_non_append_modifications: bool = False + INITIAL = 100 CHECKED = 200 REGULARIZED = 300 @@ -290,7 +299,7 @@ def set_data( default_timezone: str | None = None, ): self.timeseriesrecord_set.all().delete() - return self.insert_or_append_data(data, default_timezone) + return self.insert_or_append_data(data, default_timezone, append_only=False) def insert_or_append_data( self, @@ -299,6 +308,8 @@ def insert_or_append_data( append_only: bool = True, ): ahtimeseries = self._get_htimeseries_from_data(data, default_timezone) + if not append_only: + self.has_non_append_modifications = True if append_only: self._check_new_data_is_newer(ahtimeseries) result = TimeseriesRecord.bulk_insert(self, ahtimeseries) diff --git a/enhydris/tests/test_models/test_timeseries.py b/enhydris/tests/test_models/test_timeseries.py index e7afce4a..06b4d9f6 100644 --- a/enhydris/tests/test_models/test_timeseries.py +++ b/enhydris/tests/test_models/test_timeseries.py @@ -15,7 +15,7 @@ from model_bakery import baker from enhydris import models -from enhydris.tests import ClearCacheMixin, TestTimeseriesMixin +from enhydris.tests import ClearCacheMixin, TestTimeseriesMixin, TimeseriesDataMixin def get_tzinfo(tzname: str): @@ -714,6 +714,41 @@ 
def test_disallows_overwrite(self): ) +class TimeseriesHasNonAppendModificationsTestCase(TimeseriesDataMixin, TestCase): + def setUp(self): + self.create_timeseries() + + def test_default_value_is_false(self): + timeseries = baker.make(models.Timeseries) + self.assertFalse(timeseries.has_non_append_modifications) + + def test_append_keeps_value_false(self): + self.timeseries.has_non_append_modifications = False + self.timeseries.insert_or_append_data( + StringIO("2019-01-01 01:00,43,\n"), + default_timezone=self.timezone, + append_only=True, + ) + self.assertFalse(self.timeseries.has_non_append_modifications) + + def test_insert_sets_value_true(self): + self.timeseries.has_non_append_modifications = False + self.timeseries.insert_or_append_data( + StringIO("2019-01-01 01:00,43,\n"), + default_timezone=self.timezone, + append_only=False, + ) + self.assertTrue(self.timeseries.has_non_append_modifications) + + def test_set_data_sets_value_true(self): + self.timeseries.has_non_append_modifications = False + self.timeseries.set_data( + StringIO("2018-01-01 00:00,42,\n"), + default_timezone=self.timezone, + ) + self.assertTrue(self.timeseries.has_non_append_modifications) + + class TimeseriesGetLastRecordAsStringTestCase(TestTimeseriesMixin, TestCase): def test_when_record_exists(self): self._create_test_timeseries("2017-11-23 17:23,1,\n2018-11-25 01:00,2,\n")