Skip to content

Commit

Permalink
fix: optimize rerun_import_all_images job
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael0202 committed Dec 5, 2024
1 parent 80a7105 commit 9354aac
Showing 1 changed file with 43 additions and 10 deletions.
53 changes: 43 additions & 10 deletions robotoff/workers/tasks/import_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def rerun_import_all_images(
where_clauses.append(ImageModel.server_type == server_type.name)
query = (
ImageModel.select(
ImageModel.barcode, ImageModel.image_id, ImageModel.server_type
ImageModel.id,
ImageModel.barcode,
ImageModel.image_id,
ImageModel.server_type,
)
.where(*where_clauses)
.order_by(ImageModel.uploaded_at.desc())
Expand All @@ -104,18 +107,16 @@ def rerun_import_all_images(
if return_count:
return query.count()

for barcode, image_id, server_type_str in query:
for image_model_id, barcode, image_id, server_type_str in query:
if not isinstance(barcode, str) and not barcode.isdigit():
raise ValueError("Invalid barcode: %s" % barcode)

product_id = ProductIdentifier(barcode, ServerType[server_type_str])
image_url = generate_image_url(product_id, image_id)
ocr_url = generate_json_ocr_url(product_id, image_id)
enqueue_job(
run_import_image_job,
get_high_queue(product_id),
job_kwargs={"result_ttl": 0},
run_import_image(
product_id=product_id,
image_model_id=image_model_id,
image_url=image_url,
ocr_url=ocr_url,
flags=flags,
Expand Down Expand Up @@ -144,16 +145,16 @@ def run_import_image_job(
What tasks are performed can be controlled using the `flags` parameter. By
default, all tasks are performed. A new rq job is enqueued for each task.
Before running the tasks, the image is downloaded and stored in the Robotoff
DB.
:param product_id: the product identifier
:param image_url: the URL of the image to import
:param ocr_url: the URL of the OCR JSON file
:param flags: the list of flags to run, defaults to None (all)
"""
logger.info("Running `import_image` for %s, image %s", product_id, image_url)

if flags is None:
flags = [flag for flag in ImportImageFlag]

source_image = get_source_from_url(image_url)
product = get_product_store(product_id.server_type)[product_id]
if product is None and settings.ENABLE_MONGODB_ACCESS:
Expand Down Expand Up @@ -185,13 +186,45 @@ def run_import_image_job(
ImageModel.bulk_update([image_model], fields=["deleted"])
return

run_import_image(
product_id=product_id,
image_model_id=image_model.id,
image_url=image_url,
ocr_url=ocr_url,
flags=flags,
)


def run_import_image(
product_id: ProductIdentifier,
image_model_id: int,
image_url: str,
ocr_url: str,
flags: list[ImportImageFlag] | None = None,
) -> None:
"""Launch all extraction tasks on an image.
We assume that the image exists in the Robotoff DB.
What tasks are performed can be controlled using the `flags` parameter. By
default, all tasks are performed. A new rq job is enqueued for each task.
:param product_id: the product identifier
:param image_model_id: the DB ID of the image
:param image_url: the URL of the image to import
:param ocr_url: the URL of the OCR JSON file
:param flags: the list of flags to run, defaults to None (all)
"""
if flags is None:
flags = [flag for flag in ImportImageFlag]

if ImportImageFlag.add_image_fingerprint in flags:
# Compute image fingerprint, this job is low priority
enqueue_job(
add_image_fingerprint_job,
low_queue,
job_kwargs={"result_ttl": 0},
image_model_id=image_model.id,
image_model_id=image_model_id,
)

if product_id.server_type.is_food():
Expand Down

0 comments on commit 9354aac

Please sign in to comment.