Skip to content

Commit

Permalink
Refactored alert db interactions to repository, with lru cache for lo…
Browse files Browse the repository at this point in the history
…cation alerts checking

Signed-off-by: Aaron Chong <[email protected]>
  • Loading branch information
aaronchongth committed May 29, 2024
1 parent deca732 commit b7c04a5
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 109 deletions.
24 changes: 21 additions & 3 deletions packages/api-server/api_server/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@
LiftState,
)
from .models.delivery_alerts import action_from_msg, category_from_msg, tier_from_msg
from .repositories import CachedFilesRepository, cached_files_repo
from .repositories import (
CachedFilesRepository,
LocationAlertFailResponse,
LocationAlertSuccessResponse,
cached_files_repo,
is_final_location_alert_check,
task_id_to_all_locations_success_cache,
)
from .rmf_io import alert_events, rmf_events
from .ros import ros_node

Expand Down Expand Up @@ -274,8 +281,6 @@ def convert_fleet_alert(fleet_alert: RmfFleetAlert):
for p in fleet_alert.alert_parameters:
parameters.append(AlertParameter(name=p.name, value=p.value))

# check task phases to find out what waypoint it is?

return AlertRequest(
id=fleet_alert.id,
unix_millis_alert_time=round(datetime.now().timestamp() * 1000),
Expand All @@ -292,6 +297,19 @@ def convert_fleet_alert(fleet_alert: RmfFleetAlert):
def handle_fleet_alert(fleet_alert: AlertRequest):
logging.info("Received fleet alert:")
logging.info(fleet_alert)

# Handle request for checking all location completion success for
# this task
is_final_check = is_final_location_alert_check(fleet_alert)
if is_final_check:
successful_so_far = task_id_to_all_locations_success_cache.lookup(
fleet_alert.task_id
)
if successful_so_far is None or not successful_so_far:
self.respond_to_alert(fleet_alert.id, LocationAlertFailResponse)
else:
self.respond_to_alert(fleet_alert.id, LocationAlertSuccessResponse)

alert_events.alert_requests.on_next(fleet_alert)

fleet_alert_sub = ros_node().create_subscription(
Expand Down
7 changes: 7 additions & 0 deletions packages/api-server/api_server/repositories/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from .alerts import (
AlertRepository,
LocationAlertFailResponse,
LocationAlertSuccessResponse,
is_final_location_alert_check,
task_id_to_all_locations_success_cache,
)
from .cached_files import CachedFilesRepository, cached_files_repo
from .fleets import FleetRepository
from .rmf import RmfRepository
Expand Down
270 changes: 270 additions & 0 deletions packages/api-server/api_server/repositories/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
import logging
from collections import deque
from datetime import datetime
from typing import List, Optional

from api_server.models import AlertRequest, AlertResponse
from api_server.models import tortoise_models as ttm

# from api_server.gateway import rmf_gateway


# TODO: not hardcode all these expected values
LocationAlertSuccessResponse = "success"
LocationAlertFailResponse = "fail"
LocationAlertTypeParameterName = "type"
LocationAlertTypeParameterValue = "location_result"
LocationAlertLocationParameterName = "location_name"
LocationAlertFinalCheckTypeParameterValue = "check_all_task_location_alerts"


def get_location_from_location_alert(alert: AlertRequest) -> Optional[str]:
"""
Returns the location name from a location alert when possible, otherwise
returns None.
Note: This is an experimental feature and may be subjected to
modifications often.
"""
if (
len(alert.alert_parameters) < 2
or LocationAlertSuccessResponse not in alert.responses_available
or LocationAlertFailResponse not in alert.responses_available
):
return None

# Check type
alert_type = None
for param in alert.alert_parameters:
if param.name == LocationAlertTypeParameterName:
alert_type = param.value
break
if alert_type != LocationAlertTypeParameterValue:
return None

# Check location name
# TODO: make sure that there are no duplicated locations that have
# not been responded to yet
for param in alert.alert_parameters:
if param.name == LocationAlertLocationParameterName:
return param.value
return None


def is_final_location_alert_check(alert: AlertRequest) -> bool:
"""
Checks if the alert request requires a check on all location alerts of this
task.
Note: This is an experimental feature and may be subjected to
modifications often.
"""
if (
alert.task_id is None
or len(alert.alert_parameters) < 1
or LocationAlertSuccessResponse not in alert.responses_available
or LocationAlertFailResponse not in alert.responses_available
):
return False

# Check type
for param in alert.alert_parameters:
if param.name == LocationAlertTypeParameterName:
if param.value == LocationAlertFinalCheckTypeParameterValue:
return True
return False
return False


class LRUCache:
def __init__(self, capacity: int):
self._cache = deque(maxlen=capacity)
self._lookup = {}

def add(self, key, value):
if key in self._lookup:
self._cache.remove(key)
elif len(self._cache) == self._cache.maxlen:
oldest_key = self._cache.popleft()
del self._lookup[oldest_key]

self._cache.append(key)
self._lookup[key] = value

def remove(self, key):
if key in self._lookup:
self._cache.remove(key)
del self._lookup[key]

def lookup(self, key):
if key in self._lookup:
self._cache.remove(key)
self._cache.append(key)
return self._lookup[key]
return None


task_id_to_all_locations_success_cache: LRUCache = LRUCache(20)


class AlertRepository:
async def create_new_alert(self, alert: AlertRequest) -> Optional[AlertRequest]:
exists = await ttm.AlertRequest.exists(id=alert.id)
if exists:
logging.error(f"Alert with ID {alert.id} already exists")
return None

await ttm.AlertRequest.create(
id=alert.id,
data=alert.json(),
response_expected=(len(alert.responses_available) > 0),
task_id=alert.task_id,
)
return alert

async def get_alert(self, alert_id: str) -> Optional[AlertRequest]:
alert = await ttm.AlertRequest.get_or_none(id=alert_id)
if alert is None:
logging.error(f"Alert with ID {alert_id} does not exists")
return None

alert_model = AlertRequest(**alert.data)
return alert_model

async def create_response(
self, alert_id: str, response: str
) -> Optional[AlertResponse]:
alert = await ttm.AlertRequest.get_or_none(id=alert_id)
if alert is None:
logging.error(f"Alert with ID {alert_id} does not exists")
return None

alert_model = AlertRequest(**alert.data)
if response not in alert_model.responses_available:
logging.error(
f"Alert with ID {alert_model.id} does not have allow response of {response}"
)
return None

alert_response_model = AlertResponse(
id=alert_id,
unix_millis_response_time=round(datetime.now().timestamp() * 1000),
response=response,
)
await ttm.AlertResponse.create(
id=alert_id, alert_request=alert, data=alert_response_model.json()
)
return alert_response_model

async def get_alert_response(self, alert_id: str) -> Optional[AlertResponse]:
response = await ttm.AlertResponse.get_or_none(id=alert_id)
if response is None:
logging.error(f"Response to alert with ID {alert_id} does not exists")
return None

response_model = AlertResponse(**response.data)
return response_model

async def get_alerts_of_task(
self, task_id: str, unresponded: bool = True
) -> List[AlertRequest]:
if unresponded:
task_id_alerts = await ttm.AlertRequest.filter(
response_expected=True,
task_id=task_id,
alert_response=None,
)
else:
task_id_alerts = await ttm.AlertRequest.filter(task_id=task_id)

alert_models = [AlertRequest(**alert.data) for alert in task_id_alerts]
return alert_models

async def get_unresponded_alerts(self) -> List[AlertRequest]:
unresponded_alerts = await ttm.AlertRequest.filter(
alert_response=None, response_expected=True
)
return [AlertRequest(**alert.data) for alert in unresponded_alerts]

async def create_location_alert_response(
self,
task_id: str,
location: str,
success: bool,
) -> Optional[AlertResponse]:
"""
Creates an alert response for a location alert of the task.
Note: This is an experimental feature and may be subjected to
modifications often.
"""
alerts = await self.get_alerts_of_task(task_id=task_id, unresponded=True)
if len(alerts) == 0:
logging.error(
f"There are no location alerts awaiting response for task {task_id}"
)
return None

for alert in alerts:
location_alert_location = get_location_from_location_alert(alert)
if location_alert_location is None:
continue

if location_alert_location == location:
response = (
LocationAlertSuccessResponse
if success
else LocationAlertFailResponse
)
alert_response_model = await self.create_response(alert.id, response)
if alert_response_model is None:
logging.error(
f"Failed to create response {response} to alert with ID {alert.id}"
)
return None

# Cache if all locations of this task has been successful so far
cache = task_id_to_all_locations_success_cache.lookup(task_id)
if cache is None:
task_id_to_all_locations_success_cache.add(task_id, success)
else:
task_id_to_all_locations_success_cache.add(
task_id, cache and success
)

return alert_response_model

logging.error(
f"Task {task_id} is not awaiting completion of location {location}"
)
return None

async def check_all_task_location_alerts_if_succeeded(self, task_id: str) -> bool:
"""
Checks if all location alert reponses for the task were successful.
Note: This is an experimental feature and may be subjected to
modifications often.
"""
task_id_alerts = await ttm.AlertRequest.filter(task_id=task_id)
if len(task_id_alerts) == 0:
logging.info(f"There were no location alerts for task {task_id}")
return False

for alert in task_id_alerts:
alert_model = AlertRequest(**alert.data)
location_alert_location = get_location_from_location_alert(alert_model)
if location_alert_location is None:
continue

if alert.alert_response is None:
logging.info(
f"Alert {alert_model.id} does not have a response, check return False"
)
return False

alert_response_model = AlertResponse(**alert.alert_response.data)
if alert_response_model.response != LocationAlertSuccessResponse:
logging.info(
f"Alert {alert_model.id} has a response {alert_response_model.response}, check return False"
)
return False

logging.info(f"All location alerts for task {task_id} succeeded")
return True
6 changes: 6 additions & 0 deletions packages/api-server/api_server/rmf_io/book_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
LiftState,
)
from api_server.models.health import BaseBasicHealth
from api_server.repositories import (
AlertRepository, # , is_final_location_alert_check, LocationAlertFailResponse, LocationAlertSuccessResponse
)

from .events import AlertEvents, RmfEvents

# from api_server.gateway import rmf_gateway


class RmfBookKeeperEvents:
def __init__(self):
Expand All @@ -38,6 +43,7 @@ def __init__(
):
self.rmf_events = rmf_events
self.alert_events = alert_events
self.alert_repository = AlertRepository()
self.bookkeeper_events = RmfBookKeeperEvents()
self._loop: asyncio.AbstractEventLoop
self._pending_tasks = set()
Expand Down
Loading

0 comments on commit b7c04a5

Please sign in to comment.