Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 60 additions & 33 deletions backend/api/services/icbc_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import math
import time
import traceback
from django.db import transaction, connection
from django.db import transaction, connections
from django.utils import timezone
from datetime import datetime
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -38,10 +38,11 @@ def set_upload_progress(
total_pages=0,
complete=False,
error=None,
db_conn=None,
):
try:
# Get or create the progress status object
progress_obj, created = IcbcUploadProgress.objects.get_or_create(
progress_obj, created = IcbcUploadProgress.objects.using(db_conn).get_or_create(
upload_id=upload_obj.id,
defaults={
"progress": progress,
Expand All @@ -61,7 +62,7 @@ def set_upload_progress(
progress_obj.total_pages = total_pages
progress_obj.complete = complete
progress_obj.error = error
progress_obj.save()
progress_obj.save(using=db_conn)

print(
f"Progress updated: {upload_obj.id} - {progress}% - {status_text} - Page {current_page}/{total_pages}"
Expand Down Expand Up @@ -278,7 +279,12 @@ def process_chunk_rows(


def ingest_icbc_spreadsheet(
current_excelfile, previous_excelfile, upload_obj, current_progress, progress_end
current_excelfile,
previous_excelfile,
upload_obj,
current_progress,
progress_end,
progress_db_conn,
):
progress_initial_part = 40
progress_initial_steps = 4
Expand All @@ -295,7 +301,13 @@ def ingest_icbc_spreadsheet(

print("Processing Started")
set_upload_progress(
upload_obj, progress_elements.pop(0), "Reading previous file...", 0, 0, False
upload_obj,
progress_elements.pop(0),
"Reading previous file...",
0,
0,
False,
db_conn=progress_db_conn,
)

# Read previous file
Expand All @@ -304,7 +316,13 @@ def ingest_icbc_spreadsheet(
print("Previous file rows", len(df_p))

set_upload_progress(
upload_obj, progress_elements.pop(0), "Reading latest file...", 0, 0, False
upload_obj,
progress_elements.pop(0),
"Reading latest file...",
0,
0,
False,
db_conn=progress_db_conn,
)

# Read latest file
Expand All @@ -313,7 +331,13 @@ def ingest_icbc_spreadsheet(
print("Latest file rows", len(df_l))

set_upload_progress(
upload_obj, progress_elements.pop(0), "Comparing files...", 0, 0, False
upload_obj,
progress_elements.pop(0),
"Comparing files...",
0,
0,
False,
db_conn=progress_db_conn,
)

df_p = pd.DataFrame(df_p, columns=["MODEL_YEAR", "MAKE", "MODEL", "VIN", "SOURCE"])
Expand All @@ -340,6 +364,7 @@ def ingest_icbc_spreadsheet(
0,
total_pages,
False,
db_conn=progress_db_conn,
)

increment_secondary = (progress_end - progress_initial_part) // (total_pages + 1)
Expand Down Expand Up @@ -371,25 +396,25 @@ def ingest_icbc_spreadsheet(
unique_model_years, upload_obj.create_user
)

with transaction.atomic():
(created, updated) = process_chunk_rows(
df_ch,
model_years,
icbc_vehicles,
upload_obj.id,
upload_obj.create_user,
)
created_records_count += created
updated_records_count += updated

set_upload_progress(
upload_obj,
progress_elements.pop(0),
f"Processing page {page_count} of {total_pages}...",
page_count,
total_pages,
False,
)
(created, updated) = process_chunk_rows(
df_ch,
model_years,
icbc_vehicles,
upload_obj.id,
upload_obj.create_user,
)
created_records_count += created
updated_records_count += updated

set_upload_progress(
upload_obj,
progress_elements.pop(0),
f"Processing page {page_count} of {total_pages}...",
page_count,
total_pages,
False,
db_conn=progress_db_conn,
)

print("Page Time: ", time.time() - chunk_time)

Expand All @@ -400,6 +425,7 @@ def ingest_icbc_spreadsheet(
total_pages,
total_pages,
False,
db_conn=progress_db_conn,
)
print("Total processing time: ", time.time() - start_time)

Expand Down Expand Up @@ -439,12 +465,13 @@ def process_upload(upload_obj, filename):
print("Starting Ingest")
set_upload_progress(upload_obj, 20, "Starting data processing...", 0, 0, False)

done = ingest_icbc_spreadsheet(current_file, previous_file, upload_obj, 20, 95)

if done[0]:
# We remove the previous file from minio but keep the
# latest one so we can use it for compare on next upload
with transaction.atomic():
with transaction.atomic():
done = ingest_icbc_spreadsheet(
current_file, previous_file, upload_obj, 20, 95, "other"
)
if done[0]:
# We remove the previous file from minio but keep the
# latest one so we can use it for compare on next upload
IcbcUploadProgress.objects.filter(upload=upload_obj).update(
results={
"dateCurrentTo": upload_obj.upload_date,
Expand Down Expand Up @@ -474,4 +501,4 @@ def process_upload(upload_obj, filename):
if current_file:
current_file.close()
current_file.release_conn()
connection.close()
connections.close_all()
3 changes: 0 additions & 3 deletions backend/api/viewsets/icbc_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,12 @@ def upload(self, request):
create_user=user.username,
update_user=user.username,
)
print(type(upload_obj.upload_date))
print(type(upload_obj.create_user))

# Initialize progress with the upload object
set_upload_progress(upload_obj, 0, "Initializing...", 0, 0, False)

# Start processing in background thread
thread = threading.Thread(target=process_upload, args=(upload_obj, filename))
thread.daemon = True
thread.start()

# Return immediately with upload_id for polling
Expand Down
4 changes: 3 additions & 1 deletion backend/zeva/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@
# https://docs.djangoproject.com/en/3.0/ref/settings/#databases

DATABASES = {
'default': database.config()
'default': database.config(),
# currently used to exclude certain db operations inside a transaction from being part of said transaction
'other': database.config()
}

KEYCLOAK = keycloak.config()
Expand Down