Skip to content

Commit

Permalink
chg: [Pub/Sub] Internal Pub/Sub improvements in management of backgro…
Browse files Browse the repository at this point in the history
…und publishing.
  • Loading branch information
cedricbonhomme committed Dec 10, 2024
1 parent 2be4eef commit f3a6948
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions website/stream/listeners.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any

import threading
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

from flask import url_for
Expand All @@ -16,11 +17,31 @@
from website.web.bootstrap import vulnerabilitylookup


logging.config.dictConfig(get_config("logging"))

executor = ThreadPoolExecutor(max_workers=10)


def handle_exception(future: Any) -> None:
try:
future.result() # Retrieve the result of the task; raises if an exception occurred
except Exception as e:
logging.error(f"Background task failed: {e}", exc_info=True)


def publish_in_background(channel: str, message: str) -> None:
"""Offload the publishing logic to a background thread to avoid blocking the database transaction."""
threading.Thread(
target=vulnerabilitylookup.redis_client.publish, args=(channel, message)
).start()

def task() -> None:
try:
vulnerabilitylookup.redis_client.publish(channel, message)
except Exception as e:
logging.error(f"Error in publish_in_background: {e}", exc_info=True)
raise # Optionally re-raise the exception if needed

future = executor.submit(task)
# Optionally handle the exception when the future is retrieved
future.add_done_callback(handle_exception)


def stream_comment(mapper: Mapper, connection: Connection, target: Any) -> None: # type: ignore[type-arg]
Expand Down

0 comments on commit f3a6948

Please sign in to comment.