Skip to content

Commit

Permalink
Use asyncio.Lock, added more notes about the issue
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Chong <[email protected]>
  • Loading branch information
aaronchongth committed May 6, 2024
1 parent 4ded675 commit cf37add
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions packages/api-server/api_server/repositories/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import sys
from datetime import datetime
from multiprocessing import Lock
from typing import Dict, List, Optional, Sequence, Tuple, cast

from fastapi import Depends, HTTPException
Expand Down Expand Up @@ -128,7 +128,7 @@ def parse_destination(
class TaskRepository:
def __init__(self, user: User):
self.user = user
self.save_task_state_mutex = Lock()
self.save_task_state_mutex = asyncio.Lock()

async def save_task_request(
self, task_state: TaskState, task_request: TaskRequest
Expand Down Expand Up @@ -164,9 +164,16 @@ async def query_task_requests(self, task_ids: List[str]) -> List[DbTaskRequest]:
raise HTTPException(422, str(e)) from e

async def save_task_state(self, task_state: TaskState) -> None:
# FIXME: this may be fixed upstream in DB or ORM, this mutex can be
# removed once these libraries have been updated and tested to be fixed
with self.save_task_state_mutex:
# FIXME: If the task dispatcher is also provided websocket access to
# the API server, when a new task is dispatched via the API server,
# there may be a race condition where both the ROS 2 task response and
# task dispatcher websocket update may attempt to create a new task
# state model with the same task ID. This have unfortunately not been
# reproducible locally, only in the production environment, which uses
# Postgres instead of sqlite. This may be fixed upstream in DB or ORM,
# this mutex can be removed once these libraries have been updated and
# tested to be fixed.
async with self.save_task_state_mutex:
task_state_dict = {
"data": task_state.json(),
"category": task_state.category.__root__
Expand Down

0 comments on commit cf37add

Please sign in to comment.