Skip to content

Commit

Permalink
Merge pull request #28952 from dimagi/nh/rep/sqlrepeatrecord
Browse files Browse the repository at this point in the history
SQLRepeatRecord
  • Loading branch information
kaapstorm authored Jan 8, 2021
2 parents 0a5391e + 85f8d7f commit 7e5a03c
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 9 deletions.
3 changes: 3 additions & 0 deletions corehq/apps/domain/deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ def _delete_demo_user_restores(domain_name):
ModelDeletion('dhis2', 'Dhis2Connection', 'domain'),
ModelDeletion('motech', 'RequestLog', 'domain'),
ModelDeletion('motech', 'ConnectionSettings', 'domain'),
ModelDeletion('repeaters', 'RepeaterStub', 'domain'),
ModelDeletion('repeaters', 'SQLRepeatRecord', 'domain'),
ModelDeletion('repeaters', 'SQLRepeatRecordAttempt', 'repeat_record__domain'),
ModelDeletion('couchforms', 'UnfinishedSubmissionStub', 'domain'),
ModelDeletion('couchforms', 'UnfinishedArchiveStub', 'domain'),
CustomDeletion('ucr', delete_all_ucr_tables_for_domain, []),
Expand Down
48 changes: 43 additions & 5 deletions corehq/apps/domain/tests/test_delete_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from decimal import Decimal
from io import BytesIO

from dateutil.relativedelta import relativedelta
from django.contrib.auth.models import User
from django.core.management import call_command
from django.test import TestCase, override_settings

from dateutil.relativedelta import relativedelta
from mock import patch

from casexml.apps.case.mock import CaseFactory
Expand All @@ -17,6 +18,8 @@
StockReport,
StockTransaction,
)
from couchforms.models import UnfinishedSubmissionStub

from corehq.apps.accounting.models import (
BillingAccount,
CreditLine,
Expand All @@ -33,10 +36,12 @@
)
from corehq.apps.app_manager.models import (
AppReleaseByLocation,
LatestEnabledBuildProfiles,
GlobalAppConfig,
LatestEnabledBuildProfiles,
)
from corehq.apps.app_manager.suite_xml.post_process.resources import (
ResourceOverride,
)
from corehq.apps.app_manager.suite_xml.post_process.resources import ResourceOverride
from corehq.apps.case_importer.tracking.models import (
CaseUploadFormRecord,
CaseUploadRecord,
Expand Down Expand Up @@ -98,7 +103,7 @@
from corehq.apps.users.models import DomainRequest, Invitation
from corehq.apps.zapier.consts import EventTypes
from corehq.apps.zapier.models import ZapierSubscription
from corehq.blobs import NotFound, get_blob_db, CODES
from corehq.blobs import CODES, NotFound, get_blob_db
from corehq.form_processor.backends.sql.dbaccessors import (
CaseAccessorSQL,
FormAccessorSQL,
Expand All @@ -111,7 +116,12 @@
from corehq.form_processor.models import XFormInstanceSQL
from corehq.form_processor.tests.utils import create_form_for_test
from corehq.motech.models import RequestLog
from couchforms.models import UnfinishedSubmissionStub
from corehq.motech.repeaters.const import RECORD_SUCCESS_STATE
from corehq.motech.repeaters.models import (
RepeaterStub,
SQLRepeatRecord,
SQLRepeatRecordAttempt,
)
from settings import HQ_ACCOUNT_ROOT


Expand Down Expand Up @@ -901,6 +911,34 @@ def test_motech_delete(self):
self._assert_motech_count(self.domain.name, 0)
self._assert_motech_count(self.domain2.name, 1)

def _assert_repeaters_count(self, domain_name, count):
self._assert_queryset_count([
RepeaterStub.objects.filter(domain=domain_name),
SQLRepeatRecord.objects.filter(domain=domain_name),
SQLRepeatRecordAttempt.objects.filter(repeat_record__domain=domain_name),
], count)

def test_repeaters_delete(self):
for domain_name in [self.domain.name, self.domain2.name]:
stub = RepeaterStub.objects.create(
domain=domain_name,
repeater_id=str(uuid.uuid4()),
)
record = stub.repeat_records.create(
domain=domain_name,
payload_id=str(uuid.uuid4()),
registered_at=datetime.utcnow(),
)
record.sqlrepeatrecordattempt_set.create(
state=RECORD_SUCCESS_STATE,
)
self._assert_repeaters_count(domain_name, 1)

self.domain.delete()

self._assert_repeaters_count(self.domain.name, 0)
self._assert_repeaters_count(self.domain2.name, 1)

def _assert_couchforms_counts(self, domain_name, count):
self._assert_queryset_count([
UnfinishedSubmissionStub.objects.filter(domain=domain_name)
Expand Down
6 changes: 6 additions & 0 deletions corehq/apps/dump_reload/sql/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
FilteredModelIteratorBuilder('case_importer.CaseUploadFileMeta', SimpleFilter('caseuploadrecord__domain')),
FilteredModelIteratorBuilder('case_importer.CaseUploadFormRecord', SimpleFilter('case_upload_record__domain')),
FilteredModelIteratorBuilder('case_importer.CaseUploadRecord', SimpleFilter('domain')),
FilteredModelIteratorBuilder('motech.ConnectionSettings', SimpleFilter('domain')),
FilteredModelIteratorBuilder('repeaters.RepeaterStub', SimpleFilter('domain')),
# NH (2021-01-08): Including SQLRepeatRecord because we dump (Couch)
# RepeatRecord, but this does not seem like a good idea.
FilteredModelIteratorBuilder('repeaters.SQLRepeatRecord', SimpleFilter('domain')),
FilteredModelIteratorBuilder('repeaters.SQLRepeatRecordAttempt', SimpleFilter('repeat_record__domain')),
FilteredModelIteratorBuilder('translations.SMSTranslations', SimpleFilter('domain')),
FilteredModelIteratorBuilder('translations.TransifexBlacklist', SimpleFilter('domain')),
UniqueFilteredModelIteratorBuilder('translations.TransifexOrganization', SimpleFilter('transifexproject__domain')),
Expand Down
7 changes: 7 additions & 0 deletions corehq/motech/repeaters/const.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
from django.utils.translation import gettext_lazy as _

MAX_RETRY_WAIT = timedelta(days=7)
MIN_RETRY_WAIT = timedelta(minutes=60)
Expand All @@ -9,3 +10,9 @@
RECORD_SUCCESS_STATE = 'SUCCESS'
RECORD_FAILURE_STATE = 'FAIL'
RECORD_CANCELLED_STATE = 'CANCELLED'
RECORD_STATES = [
(RECORD_PENDING_STATE, _('Pending')),
(RECORD_SUCCESS_STATE, _('Succeeded')),
(RECORD_FAILURE_STATE, _('Failed')),
(RECORD_CANCELLED_STATE, _('Cancelled')),
]
117 changes: 117 additions & 0 deletions corehq/motech/repeaters/migrations/0002_sqlrepeatrecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone


class Migration(migrations.Migration):

initial = True

dependencies = [
('repeaters', '0001_adjust_auth_field_format'),
]

operations = [
migrations.CreateModel(
name='RepeaterStub',
fields=[
('id', models.AutoField(auto_created=True,
primary_key=True,
serialize=False,
verbose_name='ID')),
('domain', models.CharField(max_length=126)),
('repeater_id', models.CharField(max_length=36)),
('is_paused', models.BooleanField(default=False)),
('next_attempt_at', models.DateTimeField(blank=True, null=True)),
('last_attempt_at', models.DateTimeField(blank=True, null=True)),
],
),
migrations.CreateModel(
name='SQLRepeatRecord',
fields=[
('id', models.AutoField(auto_created=True,
primary_key=True,
serialize=False,
verbose_name='ID')),
('domain', models.CharField(max_length=126)),
('couch_id', models.CharField(blank=True, max_length=36, null=True)),
('payload_id', models.CharField(max_length=36)),
('state', models.TextField(
choices=[
('PENDING', 'Pending'),
('SUCCESS', 'Succeeded'),
('FAIL', 'Failed'),
('CANCELLED', 'Cancelled'),
],
default='PENDING',
)),
('registered_at', models.DateTimeField()),
('repeater_stub', models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name='repeat_records',
to='repeaters.RepeaterStub',
)),
],
options={
'db_table': 'repeaters_repeatrecord',
'ordering': ['registered_at'],
},
),
migrations.CreateModel(
name='SQLRepeatRecordAttempt',
fields=[
('id', models.AutoField(auto_created=True,
primary_key=True,
serialize=False,
verbose_name='ID')),
('state', models.TextField(choices=[
('PENDING', 'Pending'),
('SUCCESS', 'Succeeded'),
('FAIL', 'Failed'),
('CANCELLED', 'Cancelled'),
])),
('message', models.TextField(blank=True, null=True)),
('traceback', models.TextField(blank=True, null=True)),
('created_at', models.DateTimeField(
default=django.utils.timezone.now)),
('repeat_record', models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to='repeaters.SQLRepeatRecord',
)),
],
options={
'db_table': 'repeaters_repeatrecordattempt',
'ordering': ['created_at'],
},
),
migrations.AddIndex(
model_name='repeaterstub',
index=models.Index(fields=['domain'],
name='repeaters_r_domain_23d304_idx'),
),
migrations.AddIndex(
model_name='repeaterstub',
index=models.Index(fields=['repeater_id'],
name='repeaters_r_repeate_4c833b_idx'),
),
migrations.AddIndex(
model_name='sqlrepeatrecord',
index=models.Index(fields=['domain'],
name='repeaters_r_domain_3ae9ab_idx'),
),
migrations.AddIndex(
model_name='sqlrepeatrecord',
index=models.Index(fields=['couch_id'],
name='repeaters_r_couch_i_ea5782_idx'),
),
migrations.AddIndex(
model_name='sqlrepeatrecord',
index=models.Index(fields=['payload_id'],
name='repeaters_r_payload_f64556_idx'),
),
migrations.AddIndex(
model_name='sqlrepeatrecord',
index=models.Index(fields=['registered_at'],
name='repeaters_r_registe_b48c68_idx'),
),
]
78 changes: 78 additions & 0 deletions corehq/motech/repeaters/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@

from django.utils.functional import cached_property
from django.conf import settings
from django.db import models
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from couchdbkit.exceptions import ResourceConflict, ResourceNotFound
Expand Down Expand Up @@ -122,6 +124,7 @@
RECORD_CANCELLED_STATE,
RECORD_FAILURE_STATE,
RECORD_PENDING_STATE,
RECORD_STATES,
RECORD_SUCCESS_STATE,
)
from .dbaccessors import (
Expand Down Expand Up @@ -164,6 +167,46 @@ def log_repeater_success_in_datadog(domain, status_code, repeater_type):
})


class RepeaterStubManager(models.Manager):

def all_ready(self):
"""
Return all RepeaterStubs ready to be forwarded.
"""
not_paused = models.Q(is_paused=False)
next_attempt_not_in_the_future = (
models.Q(next_attempt_at__isnull=True)
| models.Q(next_attempt_at__lte=timezone.now())
)
repeat_records_ready_to_send = models.Q(
repeat_records__state__in=(RECORD_PENDING_STATE,
RECORD_FAILURE_STATE)
)
return (self.get_queryset()
.filter(not_paused)
.filter(next_attempt_not_in_the_future)
.filter(repeat_records_ready_to_send))


class RepeaterStub(models.Model):
"""
This model links the SQLRepeatRecords of a Repeater.
"""
domain = models.CharField(max_length=126)
repeater_id = models.CharField(max_length=36)
is_paused = models.BooleanField(default=False)
next_attempt_at = models.DateTimeField(null=True, blank=True)
last_attempt_at = models.DateTimeField(null=True, blank=True)

objects = RepeaterStubManager()

class Meta:
indexes = [
models.Index(fields=['domain']),
models.Index(fields=['repeater_id']),
]


class Repeater(QuickCachedDocumentMixin, Document):
"""
Represents the configuration of a repeater. Will specify the URL to forward to and
Expand Down Expand Up @@ -933,6 +976,41 @@ def requeue(self):
self.next_check = datetime.utcnow()


class SQLRepeatRecord(models.Model):
domain = models.CharField(max_length=126)
couch_id = models.CharField(max_length=36, null=True, blank=True)
payload_id = models.CharField(max_length=36)
repeater_stub = models.ForeignKey(RepeaterStub,
on_delete=models.CASCADE,
related_name='repeat_records')
state = models.TextField(choices=RECORD_STATES,
default=RECORD_PENDING_STATE)
registered_at = models.DateTimeField()

class Meta:
db_table = 'repeaters_repeatrecord'
indexes = [
models.Index(fields=['domain']),
models.Index(fields=['couch_id']),
models.Index(fields=['payload_id']),
models.Index(fields=['registered_at']),
]
ordering = ['registered_at']


class SQLRepeatRecordAttempt(models.Model):
repeat_record = models.ForeignKey(SQLRepeatRecord,
on_delete=models.CASCADE)
state = models.TextField(choices=RECORD_STATES)
message = models.TextField(null=True, blank=True)
traceback = models.TextField(null=True, blank=True)
created_at = models.DateTimeField(default=timezone.now)

class Meta:
db_table = 'repeaters_repeatrecordattempt'
ordering = ['created_at']


def _get_retry_interval(last_checked, now):
"""
Returns a timedelta between MIN_RETRY_WAIT and MAX_RETRY_WAIT that
Expand Down
Loading

0 comments on commit 7e5a03c

Please sign in to comment.