From 78de2894a71ad629755f28008fa0604e9463046b Mon Sep 17 00:00:00 2001 From: Adrian Date: Wed, 12 Jun 2024 23:44:42 -0600 Subject: [PATCH 1/2] have the same lock for upload + preprocess task --- tasks/preprocess_upload.py | 9 +++++++-- tasks/upload.py | 4 +++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tasks/preprocess_upload.py b/tasks/preprocess_upload.py index 9ec3287ae..a4c3ff7f9 100644 --- a/tasks/preprocess_upload.py +++ b/tasks/preprocess_upload.py @@ -24,6 +24,7 @@ from services.yaml import save_repo_yaml_to_database_if_needed from services.yaml.fetcher import fetch_commit_yaml_from_provider from tasks.base import BaseCodecovTask +from tasks.upload import UPLOAD_LOCK_NAME log = logging.getLogger(__name__) @@ -48,7 +49,8 @@ def run_impl( "Received preprocess upload task", extra=dict(repoid=repoid, commit=commitid, report_code=report_code), ) - lock_name = f"preprocess_upload_lock_{repoid}_{commitid}_{report_code}" + lock_name = UPLOAD_LOCK_NAME(repoid, commitid) + # lock_name = f"preprocess_upload_lock_{repoid}_{commitid}_{report_code}" redis_connection = get_redis_connection() # This task only needs to run once per commit (per report_code) # To generate the report. So if one is already running we don't need another @@ -62,7 +64,10 @@ def run_impl( with redis_connection.lock( lock_name, timeout=60 * 5, - blocking_timeout=None, + # This is the timeout that this task will wait to wait for the lock. 
This should + # be non-zero as otherwise it waits indefinitely to get the lock, and ideally smaller than + # the blocking timeout for the upload task so this one goes first, although it can go second + blocking_timeout=3, ): return self.process_impl_within_lock( db_session=db_session, diff --git a/tasks/upload.py b/tasks/upload.py index 5db7fd854..ebe3200d1 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -64,6 +64,8 @@ CHUNK_SIZE = 3 +UPLOAD_LOCK_NAME = lambda repoid, commitid: f"upload_lock_{repoid}_{commitid}" + class UploadContext: """ @@ -91,7 +93,7 @@ def lock_name(self, lock_type: str): if lock_type == "upload_processing": return UPLOAD_PROCESSING_LOCK_NAME(self.repoid, self.commitid) else: - return f"{lock_type}_lock_{self.repoid}_{self.commitid}" + return UPLOAD_LOCK_NAME(self.repoid, self.commitid) else: return f"{lock_type}_lock_{self.repoid}_{self.commitid}_{self.report_type.value}" From 17000fd664427c0d477655b58a744c07bde63c5e Mon Sep 17 00:00:00 2001 From: Adrian Date: Fri, 14 Jun 2024 10:20:10 -0600 Subject: [PATCH 2/2] Add/fix explanation comments --- tasks/preprocess_upload.py | 5 ++--- tasks/upload.py | 2 ++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tasks/preprocess_upload.py b/tasks/preprocess_upload.py index a4c3ff7f9..892c3c49d 100644 --- a/tasks/preprocess_upload.py +++ b/tasks/preprocess_upload.py @@ -50,7 +50,6 @@ def run_impl( extra=dict(repoid=repoid, commit=commitid, report_code=report_code), ) lock_name = UPLOAD_LOCK_NAME(repoid, commitid) - # lock_name = f"preprocess_upload_lock_{repoid}_{commitid}_{report_code}" redis_connection = get_redis_connection() # This task only needs to run once per commit (per report_code) # To generate the report. So if one is already running we don't need another @@ -65,8 +64,8 @@ def run_impl( lock_name, timeout=60 * 5, # This is the timeout that this task will wait to wait for the lock. 
This should - # be non-zero as otherwise it waits indefinitely to get the lock, and ideally smaller than - # the blocking timeout for the upload task so this one goes first, although it can go second + # be set (not None) as otherwise it waits indefinitely to get the lock. It's also smaller than + # the upload task's blocking timeout to guarantee that an Upload task runs blocking_timeout=3, ): return self.process_impl_within_lock( diff --git a/tasks/upload.py b/tasks/upload.py index ebe3200d1..15e4b0ab6 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -64,6 +64,8 @@ CHUNK_SIZE = 3 +# Making the upload lock name a constant so it can be shared between the preprocess and +# the upload task, as they have overlapping logic that collides in some cases UPLOAD_LOCK_NAME = lambda repoid, commitid: f"upload_lock_{repoid}_{commitid}"