From cf37addbef64dcd00d6fd7bf0de7994682e4c651 Mon Sep 17 00:00:00 2001 From: Aaron Chong Date: Mon, 6 May 2024 11:03:02 +0800 Subject: [PATCH] Use asyncio.Lock, added more notes about the issue Signed-off-by: Aaron Chong --- .../api-server/api_server/repositories/tasks.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/packages/api-server/api_server/repositories/tasks.py b/packages/api-server/api_server/repositories/tasks.py index a98f9e0d3..d82cae07c 100644 --- a/packages/api-server/api_server/repositories/tasks.py +++ b/packages/api-server/api_server/repositories/tasks.py @@ -1,6 +1,6 @@ +import asyncio import sys from datetime import datetime -from multiprocessing import Lock from typing import Dict, List, Optional, Sequence, Tuple, cast from fastapi import Depends, HTTPException @@ -128,7 +128,7 @@ def parse_destination( class TaskRepository: def __init__(self, user: User): self.user = user - self.save_task_state_mutex = Lock() + self.save_task_state_mutex = asyncio.Lock() async def save_task_request( self, task_state: TaskState, task_request: TaskRequest @@ -164,9 +164,16 @@ async def query_task_requests(self, task_ids: List[str]) -> List[DbTaskRequest]: raise HTTPException(422, str(e)) from e async def save_task_state(self, task_state: TaskState) -> None: - # FIXME: this may be fixed upstream in DB or ORM, this mutex can be - # removed once these libraries have been updated and tested to be fixed - with self.save_task_state_mutex: + # FIXME: If the task dispatcher is also provided websocket access to + # the API server, when a new task is dispatched via the API server, + # there may be a race condition where both the ROS 2 task response and + # task dispatcher websocket update may attempt to create a new task + # state model with the same task ID. This have unfortunately not been + # reproducible locally, only in the production environment, which uses + # Postgres instead of sqlite. This may be fixed upstream in DB or ORM, + # this mutex can be removed once these libraries have been updated and + # tested to be fixed. + async with self.save_task_state_mutex: task_state_dict = { "data": task_state.json(), "category": task_state.category.__root__