Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch over #29601

Draft
wants to merge 6 commits into
base: nh/rep/commands
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions corehq/motech/repeaters/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@
(RECORD_FAILURE_STATE, _('Failed')),
(RECORD_CANCELLED_STATE, _('Cancelled')),
]
# State for Couch records that have been migrated to SQL. If the
# migration is rolled back, the `migrated` property is set to False, and
# `RepeatRecord.state` will return PENDING or FAILED as it did before
# migration.
RECORD_MIGRATED_STATE = 'MIGRATED'
42 changes: 42 additions & 0 deletions corehq/motech/repeaters/dbaccessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,45 @@ def delete_all_repeaters():
from .models import Repeater
for repeater in Repeater.get_db().view('repeaters/repeaters', reduce=False, include_docs=True).all():
Repeater.wrap(repeater['doc']).delete()


# Functions for evaluating scale
def get_repeat_record_domain_total():
from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this import? get_domains_that_have_repeat_records is in this same module


return len(get_domains_that_have_repeat_records())


def get_couch_repeater_total():
from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as previous -- this import should be unnecessary

from corehq.motech.repeaters.models import Repeater
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming this isn't a circular import, can this be moved to to the top of the file?


total = 0
for domain in get_domains_that_have_repeat_records():
result = Repeater.get_db().view(
'repeaters/repeaters',
startkey=[domain],
endkey=[domain, {}],
include_docs=False,
reduce=True,
).one()
total += result['value'] if result else 0
return total


def get_couch_repeat_record_total():
from corehq.motech.repeaters.dbaccessors import get_domains_that_have_repeat_records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as previous -- this import should be unnecessary

from corehq.motech.repeaters.models import RepeatRecord

total = 0
for domain in get_domains_that_have_repeat_records():
result = RepeatRecord.get_db().view(
'repeaters/repeat_records',
startkey=[domain, None, {}],
endkey=[domain, None],
include_docs=False,
reduce=True,
descending=True,
).one()
total += result['value'] if result else 0
return total
42 changes: 42 additions & 0 deletions corehq/motech/repeaters/management/commands/migrate_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from inspect import cleandoc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command (migrating couch repeater records to SQL) will have no use once migration is complete, right? Would a one-off script make more sense for this? I'd rather not create another command users need to ignore in ./manage.py (and I'd imagine it is a slight performance hit every time ./manage.py is executed)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually end up checking scripts like this in if only because they also need to be run by third parties that host HQ. We can delete them after we're reasonably confident everyone should have run them.

This comment reminded me, @kaapstorm how will the migration experience be for third parties? Will this command be integrated into a django migration like in the auto-managed pattern? Apologies if you've already discussed this - I'm quite late to the party, haven't been paying close attention to this migration recently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(@kaapstorm ignore this question, I'm going to go read the doc about the repeaters migration plan)


from django.core.management.base import BaseCommand

from corehq.util.log import with_progress_bar

from ...dbaccessors import (
get_domains_that_have_repeat_records,
get_repeaters_by_domain,
iter_repeat_records_by_repeater,
)
from ...models import Repeater, RepeaterStub

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F401 '...models.RepeaterStub' imported but unused

from ...tasks import migrate_repeat_record


class Command(BaseCommand):
help = cleandoc("""
Migrate Couch RepeatRecords to SQL.

If a Couch RepeatRecord cannot be migrated (usually because it
encounters a ResourceConflict error when trying to set its
"migrated" state) then this command can be run again, and
already-migrated RepeatRecords will be skipped.

See the "roll_back_record_migration" management command for
instructions to roll the migration back, if necessary.
""")

def handle(self, *args, **options):
# Migrate by domain to minimise impact on Repeat Record report
domains = get_domains_that_have_repeat_records()
for domain in with_progress_bar(domains):
for repeater in get_repeaters_by_domain(domain):
migrate_repeater(repeater)


def migrate_repeater(repeater: Repeater):
for couch_record in iter_repeat_records_by_repeater(repeater.domain,
repeater.get_id):
if couch_record.migrated:
continue
migrate_repeat_record.delay(repeater.repeater_stub, couch_record)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is delay being used here as a form of multi-threading?

82 changes: 82 additions & 0 deletions corehq/motech/repeaters/management/commands/sample_repeaters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datetime import datetime
from uuid import uuid4

from django.core.management.base import BaseCommand

from corehq.apps.domain.shortcuts import create_domain
from corehq.motech.models import ConnectionSettings
from corehq.motech.repeaters.models import (
FormRepeater,
RepeatRecord,
RepeatRecordAttempt,
)
from corehq.util.log import with_progress_bar

# NUM_DOMAINS = 500 # Prod
NUM_DOMAINS = 5
REPEATERS_PER_DOMAIN = 3
# RECORDS_PER_REPEATER = 50_000 # Prod
RECORDS_PER_REPEATER = 250
ATTEMPTS_PER_RECORD = 3


class Command(BaseCommand):
help = 'Create a ton of repeaters to mimic Prod'

def handle(self, *args, **options):
create_sample_repeaters()


def create_sample_repeaters():
for domain_name in with_progress_bar(get_domain_names(),
length=NUM_DOMAINS):
create_domain(domain_name)
connset = ConnectionSettings.objects.create(
domain=domain_name,
name='local httpbin',
url='http://127.0.0.1:10080/anything',
)
for i in range(1, REPEATERS_PER_DOMAIN + 1):
rep = FormRepeater(
domain=domain_name,
connection_settings_id=connset.pk,
)
rep.save()
for j in range(1, RECORDS_PER_REPEATER + 1):
now = datetime.utcnow()
attempts = third_time_is_the_charm()
rec = RepeatRecord(
domain=domain_name,
repeater_id=rep.get_id,
repeater_type=rep.__class__.__name__,
payload_id=str(uuid4()),
registered_on=now,
next_check=None,
succeeded=True,
overall_tries=len(attempts),
attempts=attempts,
)
rec.save()


def get_domain_names():
for i in range(1, NUM_DOMAINS + 1):
yield f'repeaters-{i:03d}'


def third_time_is_the_charm():
return [
RepeatRecordAttempt(
datetime=datetime.utcnow(),
failure_reason='Boo',
),
RepeatRecordAttempt(
datetime=datetime.utcnow(),
failure_reason='Boo',
),
RepeatRecordAttempt(
datetime=datetime.utcnow(),
success_response='Yay',
succeeded=True,
),
]
30 changes: 20 additions & 10 deletions corehq/motech/repeaters/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
MIN_RETRY_WAIT,
RECORD_CANCELLED_STATE,
RECORD_FAILURE_STATE,
RECORD_MIGRATED_STATE,
RECORD_PENDING_STATE,
RECORD_STATES,
RECORD_SUCCESS_STATE,
Expand Down Expand Up @@ -374,25 +375,31 @@ def get_attempt_info(self, repeat_record):
return None

def register(self, payload):
"""
Registers a SQLRepeatRecord.
"""
if not self.allowed_to_forward(payload):
return

now = datetime.utcnow()
repeat_record = RepeatRecord(
repeater_id=self.get_id,
repeater_type=self.doc_type,
if SQLRepeatRecord.objects.filter(
domain=self.domain,
repeater_stub=self.repeater_stub,
payload_id=payload.get_id,
state__in=(RECORD_PENDING_STATE, RECORD_FAILURE_STATE),
).exists():
# Payload is already waiting to be sent
return

self.repeater_stub.repeat_records.create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the model already exists for this specific PR, but if it's within scope, I'll advocate for renaming RepeaterStub, as Stub already has a well-defined meaning within testing.

domain=self.domain,
registered_on=now,
next_check=now,
payload_id=payload.get_id
payload_id=payload.get_id,
registered_at=timezone.now(),
)
metrics_counter('commcare.repeaters.new_record', tags={
'domain': self.domain,
'doc_type': self.doc_type
})
repeat_record.save()
repeat_record.attempt_forward_now()
return repeat_record
attempt_forward_now(self.repeater_stub)

def allowed_to_forward(self, payload):
"""
Expand Down Expand Up @@ -872,6 +879,7 @@ class RepeatRecord(Document):
failure_reason = StringProperty()
next_check = DateTimeProperty()
succeeded = BooleanProperty(default=False)
migrated = BooleanProperty(default=False)

@property
def record_id(self):
Expand Down Expand Up @@ -916,6 +924,8 @@ def state(self):
state = RECORD_SUCCESS_STATE
elif self.cancelled:
state = RECORD_CANCELLED_STATE
elif self.migrated:
state = RECORD_MIGRATED_STATE
elif self.failure_reason:
state = RECORD_FAILURE_STATE
return state
Expand Down
79 changes: 73 additions & 6 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ def check_repeaters():
start = datetime.utcnow()
twentythree_hours_sec = 23 * 60 * 60
twentythree_hours_later = start + timedelta(hours=23)

# Long timeout to allow all waiting repeat records to be iterated
check_repeater_lock = get_redis_lock(
CHECK_REPEATERS_KEY,
timeout=twentythree_hours_sec,
Expand All @@ -103,14 +101,17 @@ def check_repeaters():
"commcare.repeaters.check.processing",
timing_buckets=_check_repeaters_buckets,
):
for record in iterate_repeat_records(start, chunk_size=5000):
soft_migrate_repeat_records(start)
for repeater_stub in RepeaterStub.objects.all_ready():
if not _soft_assert(
datetime.utcnow() < twentythree_hours_later,
"I've been iterating repeat records for 23 hours. I quit!"
"I've been iterating repeaters for 23 hours. I quit!"
):
break
metrics_counter("commcare.repeaters.check.attempt_forward")
record.attempt_forward_now(is_retry=True)
if domain_can_forward(repeater_stub.domain):
metrics_counter("commcare.repeaters.check.attempt_forward")
# TODO: Function call should indicate that we are retrying
process_repeater_stub.delay(repeater_stub)
else:
iterating_time = datetime.utcnow() - start
_soft_assert(
Expand All @@ -121,6 +122,41 @@ def check_repeaters():
check_repeater_lock.release()


def soft_migrate_repeat_records(start):
"""
Soft-migrate repeat records to be sent.

.. note::
The ``migrate_records`` management command will migrate the
remaining cancelled and succeeded repeat records ... or they can
be ignored ... or deleted.

.. note::
The repeat records report will only show newer repeat records,
unless the ``migrate_records`` management command has migrated
older records.

"""
# After the first run, this should be quick. After seven days
# (MAX_RETRY_WAIT) only cancelled and succeeded Couch repeat records
# will not have been migrated.

repeater_stub_cache = {}

def get_repeater_stub(domain, repeater_id):
if (domain, repeater_id) not in repeater_stub_cache:
stub = RepeaterStub.objects.get(
domain=domain,
repeater_id=repeater_id,
)
repeater_stub_cache[(domain, repeater_id)] = stub
return repeater_stub_cache[(domain, repeater_id)]

for record in iterate_repeat_records(start):
repeater_stub = get_repeater_stub(record.domain, record.repeater_id)
migrate_repeat_record.delay(repeater_stub, record)


@task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def process_repeat_record(repeat_record):
_process_repeat_record(repeat_record)
Expand Down Expand Up @@ -192,6 +228,7 @@ def process_repeater_stub(repeater_stub: RepeaterStub):
fail_hard=False, block=False, timeout=5 * 60 * 60,
):
for repeat_record in repeater_stub.repeat_records_ready[:RECORDS_AT_A_TIME]:
metrics_counter("commcare.repeaters.check.attempt_forward")
try:
payload = get_payload(repeater_stub.repeater, repeat_record)
except Exception:
Expand All @@ -204,6 +241,36 @@ def process_repeater_stub(repeater_stub: RepeaterStub):
break


@task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def migrate_repeat_record(
repeater_stub: RepeaterStub,
couch_record: RepeatRecord,
):
assert repeater_stub.domain == couch_record.domain
sql_record = repeater_stub.repeat_records.create(
domain=couch_record.domain,
couch_id=couch_record.record_id,
payload_id=couch_record.payload_id,
state=couch_record.state,
registered_at=couch_record.registered_on,
)
for attempt in couch_record.attempts:
sql_record.sqlrepeatrecordattempt_set.create(
state=attempt.state,
message=attempt.message,
created_at=attempt.datetime,
)

couch_record.migrated = True
couch_record.next_check = None
try:
couch_record.save()
except: # noqa: E722
logging.exception('Failed to migrate repeat record: '
f'{couch_record.record_id}')
sql_record.delete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this delete also fails? Do we have any logging in that circumstance?



@task(serializer='pickle', queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def revert_migrated(couch_record):
"""
Expand Down
Loading