diff --git a/backend/api/migrations/0013_auto_20260119_1148.py b/backend/api/migrations/0013_auto_20260119_1148.py new file mode 100644 index 000000000..760fe25f1 --- /dev/null +++ b/backend/api/migrations/0013_auto_20260119_1148.py @@ -0,0 +1,37 @@ +# Generated by Django 3.2.25 on 2026-01-19 19:48 + +import db_comments.model_mixins +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('api', '0012_auto_20250131_1252'), + ] + + operations = [ + migrations.CreateModel( + name='IcbcUploadProgress', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('create_timestamp', models.DateTimeField(auto_now_add=True, null=True)), + ('create_user', models.CharField(default='SYSTEM', max_length=130)), + ('update_timestamp', models.DateTimeField(auto_now=True, null=True)), + ('update_user', models.CharField(max_length=130, null=True)), + ('progress', models.IntegerField(default=0)), + ('status_text', models.CharField(default='Starting...', max_length=255)), + ('current_page', models.IntegerField(default=0)), + ('total_pages', models.IntegerField(default=0)), + ('complete', models.BooleanField(default=False)), + ('error', models.TextField(blank=True, null=True)), + ('results', models.JSONField(blank=True, null=True)), + ('upload', models.OneToOneField(db_column='upload_id', on_delete=django.db.models.deletion.CASCADE, related_name='progress', to='api.icbcuploaddate')), + ], + options={ + 'db_table': 'icbc_upload_progress', + }, + bases=(models.Model, db_comments.model_mixins.DBComments), + ), + ] diff --git a/backend/api/models/__init__.py b/backend/api/models/__init__.py index f7598ee9e..7be975bbb 100644 --- a/backend/api/models/__init__.py +++ b/backend/api/models/__init__.py @@ -27,6 +27,7 @@ from . import credit_transfer_history from . import icbc_vehicle from . import icbc_upload_date +from . import icbc_upload_progress from . import notification, notification_subscription from . import sales_evidence from . import compliance_ratio diff --git a/backend/api/models/icbc_upload_progress.py b/backend/api/models/icbc_upload_progress.py new file mode 100644 index 000000000..0d826e193 --- /dev/null +++ b/backend/api/models/icbc_upload_progress.py @@ -0,0 +1,45 @@ +from django.db import models +from auditable.models import Auditable +from api.models.icbc_upload_date import IcbcUploadDate + + +class IcbcUploadProgress(Auditable): + """ + Tracks the progress of ICBC data uploads. + """ + upload = models.OneToOneField( + IcbcUploadDate, + on_delete=models.CASCADE, + related_name='progress', + db_column='upload_id' + ) + progress = models.IntegerField( + default=0 + ) + status_text = models.CharField( + max_length=255, + default='Starting...' + ) + current_page = models.IntegerField( + default=0 + ) + total_pages = models.IntegerField( + default=0 + ) + complete = models.BooleanField( + default=False + ) + error = models.TextField( + null=True, + blank=True + ) + results = models.JSONField( + null=True, + blank=True + ) + + class Meta: + db_table = 'icbc_upload_progress' + + def __str__(self): + return f"Upload {self.upload.id}: {self.progress}% - {self.status_text}" diff --git a/backend/api/serializers/icbc_upload_progress.py b/backend/api/serializers/icbc_upload_progress.py new file mode 100644 index 000000000..6b7989e96 --- /dev/null +++ b/backend/api/serializers/icbc_upload_progress.py @@ -0,0 +1,23 @@ +from rest_framework import serializers +from api.models.icbc_upload_progress import IcbcUploadProgress + + +class IcbcUploadProgressSerializer(serializers.ModelSerializer): + status = serializers.CharField(source='status_text', read_only=True) + upload_id = serializers.IntegerField(source='upload.id', read_only=True) + + class Meta: + model = IcbcUploadProgress + fields = [ + 'upload_id', + 'progress', + 'status', + 'current_page', + 'total_pages', + 'complete', + 'error', + 'results', + 'create_timestamp', + 'update_timestamp' + ] + read_only_fields = ['create_timestamp', 'update_timestamp'] diff --git a/backend/api/services/icbc_upload.py b/backend/api/services/icbc_upload.py index 1bfdde5c5..35e55e8d6 100644 --- a/backend/api/services/icbc_upload.py +++ b/backend/api/services/icbc_upload.py @@ -2,13 +2,82 @@ import numpy as np import math import time -from django.db import transaction +import traceback +from django.db import transaction, connection +from django.utils import timezone from datetime import datetime from dateutil.relativedelta import relativedelta from api.models.icbc_registration_data import IcbcRegistrationData from api.models.icbc_vehicle import IcbcVehicle from api.models.model_year import ModelYear from api.models.icbc_upload_date import IcbcUploadDate +from api.models.icbc_upload_progress import IcbcUploadProgress +from api.serializers.icbc_upload_progress import IcbcUploadProgressSerializer +from api.services.minio import get_minio_object, minio_remove_object + + +def get_upload_progress(upload_obj): + try: + progress_obj = IcbcUploadProgress.objects.get(upload=upload_obj) + serializer = IcbcUploadProgressSerializer(progress_obj) + return serializer.data + except IcbcUploadProgress.DoesNotExist: + return { + "progress": 0, + "status": "Upload not found", + "complete": False, + "error": "Upload not found", + } + + +def set_upload_progress( + upload_obj, + progress, + status_text, + current_page=0, + total_pages=0, + complete=False, + error=None, +): + try: + # Get or create the progress status object + progress_obj, created = IcbcUploadProgress.objects.get_or_create( + upload_id=upload_obj.id, + defaults={ + "progress": progress, + "status_text": status_text, + "current_page": current_page, + "total_pages": total_pages, + "complete": complete, + "error": error, + }, + ) + + # If it already exists, update it + if not created: + progress_obj.progress = progress + progress_obj.status_text = status_text + progress_obj.current_page = current_page + progress_obj.total_pages = total_pages + progress_obj.complete = complete + progress_obj.error = error + progress_obj.save() + + print( + f"Progress updated: {upload_obj.id} - {progress}% - {status_text} - Page {current_page}/{total_pages}" + ) + return True + except Exception as e: + print(f"Error updating progress for {upload_obj.id}: {e}") + traceback.print_exc() + return None + + +def clear_upload_progress(upload_obj): + try: + IcbcUploadProgress.objects.filter(upload=upload_obj).delete() + except Exception as e: + print(f"Error clearing progress for {upload_obj.id}: {e}") def trim_all_columns(df): @@ -21,200 +90,388 @@ def trim_all_columns(df): def format_dataframe(df): df = df[ - (df['HYBRID_VEHICLE_FLAG'] != 'N') | - (df['ELECTRIC_VEHICLE_FLAG'] != 'N') | - (df['FUEL_TYPE'].str.upper() == 'ELECTRIC') | - (df['FUEL_TYPE'].str.upper() == 'HYDROGEN') | - (df['FUEL_TYPE'].str.upper() == 'GASOLINEELECTRIC') + (df["HYBRID_VEHICLE_FLAG"] != "N") + | (df["ELECTRIC_VEHICLE_FLAG"] != "N") + | (df["FUEL_TYPE"].str.upper() == "ELECTRIC") + | (df["FUEL_TYPE"].str.upper() == "HYDROGEN") + | (df["FUEL_TYPE"].str.upper() == "GASOLINEELECTRIC") ] - df['MODEL_YEAR'].fillna(0, inplace=True) - df['MODEL_YEAR'] = pd.to_numeric(df['MODEL_YEAR']) - df.drop(df[df['MODEL_YEAR'] <= 2018].index, inplace = True) + df["MODEL_YEAR"].fillna(0, inplace=True) + df["MODEL_YEAR"] = pd.to_numeric(df["MODEL_YEAR"]) + df.drop(df[df["MODEL_YEAR"] <= 2018].index, inplace=True) - df['VIN'].fillna(0, inplace=True) - df.drop(df[df['VIN'] == 0].index, inplace = True) + df["VIN"].fillna(0, inplace=True) + df.drop(df[df["VIN"] == 0].index, inplace=True) - df = df[['MODEL_YEAR', 'MAKE', 'MODEL', 'VIN']] + df = df[["MODEL_YEAR", "MAKE", "MODEL", "VIN"]] return df -@transaction.atomic -def ingest_icbc_spreadsheet(current_excelfile, current_excelfile_name, requesting_user, dateCurrentTo, previous_excelfile): - try: - start_time = time.time() +def read_csv_file(filepath, source_label): + """ + Read CSV file in chunks and return as list of values. + """ + df_list = [] + for df in pd.read_csv( + filepath, + sep=",", + error_bad_lines=False, + iterator=True, + low_memory=True, + chunksize=50000, + header=0, + ): + df["SOURCE"] = source_label + df_list.extend(df.values.tolist()) + return df_list + + +def compare_dataframes(df_previous, df_latest): + """ + Compare two dataframes and return rows that are new or changed in the latest. + """ + c_result = ( + pd.concat([df_previous, df_latest]) + .drop_duplicates(subset=["MODEL_YEAR", "MAKE", "MODEL", "VIN"]) + .reset_index(drop=True) + ) + return c_result[c_result["SOURCE"] == "LATEST"] + - current_to_date = IcbcUploadDate.objects.create( - upload_date=dateCurrentTo, - create_user=requesting_user.username, - update_user=requesting_user.username, +def create_or_get_model_years(unique_model_years, requesting_user): + """ + Create or get ModelYear objects for the given years. + """ + model_years = [] + for unique_model_year in unique_model_years: + eff_date = datetime.strptime(str(unique_model_year), "%Y") + exp_date = eff_date + relativedelta(years=1) - relativedelta(days=1) + (model_year, _) = ModelYear.objects.get_or_create( + name=unique_model_year, + defaults={ + "create_user": requesting_user, + "update_user": requesting_user, + "effective_date": eff_date, + "expiration_date": exp_date, + }, ) + model_years.append(model_year) + return model_years - page_count = 0 - print("Processing Started") +def find_model_year_id(model_years, icbc_vehicle_year): + """ + Find the model year ID from the list of model years. + """ + for model_year in model_years: + if model_year.name == icbc_vehicle_year: + return model_year.id + return None - # Previous file processing - df_p = [] - for df in pd.read_csv( - previous_excelfile, sep=",", error_bad_lines=False, iterator=True, low_memory=True, - chunksize=50000, header=0 - ): - # df = format_dataframe(df) # pre-processing manually for now - df['SOURCE'] = 'PREVIOUS' - df_p.extend(df.values.tolist()) - - print("Read previous file", time.time() - start_time) - print("Previous file rows", len(df_p)) - - # Latest file processing - df_l = [] - for df in pd.read_csv( - current_excelfile, sep=",", error_bad_lines=False, iterator=True, low_memory=True, - chunksize=50000, header=0 + +def find_vehicle_id( + icbc_vehicles, icbc_vehicle_model, icbc_vehicle_year, icbc_vehicle_make +): + """ + Find the vehicle ID from the list of ICBC vehicles. + """ + for vh in icbc_vehicles: + if ( + vh.model_name == icbc_vehicle_model + and vh.model_year == icbc_vehicle_year + and vh.make == icbc_vehicle_make ): - # df = format_dataframe(df) # pre-processing manually for now - df['SOURCE'] = 'LATEST' - df_l.extend(df.values.tolist()) - - print("Read latest file", time.time() - start_time) - print("Latest file rows", len(df_l)) - - df_p = pd.DataFrame(df_p, columns=['MODEL_YEAR', 'MAKE', 'MODEL', 'VIN', 'SOURCE']) - df_l = pd.DataFrame(df_l, columns=['MODEL_YEAR', 'MAKE', 'MODEL', 'VIN', 'SOURCE']) - - # calculate any changes in the data between the latest file and the previously uploaded file - c_result = pd.concat([df_p, df_l]).drop_duplicates(subset=['MODEL_YEAR', 'MAKE', 'MODEL', 'VIN']).reset_index(drop=True) - c_result = c_result[c_result['SOURCE'] == 'LATEST'] - print("Compared files", time.time() - start_time) - print("Changed rows", c_result.shape) - - # If no changes detected then we end here - # and update the IcbcUploadDate Filename to the - # latest filename - if c_result.empty: - print("No file changes detected.") - current_to_date.filename = current_excelfile_name - current_to_date.save() - return (True, 0, 0) - - chunks = np.array_split(c_result, int(math.ceil(c_result.shape[0] / 25000))) - print("Number of Pages to process", len(chunks)) - - icbc_vehicles = IcbcVehicle.objects.all() - print("icbc_vehicles count:", len(icbc_vehicles)) - - # stats variables - created_records_count = 0 - updated_records_count = 0 - for df_ch in chunks: - chunk_time = time.time() - # This tells postgres to keep the db connection alive - _ = IcbcUploadDate.objects.get( - id=current_to_date.id + return vh.id + return None + + +def create_or_get_vehicle( + icbc_vehicle_model, icbc_vehicle_year_id, icbc_vehicle_make, requesting_user +): + """ + Create or get an IcbcVehicle. + """ + (vehicle, _) = IcbcVehicle.objects.get_or_create( + model_name=icbc_vehicle_model, + model_year_id=icbc_vehicle_year_id, + make=icbc_vehicle_make, + defaults={ + "create_user": requesting_user, + "update_user": requesting_user, + }, + ) + return vehicle.id + + +def process_registration_record( + icbc_vehicle_vin, vehicle_id, icbc_upload_date_id, requesting_user +): + """ + Create or update an ICBC registration data record. + Returns (created_count, updated_count) + """ + (row, created) = IcbcRegistrationData.objects.get_or_create( + vin=icbc_vehicle_vin, + defaults={ + "create_user": requesting_user, + "update_user": requesting_user, + "icbc_vehicle_id": vehicle_id, + "icbc_upload_date_id": icbc_upload_date_id, + }, + ) + + if created: + return (1, 0) + + # if vehicle id doesn't match then update id, date, username + if row.icbc_vehicle_id != vehicle_id: + row.icbc_vehicle_id = vehicle_id + row.icbc_upload_date_id = icbc_upload_date_id + row.update_user = requesting_user + row.save() + return (0, 1) + + return (0, 0) + + +def process_chunk_rows( + df_ch, model_years, icbc_vehicles, icbc_upload_date_id, requesting_user +): + """ + Process all rows in a dataframe chunk. + Returns (created_count, updated_count) + """ + created_count = 0 + updated_count = 0 + + for _, row in df_ch.iterrows(): + icbc_vehicle_year = str(int(row["MODEL_YEAR"])).strip() + icbc_vehicle_model = str(row["MODEL"]).upper().strip() + icbc_vehicle_make = str(row["MAKE"]).upper().strip() + icbc_vehicle_vin = str(row["VIN"]).upper().strip() + + # Find Model Year ID + icbc_vehicle_year_id = find_model_year_id(model_years, icbc_vehicle_year) + + # Find or create Vehicle + vehicle_id = find_vehicle_id( + icbc_vehicles, icbc_vehicle_model, icbc_vehicle_year, icbc_vehicle_make + ) + if vehicle_id is None: + vehicle_id = create_or_get_vehicle( + icbc_vehicle_model, + icbc_vehicle_year_id, + icbc_vehicle_make, + requesting_user, ) - print('Processing page: ' + str(page_count)) - print('Row Count: ' + str(df_ch.shape[0])) - page_count += 1 - - if df_ch.shape[0] <= 0: - continue - - unique_model_years = df_ch['MODEL_YEAR'].unique() - # unique_models = df_ch['MODEL'].unique() - # unique_makes = df_ch['MAKE'].unique() - # unique_vins = df_ch['VIN'].unique() - # print("unique_model_years", unique_model_years.shape[0]) - # print("unique_models", unique_models.shape[0]) - # print("unique_makes", unique_makes.shape[0]) - # print("unique_vins", unique_vins.shape[0]) - - model_years = [] - - for unique_model_year in unique_model_years: - eff_date = datetime.strptime(str(unique_model_year), '%Y') - exp_date = eff_date + relativedelta(years=1) - relativedelta(days=1) - (model_year, _) = ModelYear.objects.get_or_create( - name=unique_model_year, - defaults={ - 'create_user': requesting_user.username, - 'update_user': requesting_user.username, - 'effective_date': eff_date, - 'expiration_date': exp_date - }) - model_years.append(model_year) - - try: - with transaction.atomic(): - for _, row in df_ch.iterrows(): - icbc_vehicle_year = str(int(row['MODEL_YEAR'])).strip() - icbc_vehicle_model = str(row['MODEL']).upper().strip() - icbc_vehicle_make = str(row['MAKE']).upper().strip() - icbc_vehicle_vin = str(row['VIN']).upper().strip() - - # Searching for Model Year - for model_year in model_years: - if model_year.name == icbc_vehicle_year: - icbc_vehicle_year_id = model_year.id - - # Searching for Vehicle Id - vehicle_id = None - for vh in icbc_vehicles: - if vh.model_name == icbc_vehicle_model \ - and vh.model_year == icbc_vehicle_year \ - and vh.make == icbc_vehicle_make: - vehicle_id = vh.id - break - - # Create new vehicle - if vehicle_id == None: - (vehicle, _) = IcbcVehicle.objects.get_or_create( - model_name=icbc_vehicle_model, - model_year_id=icbc_vehicle_year_id, - make=icbc_vehicle_make, - defaults={ - 'create_user': requesting_user.username, - 'update_user': requesting_user.username - }) - vehicle_id = vehicle.id - - # Create new vin record - (row, created) = IcbcRegistrationData.objects.get_or_create( - vin=icbc_vehicle_vin, - defaults={ - 'create_user': requesting_user.username, - 'update_user': requesting_user.username, - 'icbc_vehicle_id': vehicle_id, - 'icbc_upload_date_id': current_to_date.id - }) - - if created: - created_records_count += 1 - - # if vehicle id doesn't match then update id, date, username - if not created and row.icbc_vehicle_id != vehicle_id: - row.icbc_vehicle_id = vehicle_id - row.icbc_upload_date_id = current_to_date.id - row.update_user = requesting_user.username - row.save() - updated_records_count += 1 - - except Exception as e: - print(e) - - print("Page Time: ", time.time() - chunk_time) - - """ Update IcbcUploadDate filename now that processing - has completed. If the upload failed then the IcbcUploadDate - object will have an empty filename which we can skip on - next upload """ - current_to_date.filename = current_excelfile_name - current_to_date.save() - - print("Total processing time: ", time.time() - start_time) - - return (True, created_records_count, updated_records_count) - except Exception as e: - print(e) + # Process registration record + (created, updated) = process_registration_record( + icbc_vehicle_vin, vehicle_id, icbc_upload_date_id, requesting_user + ) + created_count += created + updated_count += updated + + return (created_count, updated_count) + + +def ingest_icbc_spreadsheet( + current_excelfile, previous_excelfile, upload_obj, current_progress, progress_end +): + progress_initial_part = 40 + progress_initial_steps = 4 + progress_elements = [] + if current_progress > progress_end or current_progress > progress_initial_part: + raise Exception("Invalid progress inputs!") + increment_initial = (progress_initial_part - current_progress) // ( + progress_initial_steps + ) + for step in range(progress_initial_steps): + progress_elements.append(current_progress + (increment_initial * (step + 1))) + + start_time = time.time() + + print("Processing Started") + set_upload_progress( + upload_obj, progress_elements.pop(0), "Reading previous file...", 0, 0, False + ) + + # Read previous file + df_p = read_csv_file(previous_excelfile, "PREVIOUS") + print("Read previous file", time.time() - start_time) + print("Previous file rows", len(df_p)) + + set_upload_progress( + upload_obj, progress_elements.pop(0), "Reading latest file...", 0, 0, False + ) + + # Read latest file + df_l = read_csv_file(current_excelfile, "LATEST") + print("Read latest file", time.time() - start_time) + print("Latest file rows", len(df_l)) + + set_upload_progress( + upload_obj, progress_elements.pop(0), "Comparing files...", 0, 0, False + ) + + df_p = pd.DataFrame(df_p, columns=["MODEL_YEAR", "MAKE", "MODEL", "VIN", "SOURCE"]) + df_l = pd.DataFrame(df_l, columns=["MODEL_YEAR", "MAKE", "MODEL", "VIN", "SOURCE"]) + + # Calculate any changes in the data between files + c_result = compare_dataframes(df_p, df_l) + print("Compared files", time.time() - start_time) + print("Changed rows", c_result.shape) + + # If no changes detected, update filename and return + if c_result.empty: + print("No file changes detected.") + return (True, 0, 0) + + chunks = np.array_split(c_result, int(math.ceil(c_result.shape[0] / 25000))) + total_pages = len(chunks) + print("Number of Pages to process", total_pages) + + set_upload_progress( + upload_obj, + progress_elements.pop(0), + f"Processing {total_pages} pages...", + 0, + total_pages, + False, + ) + + increment_secondary = (progress_end - progress_initial_part) // (total_pages + 1) + for page in range(total_pages + 1): + progress_elements.append( + progress_initial_part + (increment_secondary * (page + 1)) + ) + + icbc_vehicles = IcbcVehicle.objects.all() + print("icbc_vehicles count:", len(icbc_vehicles)) + + # Process chunks + created_records_count = 0 + updated_records_count = 0 + page_count = 0 + + for df_ch in chunks: + chunk_time = time.time() + + print("Processing page: " + str(page_count)) + print("Row Count: " + str(df_ch.shape[0])) + page_count += 1 + + if df_ch.shape[0] <= 0: + continue + + unique_model_years = df_ch["MODEL_YEAR"].unique() + model_years = create_or_get_model_years( + unique_model_years, upload_obj.create_user + ) + + with transaction.atomic(): + (created, updated) = process_chunk_rows( + df_ch, + model_years, + icbc_vehicles, + upload_obj.id, + upload_obj.create_user, + ) + created_records_count += created + updated_records_count += updated + + set_upload_progress( + upload_obj, + progress_elements.pop(0), + f"Processing page {page_count} of {total_pages}...", + page_count, + total_pages, + False, + ) + + print("Page Time: ", time.time() - chunk_time) + + set_upload_progress( + upload_obj, + progress_elements.pop(0), + "Finalizing...", + total_pages, + total_pages, + False, + ) + print("Total processing time: ", time.time() - start_time) + + return (True, created_records_count, updated_records_count) + + +# meant to be used in a thread not managed by the django request-response lifecycle +def process_upload(upload_obj, filename): + previous_file = None + current_file = None + try: + current_progress = get_upload_progress(upload_obj) + if current_progress["progress"] != 0: + raise Exception("Invalid current progress!") + set_upload_progress( + upload_obj, 5, "Getting previous upload data...", 0, 0, False + ) + # get previous upload file so we can compare + last_icbc_date = ( + IcbcUploadDate.objects.exclude(filename__isnull=True) + .exclude(id=upload_obj.id) + .latest("create_timestamp") + ) + print("Last upload date", last_icbc_date.upload_date) + + # get previous file + previous_filename = last_icbc_date.filename + print("Downloading previous file", previous_filename) + set_upload_progress(upload_obj, 10, "Downloading previous file...", 0, 0, False) + previous_file = get_minio_object(previous_filename) + + # get latest file + print("Downloading latest file", filename) + set_upload_progress(upload_obj, 15, "Downloading latest file...", 0, 0, False) + current_file = get_minio_object(filename) + + print("Starting Ingest") + set_upload_progress(upload_obj, 20, "Starting data processing...", 0, 0, False) + + done = ingest_icbc_spreadsheet(current_file, previous_file, upload_obj, 20, 95) + + if done[0]: + # We remove the previous file from minio but keep the + # latest one so we can use it for compare on next upload + with transaction.atomic(): + IcbcUploadProgress.objects.filter(upload=upload_obj).update( + results={ + "dateCurrentTo": upload_obj.upload_date, + "createdRecords": done[1], + "updatedRecords": done[2], + }, + update_timestamp=timezone.now(), + ) + set_upload_progress( + upload_obj, 100, "Processing complete!", 0, 0, True, error=None + ) + upload_obj.filename = filename + upload_obj.save() + minio_remove_object(previous_filename) + print("Done processing") + + except Exception as error: + traceback.print_exc() + set_upload_progress( + upload_obj, 0, "Error occurred", 0, 0, True, error=str(error) + ) + + finally: + if previous_file: + previous_file.close() + previous_file.release_conn() + if current_file: + current_file.close() + current_file.release_conn() + connection.close() diff --git a/backend/api/viewsets/icbc_verification.py b/backend/api/viewsets/icbc_verification.py index a0d613af1..730812d4b 100644 --- a/backend/api/viewsets/icbc_verification.py +++ b/backend/api/viewsets/icbc_verification.py @@ -1,5 +1,6 @@ import json import os +import threading from django.http import HttpResponse from rest_framework import viewsets, status @@ -7,105 +8,100 @@ from rest_framework.permissions import AllowAny from rest_framework.response import Response -from api.services.icbc_upload import ingest_icbc_spreadsheet -from api.services.minio import get_minio_object, minio_remove_object +from api.services.icbc_upload import ( + get_upload_progress, + set_upload_progress, + process_upload, +) from api.models.icbc_upload_date import IcbcUploadDate from api.serializers.icbc_upload_date import IcbcUploadDateSerializer class IcbcVerificationViewSet(viewsets.GenericViewSet): permission_classes = [AllowAny] - http_method_names = ['get', 'post'] + http_method_names = ["get", "post"] - serializer_classes = { - 'default': IcbcUploadDateSerializer - } + serializer_classes = {"default": IcbcUploadDateSerializer} def get_serializer_class(self): if self.action in list(self.serializer_classes.keys()): return self.serializer_classes[self.action] - return self.serializer_classes['default'] + return self.serializer_classes["default"] - @action(detail=False, methods=['get']) + @action(detail=False, methods=["get"]) def date(self, request): - icbc_date = IcbcUploadDate.objects.last() + icbc_date = ( + IcbcUploadDate.objects.filter(filename__isnull=False) + .order_by("-upload_date") + .first() + ) serializer = self.get_serializer(icbc_date) return Response(serializer.data) - @action(detail=False, methods=['post']) + @action(detail=False, methods=["post"]) def chunk_upload(self, request): user = request.user if not user.is_government: return Response(status=status.HTTP_403_FORBIDDEN) try: - data = request.FILES.get('files') + data = request.FILES.get("files") os.rename(data.temporary_file_path(), data.name) except Exception as error: print(error) return HttpResponse(status=400, content=error) return HttpResponse( - status=201, content="nothing", content_type='application/json' + status=201, content="nothing", content_type="application/json" ) - @action(detail=False, methods=['post']) + @action(detail=False, methods=["post"]) def upload(self, request): user = request.user if not user.is_government: return Response(status=status.HTTP_403_FORBIDDEN) - - filename = request.data.get('filename') - try: - try: - # get previous upload file so we can compare - last_icbc_date = IcbcUploadDate.objects \ - .exclude(filename__isnull=True).latest('create_timestamp') - except IcbcUploadDate.DoesNotExist: - raise Exception( - """ - No previous IcbcUploadDate found with filename. Update previous Date with current filename. - """) - - print("Last upload date", last_icbc_date.upload_date) - - # get previous file - previous_filename = last_icbc_date.filename - print("Downloading previous file", previous_filename) - previous_file = get_minio_object(previous_filename) - - # get latest file - print("Downloading latest file", filename) - current_file = get_minio_object(filename) - - print("Starting Ingest") - date_current_to = request.data.get('submission_current_date') - try: - done = ingest_icbc_spreadsheet(current_file, filename, user, date_current_to, previous_file) - except: - return HttpResponse(status=400, content='Error processing data file. Please contact your administrator for assistance.') - - if done[0]: - # We remove the previous file from minio but keep the - # latest one so we can use it for compare on next upload - minio_remove_object(previous_filename) - print('Done processing') - except Exception as error: - return HttpResponse(status=400, content=error) - - finally: - previous_file.close() - previous_file.release_conn() - current_file.close() - current_file.release_conn() + filename = request.data.get("filename") + date_current_to = request.data.get("submission_current_date") + + # Create IcbcUploadDate object first + upload_obj = IcbcUploadDate.objects.create( + upload_date=date_current_to, + create_user=user.username, + update_user=user.username, + ) + print(type(upload_obj.upload_date)) + print(type(upload_obj.create_user)) + # Initialize progress with the upload object + set_upload_progress(upload_obj, 0, "Initializing...", 0, 0, False) + + # Start processing in background thread + thread = threading.Thread(target=process_upload, args=(upload_obj, filename)) + thread.daemon = True + thread.start() + + # Return immediately with upload_id for polling return HttpResponse( - status=201, - content=json.dumps({ - 'dateCurrentTo': date_current_to, - 'createdRecords': done[1], - 'updatedRecords': done[2] - }), - content_type='application/json' + status=202, + content=json.dumps({"upload_id": upload_obj.id}), + content_type="application/json", ) + + @action(detail=False, methods=["get"]) + def progress(self, request): + """Endpoint to poll for upload progress""" + upload_id = request.query_params.get("upload_id") + if not upload_id: + return Response( + {"error": "upload_id required"}, status=status.HTTP_400_BAD_REQUEST + ) + + try: + upload_obj = IcbcUploadDate.objects.get(id=upload_id) + progress_data = get_upload_progress(upload_obj) + return Response(progress_data) + except IcbcUploadDate.DoesNotExist: + return Response( + {"error": "Upload not found"}, status=status.HTTP_404_NOT_FOUND + ) diff --git a/frontend/src/app/routes/ICBCVerification.js b/frontend/src/app/routes/ICBCVerification.js index 19e05d0a3..ab09fc85c 100644 --- a/frontend/src/app/routes/ICBCVerification.js +++ b/frontend/src/app/routes/ICBCVerification.js @@ -3,7 +3,8 @@ const API_BASE_PATH = '/icbc-verification' const ICBC_VERIFICATION = { DATE: `${API_BASE_PATH}/date`, UPLOAD: `${API_BASE_PATH}/upload`, - CHUNK_UPLOAD: `${API_BASE_PATH}/chunk_upload` + CHUNK_UPLOAD: `${API_BASE_PATH}/chunk_upload`, + PROGRESS: `${API_BASE_PATH}/progress` } export default ICBC_VERIFICATION diff --git a/frontend/src/credits/UploadICBCVerificationContainer.js b/frontend/src/credits/UploadICBCVerificationContainer.js index 165e21ee7..72874bedc 100644 --- a/frontend/src/credits/UploadICBCVerificationContainer.js +++ b/frontend/src/credits/UploadICBCVerificationContainer.js @@ -21,6 +21,8 @@ const UploadICBCVerificationContainer = (props) => { const [showProcessing, setShowProcessing] = useState(false) const [showProgressBar, setShowProgressBar] = useState(false) const [uploadProgress, setUploadProgress] = useState(0) + const [progressStatus, setProgressStatus] = useState('') + const [pollingInterval, setPollingInterval] = useState(null) const today = new Date() const date = `${today.getFullYear()}-${ @@ -43,10 +45,82 @@ const UploadICBCVerificationContainer = (props) => { (100 * progressEvent.loaded) / progressEvent.total ) setUploadProgress(percentage) + setProgressStatus('Uploading file to storage...') + } + + const pollProgress = (uploadId) => { + // Store upload_id in localStorage for persistence across page refreshes + localStorage.setItem('icbc_upload_id', uploadId) + localStorage.setItem('icbc_upload_active', 'true') + + const interval = setInterval(() => { + axios + .get(ROUTES_ICBCVERIFICATION.PROGRESS, { + params: { upload_id: uploadId } + }) + .then((response) => { + const { progress, status: statusText, complete, error, results } = response.data + + setUploadProgress(progress) + setProgressStatus(statusText) + + if (complete) { + clearInterval(interval) + setPollingInterval(null) + + // Clear localStorage when complete + localStorage.removeItem('icbc_upload_id') + localStorage.removeItem('icbc_upload_active') + + if (error) { + setAlertMessage(error) + setShowProcessing(false) + setShowProgressBar(false) + } else if (results) { + const { createdRecords, updatedRecords } = results + setPreviousDateCurrentTo(results.dateCurrentTo) + + if (createdRecords === 0 && updatedRecords === 0) { + setAlertMessage('upload successful - no new records were found.') + } else { + setAlertMessage('upload successful - ' + + createdRecords + ' new records were created and ' + + updatedRecords + ' records were updated.') + } + + toastr.success('upload successful!', '', { + positionClass: 'toast-bottom-right' + }) + + setShowProcessing(false) + setShowProgressBar(false) + } + + setFiles([]) + } + }) + .catch((error) => { + console.error('Polling error:', error) + clearInterval(interval) + setPollingInterval(null) + + // Clear localStorage on error + localStorage.removeItem('icbc_upload_id') + localStorage.removeItem('icbc_upload_active') + + setAlertMessage('An error occurred while checking upload progress.') + setShowProcessing(false) + setShowProgressBar(false) + }) + }, 10000) // Poll every 10 seconds + + setPollingInterval(interval) } const doUpload = () => { setShowProgressBar(true) + setUploadProgress(0) + setProgressStatus('Preparing upload...') files.forEach((file) => { axios.get(ROUTES_UPLOADS.MINIO_URL).then((response) => { @@ -62,41 +136,31 @@ const UploadICBCVerificationContainer = (props) => { } }) .then(() => { + // File uploaded to storage, now start processing setShowProcessing(true) + setUploadProgress(0) + setProgressStatus('Starting data processing...') axios .post(ROUTES_ICBCVERIFICATION.UPLOAD, { filename, - submissionCurrentDate: dateCurrentTo + submission_current_date: dateCurrentTo }) .then((postResponse) => { - const { dateCurrentTo: updatedDateCurrentTo, createdRecords, updatedRecords } = - postResponse.data - setPreviousDateCurrentTo(updatedDateCurrentTo) - if (createdRecords === 0 && updatedRecords === 0) { - setAlertMessage('upload successful - no new records were found.') - } else { - setAlertMessage('upload successful - ' + - createdRecords + ' new records were created and ' + - updatedRecords + ' records were updated.') - } - toastr.success('upload successful!', '', { - positionClass: 'toast-bottom-right' - }) + const { upload_id: uploadId } = postResponse.data + // Start polling for progress + pollProgress(uploadId) }) .catch((error) => { console.error(error) const { response: errorResponse } = error - if (errorResponse.status === 400) { + if (errorResponse?.status === 400) { setAlertMessage(errorResponse.data) } else { setAlertMessage( 'An error has occurred while uploading. Please try again later.' ) } - }) - .finally(() => { - setFiles([]) setShowProcessing(false) setShowProgressBar(false) }) @@ -106,6 +170,8 @@ const UploadICBCVerificationContainer = (props) => { setAlertMessage( 'An error has occurred while uploading. Please try again later.' ) + setShowProcessing(false) + setShowProgressBar(false) }) }) }) @@ -113,6 +179,27 @@ const UploadICBCVerificationContainer = (props) => { useEffect(() => { refreshList(true) + + // Check if there's an active upload from a previous session + const activeUploadId = localStorage.getItem('icbc_upload_id') + const isUploadActive = localStorage.getItem('icbc_upload_active') === 'true' + + if (activeUploadId && isUploadActive) { + // Resume showing progress bar and processing status + setShowProgressBar(true) + setShowProcessing(true) + setProgressStatus('Resuming upload monitoring...') + + // Resume polling for the active upload + pollProgress(activeUploadId) + } + + // Cleanup polling interval on unmount + return () => { + if (pollingInterval) { + clearInterval(pollingInterval) + } + } }, []) if (loading) { @@ -134,6 +221,7 @@ const UploadICBCVerificationContainer = (props) => { title="Upload ICBC Registration Data" upload={doUpload} uploadProgress={uploadProgress} + progressStatus={progressStatus} user={user} /> ] diff --git a/frontend/src/credits/components/UploadVerificationData.js b/frontend/src/credits/components/UploadVerificationData.js index f6685d6db..85e296323 100644 --- a/frontend/src/credits/components/UploadVerificationData.js +++ b/frontend/src/credits/components/UploadVerificationData.js @@ -19,7 +19,8 @@ const UploadVerificationData = (props) => { showProgressBar, title, upload, - uploadProgress + uploadProgress, + progressStatus } = props const removeFile = (removedFile) => { @@ -39,6 +40,11 @@ const UploadVerificationData = (props) => {

Uploading:

+ {progressStatus && ( +
+ {progressStatus} +
+ )}