From 52e31fdce4482e3c6805125373bd529e3fd0e926 Mon Sep 17 00:00:00 2001 From: tim738745 Date: Thu, 22 Jan 2026 15:46:48 -0800 Subject: [PATCH] 2545 - transactions --- backend/api/services/icbc_upload.py | 93 +++++++++++++++-------- backend/api/viewsets/icbc_verification.py | 3 - backend/zeva/settings.py | 4 +- 3 files changed, 63 insertions(+), 37 deletions(-) diff --git a/backend/api/services/icbc_upload.py b/backend/api/services/icbc_upload.py index 35e55e8d6..3ed18efe4 100644 --- a/backend/api/services/icbc_upload.py +++ b/backend/api/services/icbc_upload.py @@ -3,7 +3,7 @@ import math import time import traceback -from django.db import transaction, connection +from django.db import transaction, connections from django.utils import timezone from datetime import datetime from dateutil.relativedelta import relativedelta @@ -38,10 +38,11 @@ def set_upload_progress( total_pages=0, complete=False, error=None, + db_conn=None, ): try: # Get or create the progress status object - progress_obj, created = IcbcUploadProgress.objects.get_or_create( + progress_obj, created = IcbcUploadProgress.objects.using(db_conn).get_or_create( upload_id=upload_obj.id, defaults={ "progress": progress, @@ -61,7 +62,7 @@ def set_upload_progress( progress_obj.total_pages = total_pages progress_obj.complete = complete progress_obj.error = error - progress_obj.save() + progress_obj.save(using=db_conn) print( f"Progress updated: {upload_obj.id} - {progress}% - {status_text} - Page {current_page}/{total_pages}" @@ -278,7 +279,12 @@ def process_chunk_rows( def ingest_icbc_spreadsheet( - current_excelfile, previous_excelfile, upload_obj, current_progress, progress_end + current_excelfile, + previous_excelfile, + upload_obj, + current_progress, + progress_end, + progress_db_conn, ): progress_initial_part = 40 progress_initial_steps = 4 @@ -295,7 +301,13 @@ def ingest_icbc_spreadsheet( print("Processing Started") set_upload_progress( - upload_obj, progress_elements.pop(0), "Reading previous file...", 
0, 0, False + upload_obj, + progress_elements.pop(0), + "Reading previous file...", + 0, + 0, + False, + db_conn=progress_db_conn, ) # Read previous file @@ -304,7 +316,13 @@ def ingest_icbc_spreadsheet( print("Previous file rows", len(df_p)) set_upload_progress( - upload_obj, progress_elements.pop(0), "Reading latest file...", 0, 0, False + upload_obj, + progress_elements.pop(0), + "Reading latest file...", + 0, + 0, + False, + db_conn=progress_db_conn, ) # Read latest file @@ -313,7 +331,13 @@ def ingest_icbc_spreadsheet( print("Latest file rows", len(df_l)) set_upload_progress( - upload_obj, progress_elements.pop(0), "Comparing files...", 0, 0, False + upload_obj, + progress_elements.pop(0), + "Comparing files...", + 0, + 0, + False, + db_conn=progress_db_conn, ) df_p = pd.DataFrame(df_p, columns=["MODEL_YEAR", "MAKE", "MODEL", "VIN", "SOURCE"]) @@ -340,6 +364,7 @@ def ingest_icbc_spreadsheet( 0, total_pages, False, + db_conn=progress_db_conn, ) increment_secondary = (progress_end - progress_initial_part) // (total_pages + 1) @@ -371,25 +396,25 @@ def ingest_icbc_spreadsheet( unique_model_years, upload_obj.create_user ) - with transaction.atomic(): - (created, updated) = process_chunk_rows( - df_ch, - model_years, - icbc_vehicles, - upload_obj.id, - upload_obj.create_user, - ) - created_records_count += created - updated_records_count += updated - - set_upload_progress( - upload_obj, - progress_elements.pop(0), - f"Processing page {page_count} of {total_pages}...", - page_count, - total_pages, - False, - ) + (created, updated) = process_chunk_rows( + df_ch, + model_years, + icbc_vehicles, + upload_obj.id, + upload_obj.create_user, + ) + created_records_count += created + updated_records_count += updated + + set_upload_progress( + upload_obj, + progress_elements.pop(0), + f"Processing page {page_count} of {total_pages}...", + page_count, + total_pages, + False, + db_conn=progress_db_conn, + ) print("Page Time: ", time.time() - chunk_time) @@ -400,6 +425,7 @@ def 
ingest_icbc_spreadsheet( total_pages, total_pages, False, + db_conn=progress_db_conn, ) print("Total processing time: ", time.time() - start_time) @@ -439,12 +465,13 @@ def process_upload(upload_obj, filename): print("Starting Ingest") set_upload_progress(upload_obj, 20, "Starting data processing...", 0, 0, False) - done = ingest_icbc_spreadsheet(current_file, previous_file, upload_obj, 20, 95) - - if done[0]: - # We remove the previous file from minio but keep the - # latest one so we can use it for compare on next upload - with transaction.atomic(): + with transaction.atomic(): + done = ingest_icbc_spreadsheet( + current_file, previous_file, upload_obj, 20, 95, "other" + ) + if done[0]: + # We remove the previous file from minio but keep the + # latest one so we can use it for compare on next upload IcbcUploadProgress.objects.filter(upload=upload_obj).update( results={ "dateCurrentTo": upload_obj.upload_date, @@ -474,4 +501,4 @@ def process_upload(upload_obj, filename): if current_file: current_file.close() current_file.release_conn() - connection.close() + connections.close_all() diff --git a/backend/api/viewsets/icbc_verification.py b/backend/api/viewsets/icbc_verification.py index 730812d4b..24a755e88 100644 --- a/backend/api/viewsets/icbc_verification.py +++ b/backend/api/viewsets/icbc_verification.py @@ -70,15 +70,12 @@ def upload(self, request): create_user=user.username, update_user=user.username, ) - print(type(upload_obj.upload_date)) - print(type(upload_obj.create_user)) # Initialize progress with the upload object set_upload_progress(upload_obj, 0, "Initializing...", 0, 0, False) # Start processing in background thread thread = threading.Thread(target=process_upload, args=(upload_obj, filename)) - thread.daemon = True thread.start() # Return immediately with upload_id for polling diff --git a/backend/zeva/settings.py b/backend/zeva/settings.py index 4282f1e73..b4c4044d4 100644 --- a/backend/zeva/settings.py +++ b/backend/zeva/settings.py @@ -105,7 
+105,9 @@ # https://docs.djangoproject.com/en/3.0/ref/settings/#databases DATABASES = { - 'default': database.config() + 'default': database.config(), + # Second alias for the same database: queries routed through 'other' use their own connection, so they commit independently of any transaction open on 'default' (e.g. ICBC upload progress updates) + 'other': database.config() } KEYCLOAK = keycloak.config()