|
| 1 | +import time |
| 2 | + |
1 | 3 | from django.conf import settings |
2 | 4 | from django.core.cache import cache |
3 | 5 | from django.utils import timezone |
|
17 | 19 | cleanup_stale_snapshots_and_refresh_mv, |
18 | 20 | get_or_create_run, |
19 | 21 | iter_org_chunks_after, |
20 | | - process_chunk, |
| 22 | + process_chunk, refresh_user_reports_materialized_view, |
21 | 23 | ) |
22 | 24 | from kobo.celery import celery_app |
23 | 25 | from kpi.utils.log import logging |
24 | 26 |
|
25 | 27 |
|
26 | 28 | @celery_app.task( |
27 | | - queue='kpi_low_priority_queue', |
| 29 | + queue='kpi_long_running_tasks_queue', |
28 | 30 | soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT, |
29 | 31 | time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT, |
30 | 32 | ) |
@@ -71,38 +73,66 @@ def refresh_user_report_snapshots(**kwargs): |
71 | 73 | if not lock.acquire(blocking=False, blocking_timeout=0): |
72 | 74 | logging.info('Nothing to do, task is already running!') |
73 | 75 | return |
| 76 | + else: |
| 77 | + logging.info('Starting process, refreshing materialized view!') |
74 | 78 |
|
75 | 79 | # Claim the existing snapshot run or create a new one |
76 | 80 | run = get_or_create_run() |
| 81 | + |
| 82 | + # Update last heart-beat |
| 83 | + BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update( |
| 84 | + date_modified=timezone.now() |
| 85 | + ) |
| 86 | + |
77 | 87 | last_processed_org_id = run.last_processed_org_id or '' |
| 88 | + last_time = time.time() |
| 89 | + |
78 | 90 | try: |
79 | 91 | while chunk_qs := iter_org_chunks_after(last_processed_org_id): |
| 92 | + logging.info( |
| 93 | + f'Processing queue, last_processed_org_id: {last_processed_org_id}' |
| 94 | + ) |
80 | 95 | billing_map = get_current_billing_period_dates_by_org(chunk_qs) |
| 96 | + logging.info('\tBilling map retrieved') |
81 | 97 | limits_map = get_organizations_effective_limits(chunk_qs, True, True) |
| 98 | + logging.info('\tLimits map retrieved') |
82 | 99 | usage_map = calc.calculate_usage_batch(chunk_qs, billing_map) |
| 100 | + logging.info('\tUsage map retrieved') |
83 | 101 | last_processed_org_id = process_chunk( |
84 | 102 | chunk_qs, usage_map, limits_map, run.pk |
85 | 103 | ) |
86 | 104 |
|
87 | 105 | # Update the run progress |
| 106 | + logging.info( |
| 107 | + f'\tUpdating hearbeat, ' |
| 108 | + f'new last_processed_org_id: {last_processed_org_id}' |
| 109 | + ) |
88 | 110 | BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update( |
89 | 111 | last_processed_org_id=last_processed_org_id, |
90 | 112 | date_modified=timezone.now(), |
91 | 113 | ) |
92 | 114 |
|
| 115 | + if time.time() - last_time >= 15 * 60: |
| 116 | + logging.info('\tRefreshing the materialized view…') |
| 117 | + last_time = time.time() |
| 118 | + refresh_user_reports_materialized_view() |
| 119 | + |
93 | 120 | # All orgs processed: cleanup stale, refresh MV and mark run as completed |
| 121 | + logging.info('Clean-up') |
94 | 122 | cleanup_stale_snapshots_and_refresh_mv(run.pk) |
| 123 | + logging.info('Mark run as complete') |
95 | 124 | BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update( |
96 | 125 | status=BillingAndUsageSnapshotStatus.COMPLETED, |
97 | 126 | date_modified=timezone.now(), |
98 | 127 | ) |
99 | 128 |
|
100 | | - # Release the lock |
101 | | - lock.release() |
102 | | - |
103 | 129 | except Exception as ex: |
104 | 130 | run = BillingAndUsageSnapshotRun.objects.get(pk=run.pk) |
105 | 131 | details = run.details or {} |
106 | 132 | details.update({'last_error': str(ex), 'ts': timezone.now().isoformat()}) |
107 | 133 | run.details = details |
108 | 134 | run.save(update_fields=['details', 'date_modified']) |
| 135 | + finally: |
| 136 | + # Release the lock |
| 137 | + lock.release() |
| 138 | + logging.info('Lock released!') |
0 commit comments