diff --git a/corehq/motech/repeaters/const.py b/corehq/motech/repeaters/const.py
index 415c516a91a3..8580b6e81e6e 100644
--- a/corehq/motech/repeaters/const.py
+++ b/corehq/motech/repeaters/const.py
@@ -23,3 +23,8 @@
     (RECORD_FAILURE_STATE, _('Failed')),
     (RECORD_CANCELLED_STATE, _('Cancelled')),
 ]
+# State for Couch records that have been migrated to SQL. If the
+# migration is rolled back, the `migrated` property is set to False, and
+# `RepeatRecord.state` will return PENDING or FAILED as it did before
+# migration.
+RECORD_MIGRATED_STATE = 'MIGRATED'
diff --git a/corehq/motech/repeaters/dbaccessors.py b/corehq/motech/repeaters/dbaccessors.py
index 7face06c46aa..ef09d20bd078 100644
--- a/corehq/motech/repeaters/dbaccessors.py
+++ b/corehq/motech/repeaters/dbaccessors.py
@@ -365,3 +365,45 @@ def delete_all_repeaters():
     from .models import Repeater
     for repeater in Repeater.get_db().view('repeaters/repeaters', reduce=False, include_docs=True).all():
         Repeater.wrap(repeater['doc']).delete()
+
+
+# Functions for evaluating scale
+def get_repeat_record_domain_total():
+    from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
+
+    return len(get_domains_that_have_repeat_records())
+
+
+def get_couch_repeater_total():
+    from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
+    from corehq.motech.repeaters.models import Repeater
+
+    total = 0
+    for domain in get_domains_that_have_repeat_records():
+        result = Repeater.get_db().view(
+            'repeaters/repeaters',
+            startkey=[domain],
+            endkey=[domain, {}],
+            include_docs=False,
+            reduce=True,
+        ).one()
+        total += result['value'] if result else 0
+    return total
+
+
+def get_couch_repeat_record_total():
+    from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
+    from corehq.motech.repeaters.models import RepeatRecord
+
+    total = 0
+    for domain in get_domains_that_have_repeat_records():
+        result = RepeatRecord.get_db().view(
+            'repeaters/repeat_records',
+            startkey=[domain, None, {}],
+            endkey=[domain, None],
+            include_docs=False,
+            reduce=True,
+            descending=True,
+        ).one()
+        total += result['value'] if result else 0
+    return total
diff --git a/corehq/motech/repeaters/management/commands/migrate_records.py b/corehq/motech/repeaters/management/commands/migrate_records.py
new file mode 100644
index 000000000000..a22b786390c9
--- /dev/null
+++ b/corehq/motech/repeaters/management/commands/migrate_records.py
@@ -0,0 +1,42 @@
+from inspect import cleandoc
+
+from django.core.management.base import BaseCommand
+
+from corehq.util.log import with_progress_bar
+
+from ...dbaccessors import (
+    get_domains_that_have_repeat_records,
+    get_repeaters_by_domain,
+    iter_repeat_records_by_repeater,
+)
+from ...models import Repeater, RepeaterStub
+from ...tasks import migrate_repeat_record
+
+
+class Command(BaseCommand):
+    help = cleandoc("""
+    Migrate Couch RepeatRecords to SQL.
+
+    If a Couch RepeatRecord cannot be migrated (usually because it
+    encounters a ResourceConflict error when trying to set its
+    "migrated" state) then this command can be run again, and
+    already-migrated RepeatRecords will be skipped.
+
+    See the "roll_back_record_migration" management command for
+    instructions to roll the migration back, if necessary.
+    """)
+
+    def handle(self, *args, **options):
+        # Migrate by domain to minimise impact on Repeat Record report
+        domains = get_domains_that_have_repeat_records()
+        for domain in with_progress_bar(domains):
+            for repeater in get_repeaters_by_domain(domain):
+                migrate_repeater(repeater)
+
+
+def migrate_repeater(repeater: Repeater):
+    for couch_record in iter_repeat_records_by_repeater(repeater.domain,
+                                                        repeater.get_id):
+        if couch_record.migrated:
+            continue
+        migrate_repeat_record.delay(repeater.repeater_stub, couch_record)
diff --git a/corehq/motech/repeaters/management/commands/sample_repeaters.py b/corehq/motech/repeaters/management/commands/sample_repeaters.py
new file mode 100644
index 000000000000..df367f44f7b7
--- /dev/null
+++ b/corehq/motech/repeaters/management/commands/sample_repeaters.py
@@ -0,0 +1,82 @@
+from datetime import datetime
+from uuid import uuid4
+
+from django.core.management.base import BaseCommand
+
+from corehq.apps.domain.shortcuts import create_domain
+from corehq.motech.models import ConnectionSettings
+from corehq.motech.repeaters.models import (
+    FormRepeater,
+    RepeatRecord,
+    RepeatRecordAttempt,
+)
+from corehq.util.log import with_progress_bar
+
+# NUM_DOMAINS = 500  # Prod
+NUM_DOMAINS = 5
+REPEATERS_PER_DOMAIN = 3
+# RECORDS_PER_REPEATER = 50_000  # Prod
+RECORDS_PER_REPEATER = 250
+ATTEMPTS_PER_RECORD = 3
+
+
+class Command(BaseCommand):
+    help = 'Create a ton of repeaters to mimic Prod'
+
+    def handle(self, *args, **options):
+        create_sample_repeaters()
+
+
+def create_sample_repeaters():
+    for domain_name in with_progress_bar(get_domain_names(),
+                                         length=NUM_DOMAINS):
+        create_domain(domain_name)
+        connset = ConnectionSettings.objects.create(
+            domain=domain_name,
+            name='local httpbin',
+            url='http://127.0.0.1:10080/anything',
+        )
+        for i in range(1, REPEATERS_PER_DOMAIN + 1):
+            rep = FormRepeater(
+                domain=domain_name,
+                connection_settings_id=connset.pk,
+            )
+            rep.save()
+            for j in range(1, RECORDS_PER_REPEATER + 1):
+                now = datetime.utcnow()
+                attempts = third_time_is_the_charm()
+                rec = RepeatRecord(
+                    domain=domain_name,
+                    repeater_id=rep.get_id,
+                    repeater_type=rep.__class__.__name__,
+                    payload_id=str(uuid4()),
+                    registered_on=now,
+                    next_check=None,
+                    succeeded=True,
+                    overall_tries=len(attempts),
+                    attempts=attempts,
+                )
+                rec.save()
+
+
+def get_domain_names():
+    for i in range(1, NUM_DOMAINS + 1):
+        yield f'repeaters-{i:03d}'
+
+
+def third_time_is_the_charm():
+    return [
+        RepeatRecordAttempt(
+            datetime=datetime.utcnow(),
+            failure_reason='Boo',
+        ),
+        RepeatRecordAttempt(
+            datetime=datetime.utcnow(),
+            failure_reason='Boo',
+        ),
+        RepeatRecordAttempt(
+            datetime=datetime.utcnow(),
+            success_response='Yay',
+            succeeded=True,
+        ),
+    ]
diff --git a/corehq/motech/repeaters/models.py b/corehq/motech/repeaters/models.py
index 68212d6534d9..88cf619bce7d 100644
--- a/corehq/motech/repeaters/models.py
+++ b/corehq/motech/repeaters/models.py
@@ -129,6 +129,7 @@
     MIN_RETRY_WAIT,
     RECORD_CANCELLED_STATE,
     RECORD_FAILURE_STATE,
+    RECORD_MIGRATED_STATE,
     RECORD_PENDING_STATE,
     RECORD_STATES,
     RECORD_SUCCESS_STATE,
@@ -374,25 +375,31 @@ def get_attempt_info(self, repeat_record):
         return None
 
     def register(self, payload):
+        """
+        Registers a SQLRepeatRecord.
+        """
         if not self.allowed_to_forward(payload):
             return
 
-        now = datetime.utcnow()
-        repeat_record = RepeatRecord(
-            repeater_id=self.get_id,
-            repeater_type=self.doc_type,
+        if SQLRepeatRecord.objects.filter(
+            domain=self.domain,
+            repeater_stub=self.repeater_stub,
+            payload_id=payload.get_id,
+            state__in=(RECORD_PENDING_STATE, RECORD_FAILURE_STATE),
+        ).exists():
+            # Payload is already waiting to be sent
+            return
+
+        self.repeater_stub.repeat_records.create(
             domain=self.domain,
-            registered_on=now,
-            next_check=now,
-            payload_id=payload.get_id
+            payload_id=payload.get_id,
+            registered_at=timezone.now(),
         )
         metrics_counter('commcare.repeaters.new_record', tags={
             'domain': self.domain,
             'doc_type': self.doc_type
         })
-        repeat_record.save()
-        repeat_record.attempt_forward_now()
-        return repeat_record
+        attempt_forward_now(self.repeater_stub)
 
     def allowed_to_forward(self, payload):
         """
@@ -872,6 +879,7 @@ class RepeatRecord(Document):
     failure_reason = StringProperty()
     next_check = DateTimeProperty()
     succeeded = BooleanProperty(default=False)
+    migrated = BooleanProperty(default=False)
 
     @property
     def record_id(self):
@@ -916,6 +924,8 @@ def state(self):
             state = RECORD_SUCCESS_STATE
         elif self.cancelled:
             state = RECORD_CANCELLED_STATE
+        elif self.migrated:
+            state = RECORD_MIGRATED_STATE
         elif self.failure_reason:
             state = RECORD_FAILURE_STATE
         return state
diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py
index ec133003b9c5..353b68901fb9 100644
--- a/corehq/motech/repeaters/tasks.py
+++ b/corehq/motech/repeaters/tasks.py
@@ -87,8 +87,6 @@ def check_repeaters():
     start = datetime.utcnow()
     twentythree_hours_sec = 23 * 60 * 60
     twentythree_hours_later = start + timedelta(hours=23)
-
-    # Long timeout to allow all waiting repeat records to be iterated
     check_repeater_lock = get_redis_lock(
         CHECK_REPEATERS_KEY,
         timeout=twentythree_hours_sec,
@@ -103,14 +101,17 @@ def check_repeaters():
             "commcare.repeaters.check.processing",
             timing_buckets=_check_repeaters_buckets,
         ):
-            for record in iterate_repeat_records(start, chunk_size=5000):
+            soft_migrate_repeat_records(start)
+            for repeater_stub in RepeaterStub.objects.all_ready():
                 if not _soft_assert(
                     datetime.utcnow() < twentythree_hours_later,
-                    "I've been iterating repeat records for 23 hours. I quit!"
+                    "I've been iterating repeaters for 23 hours. I quit!"
                 ):
                     break
-                metrics_counter("commcare.repeaters.check.attempt_forward")
-                record.attempt_forward_now(is_retry=True)
+                if domain_can_forward(repeater_stub.domain):
+                    metrics_counter("commcare.repeaters.check.attempt_forward")
+                    # TODO: Function call should indicate that we are retrying
+                    process_repeater_stub.delay(repeater_stub)
             else:
                 iterating_time = datetime.utcnow() - start
                 _soft_assert(
@@ -121,6 +122,41 @@ def check_repeaters():
         check_repeater_lock.release()
 
 
+def soft_migrate_repeat_records(start):
+    """
+    Soft-migrate repeat records to be sent.
+
+    .. note::
+       The ``migrate_records`` management command will migrate the
+       remaining cancelled and succeeded repeat records ... or they can
+       be ignored ... or deleted.
+
+    .. note::
+       The repeat records report will only show newer repeat records,
+       unless the ``migrate_records`` management command has migrated
+       older records.
+
+    """
+    # After the first run, this should be quick. After seven days
+    # (MAX_RETRY_WAIT) only cancelled and succeeded Couch repeat records
+    # will not have been migrated.
+
+    repeater_stub_cache = {}
+
+    def get_repeater_stub(domain, repeater_id):
+        if (domain, repeater_id) not in repeater_stub_cache:
+            stub = RepeaterStub.objects.get(
+                domain=domain,
+                repeater_id=repeater_id,
+            )
+            repeater_stub_cache[(domain, repeater_id)] = stub
+        return repeater_stub_cache[(domain, repeater_id)]
+
+    for record in iterate_repeat_records(start):
+        repeater_stub = get_repeater_stub(record.domain, record.repeater_id)
+        migrate_repeat_record.delay(repeater_stub, record)
+
+
 @task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
 def process_repeat_record(repeat_record):
     _process_repeat_record(repeat_record)
@@ -192,6 +228,7 @@ def process_repeater_stub(repeater_stub: RepeaterStub):
         fail_hard=False, block=False, timeout=5 * 60 * 60,
     ):
         for repeat_record in repeater_stub.repeat_records_ready[:RECORDS_AT_A_TIME]:
+            metrics_counter("commcare.repeaters.check.attempt_forward")
             try:
                 payload = get_payload(repeater_stub.repeater, repeat_record)
             except Exception:
@@ -204,6 +241,36 @@ def process_repeater_stub(repeater_stub: RepeaterStub):
                 break
 
 
+@task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
+def migrate_repeat_record(
+    repeater_stub: RepeaterStub,
+    couch_record: RepeatRecord,
+):
+    assert repeater_stub.domain == couch_record.domain
+    sql_record = repeater_stub.repeat_records.create(
+        domain=couch_record.domain,
+        couch_id=couch_record.record_id,
+        payload_id=couch_record.payload_id,
+        state=couch_record.state,
+        registered_at=couch_record.registered_on,
+    )
+    for attempt in couch_record.attempts:
+        sql_record.sqlrepeatrecordattempt_set.create(
+            state=attempt.state,
+            message=attempt.message,
+            created_at=attempt.datetime,
+        )
+
+    couch_record.migrated = True
+    couch_record.next_check = None
+    try:
+        couch_record.save()
+    except Exception:
+        logging.exception('Failed to migrate repeat record: '
+                          f'{couch_record.record_id}')
+        sql_record.delete()
+
+
 @task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
 def revert_migrated(couch_record):
     """
diff --git a/corehq/motech/repeaters/tests/test_repeater.py b/corehq/motech/repeaters/tests/test_repeater.py
index 88348cefb61e..6f935c2e3706 100644
--- a/corehq/motech/repeaters/tests/test_repeater.py
+++ b/corehq/motech/repeaters/tests/test_repeater.py
@@ -1,8 +1,10 @@
 import json
 import uuid
 from collections import namedtuple
+from contextlib import contextmanager
 from datetime import datetime, timedelta
 
+from django.conf import settings
 from django.test import SimpleTestCase, TestCase, override_settings
 
 import attr
@@ -52,8 +54,11 @@
     Repeater,
     RepeatRecord,
     ShortFormRepeater,
+    SQLRepeatRecord,
     UserRepeater,
     _get_retry_interval,
+    attempt_forward_now,
+    get_payload,
 )
 from corehq.motech.repeaters.repeater_generators import (
     BasePayloadGenerator,
@@ -137,9 +142,8 @@ def post_xml(cls, xml, domain_name):
 
     @classmethod
     def repeat_records(cls, domain_name):
-        # Enqueued repeat records have next_check set 48 hours in the future.
-        later = datetime.utcnow() + timedelta(hours=48 + 1)
-        return RepeatRecord.all(domain=domain_name, due_before=later)
+        # TODO: You're probably doing it wrong.
+        return SQLRepeatRecord.objects.filter(domain=domain_name).all()
 
 
 class RepeaterTest(BaseRepeaterTest):
@@ -349,7 +353,8 @@ def test_retry_process_repeat_record_locking(self):
 
     @run_with_all_backends
     def test_automatic_cancel_repeat_record(self):
-        repeat_record = self.case_repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        self.case_repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.case_repeater)
         self.assertEqual(1, repeat_record.overall_tries)
         with patch('corehq.motech.repeaters.models.simple_post', side_effect=Exception('Boom!')):
             for __ in range(repeat_record.max_possible_tries - repeat_record.overall_tries):
@@ -631,7 +636,8 @@ def tearDown(self):
 
     @run_with_all_backends
     def test_get_payload_exception(self):
-        repeat_record = self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.repeater)
         with self.assertRaises(Exception):
             with patch.object(CaseRepeater, 'get_payload', side_effect=Exception('Boom!')):
                 repeat_record.fire()
@@ -641,7 +647,8 @@ def test_get_payload_exception(self):
 
     @run_with_all_backends
     def test_failure(self):
-        repeat_record = self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.repeater)
         with patch('corehq.motech.repeaters.models.simple_post', side_effect=RequestException('Boom!')):
             repeat_record.fire()
 
@@ -650,7 +657,8 @@ def test_failure(self):
 
     @run_with_all_backends
     def test_unexpected_failure(self):
-        repeat_record = self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.repeater)
         with patch('corehq.motech.repeaters.models.simple_post', side_effect=Exception('Boom!')):
             repeat_record.fire()
 
@@ -659,8 +667,8 @@ def test_unexpected_failure(self):
 
     @run_with_all_backends
     def test_success(self):
-        repeat_record = self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
-        repeat_record = RepeatRecord.get(repeat_record.record_id)
+        self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.repeater)
         # Should be marked as successful after a successful run
         with patch('corehq.motech.repeaters.models.simple_post') as mock_simple_post:
             mock_simple_post.return_value.status_code = 200
@@ -783,7 +791,8 @@ def get_payload(self, repeat_record, payload_doc):
 
     @run_with_all_backends
     def test_new_format_payload(self):
-        repeat_record = self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        self.repeater.register(CaseAccessors(self.domain).get_case(CASE_ID))
+        repeat_record = _get_last_repeat_record(self.repeater)
         with patch('corehq.motech.repeaters.models.simple_post') as mock_post, \
                 patch.object(ConnectionSettings, 'get_auth_manager') as mock_manager:
             mock_post.return_value.status_code = 200
@@ -816,61 +825,54 @@ class UserRepeaterTest(TestCase, DomainSubscriptionMixin):
     def setUpClass(cls):
         super().setUpClass()
         cls.domain = 'user-repeater'
         cls.domain_obj = create_domain(name=cls.domain)
 
-        # DATA_FORWARDING is on PRO and above
+        # DATA_FORWARDING is on Pro and above
         cls.setup_subscription(cls.domain, SoftwarePlanEdition.PRO)
-
-    def setUp(self):
-        super().setUp()
-        self.connx = ConnectionSettings.objects.create(
-            domain=self.domain,
-            url='super-cool-url',
+        cls.connx = ConnectionSettings.objects.create(
+            domain=cls.domain,
+            url='http://localhost/api/',
         )
-        self.repeater = UserRepeater(
-            domain=self.domain,
-            connection_settings_id=self.connx.id,
+        cls.repeater = UserRepeater(
+            domain=cls.domain,
+            connection_settings_id=cls.connx.id,
         )
-        self.repeater.save()
+        cls.repeater.save()
 
     @classmethod
     def tearDownClass(cls):
+        delete_all_repeat_records()
+        delete_all_repeaters()
+        cls.connx.delete()
         cls.teardown_subscriptions()
         cls.domain_obj.delete()
         clear_plan_version_cache()
         super().tearDownClass()
 
-    def tearDown(self):
-        super().tearDown()
-        delete_all_repeat_records()
-        delete_all_repeaters()
-        self.connx.delete()
-
+    @property
     def repeat_records(self):
-        # Enqueued repeat records have next_check set 48 hours in the future.
-        later = datetime.utcnow() + timedelta(hours=48 + 1)
-        return RepeatRecord.all(domain=self.domain, due_before=later)
+        return SQLRepeatRecord.objects.filter(domain=self.domain).all()
 
     def make_user(self, username):
         user = CommCareUser.create(
-            self.domain,
-            "{}@{}.commcarehq.org".format(username, self.domain),
-            "123",
-            None,
-            None,
+            domain=self.domain,
+            username=f'{username}@{self.domain}.commcarehq.org',
+            password="123",
+            created_by=None,
+            created_via=None,
         )
         self.addCleanup(user.delete, deleted_by=None)
         return user
 
     def test_trigger(self):
-        self.assertEqual(0, len(self.repeat_records().all()))
+        self.assertEqual(0, len(self.repeat_records))
         user = self.make_user("bselmy")
 
-        records = self.repeat_records().all()
+        records = self.repeat_records
         self.assertEqual(1, len(records))
-        record = records[0]
+        payload = get_payload(self.repeater, records[0])
         self.assertEqual(
-            json.loads(record.get_payload()),
+            json.loads(payload),
             {
                 'id': user._id,
                 'username': user.username,
@@ -881,7 +883,7 @@ def test_trigger(self):
                 'groups': [],
                 'phone_numbers': [],
                 'email': '',
-                'resource_uri': '/a/user-repeater/api/v0.5/user/{}/'.format(user._id),
+                'resource_uri': f'/a/user-repeater/api/v0.5/user/{user._id}/',
             }
         )
 
@@ -987,8 +989,8 @@ def setUp(self):
             connection_settings_id=self.connx.id,
         )
         self.repeater.save()
-        self.post_xml(self.xform_xml, self.domain)
-        self.repeater = reloaded(self.repeater)
+        # self.post_xml(self.xform_xml, self.domain)
+        # self.repeater = reloaded(self.repeater)
 
     def tearDown(self):
         self.repeater.delete()
@@ -999,31 +1001,31 @@ def tearDown(self):
 
     @run_with_all_backends
     def test_trigger_when_paused(self):
-        # not paused
-        with patch.object(RepeatRecord, 'fire') as mock_fire:
-            with patch.object(RepeatRecord, 'postpone_by') as mock_postpone_fire:
-                # calls _process_repeat_record():
-                self.repeat_record = self.repeater.register(CaseAccessors(self.domain_obj).get_case(CASE_ID))
-                self.assertEqual(mock_fire.call_count, 1)
-                self.assertEqual(mock_postpone_fire.call_count, 0)
-
-                # paused
-                self.repeater.pause()
-                # re fetch repeat record
-                repeat_record_id = self.repeat_record.get_id
-                self.repeat_record = RepeatRecord.get(repeat_record_id)
-                _process_repeat_record(self.repeat_record)
-                self.assertEqual(mock_fire.call_count, 1)
-                self.assertEqual(mock_postpone_fire.call_count, 1)
+        """
+        When a Repeater is paused, processing it should not send repeat records
+        """
+        with patch('corehq.motech.repeaters.tasks.send_request') as mock_send, \
+                _temp_settings('CELERY_TASK_ALWAYS_EAGER', True):
+            mock_send.return_value = True  # payload succeeded or was cancelled
+
+            # Not paused
+            self.post_xml(self.xform_xml, self.domain)
+            # payload = CaseAccessors(self.domain_obj).get_case(CASE_ID)
+            # self.repeater.register(payload)
+            # repeat_record = _get_last_repeat_record(self.repeater)
+            self.assertEqual(mock_send.call_count, 1)  # Called
 
-                # resumed
-                self.repeater.resume()
-                # re fetch repeat record
-                repeat_record_id = self.repeat_record.get_id
-                self.repeat_record = RepeatRecord.get(repeat_record_id)
-                _process_repeat_record(self.repeat_record)
-                self.assertEqual(mock_fire.call_count, 2)
-                self.assertEqual(mock_postpone_fire.call_count, 1)
+            # Paused
+            self.repeater.pause()
+            repeat_record = _get_last_repeat_record(self.repeater)
+            repeat_record.requeue()
+            attempt_forward_now(self.repeater.repeater_stub)
+            self.assertEqual(mock_send.call_count, 1)  # Not called again
+
+            # Resumed
+            self.repeater.resume()
+            attempt_forward_now(self.repeater.repeater_stub)
+            self.assertEqual(mock_send.call_count, 2)  # Called again
 
 
 class TestRepeaterDeleted(BaseRepeaterTest):
@@ -1232,3 +1234,17 @@ def fromisoformat(isoformat):
         return datetime.fromisoformat(isoformat)  # Python >= 3.7
     except AttributeError:
         return datetime.strptime(isoformat, "%Y-%m-%d %H:%M:%S")
+
+
+def _get_last_repeat_record(repeater: Repeater) -> SQLRepeatRecord:
+    return repeater.repeater_stub.repeat_records.last()
+
+
+@contextmanager
+def _temp_settings(setting_name, temp_value):
+    orig_value = getattr(settings, setting_name)
+    setattr(settings, setting_name, temp_value)
+    try:
+        yield
+    finally:
+        setattr(settings, setting_name, orig_value)