Skip to content

Commit

Permalink
feat: support hardware and controller states and predicates in functi…
Browse files Browse the repository at this point in the history
…ons (#157)
  • Loading branch information
eeberhard authored May 8, 2024
1 parent 1416d89 commit 72b46e9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
4 changes: 4 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Release Versions:
- [1.0.1](#101)
- [1.0.0](#100)

## Upcoming changes

- feat: support hardware and controller states and predicates in `wait_for` functions (#156)

## 2.0.0

This version of the AICA API client is compatible with the new AICA API server version 3.0 by using Socket.IO instead of
Expand Down
80 changes: 69 additions & 11 deletions python/src/aica_api/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Union, List
from warnings import deprecated

import os
import requests
Expand Down Expand Up @@ -266,38 +267,95 @@ def get_application(self):
endpoint = "application"
return requests.get(self._endpoint(endpoint))

def wait_for_component(self, component: str, state: str, timeout: Union[None, int, float] = None):
def wait_for_component(self, component: str, state: str, timeout: Union[None, int, float] = None) -> bool:
"""
Wait for a component to be in a particular state. Components can be in any of the following states:
['unloaded', 'loaded', 'unconfigured', 'inactive', 'active', 'finalized']
:param component: The name of the component
:param state: The state of the component
:param state: The state of the component to wait for
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the component is in the intended state
:return: True if the component is in the intended state before the timeout duration, False otherwise
"""
return read_until(lambda data: data[component]['state'] == state, url=self._address, namespace='/v2/components',
event='component_data', timeout=timeout)
event='component_data', timeout=timeout) is not None

def wait_for_predicate(self, component: str, predicate: str, timeout: Union[None, int, float] = None):
def wait_for_hardware(self, hardware: str, state: str, timeout: Union[None, int, float] = None) -> bool:
"""
Wait for a hardware interface to be in a particular state. Hardware can be in any of the following states:
['unloaded', 'loaded']
:param hardware: The name of the hardware interface
:param state: The state of the hardware to wait for
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: True if the hardware is in the intended state before the timeout duration, False otherwise
"""
return read_until(lambda data: data[hardware]['state'] == state, url=self._address, namespace='/v2/hardware',
event='hardware_data', timeout=timeout) is not None

def wait_for_controller(self, hardware: str, controller: str, state: str,
timeout: Union[None, int, float] = None) -> bool:
"""
Wait for a controller to be in a particular state. Controllers can be in any of the following states:
['unloaded', 'loaded', 'unconfigured', 'inactive', 'active', 'finalized']
:param hardware: The name of the hardware interface responsible for the controller
:param controller: The name of the controller
:param state: The state of the controller to wait for
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: True if the controller is in the intended state before the timeout duration, False otherwise
"""
return read_until(lambda data: data[hardware]['controllers'][controller]['state'] == state, url=self._address,
namespace='/v2/hardware', event='hardware_data', timeout=timeout) is not None

@deprecated("Use wait_for_component_predicate instead")
def wait_for_predicate(self, component: str, predicate: str, timeout: Union[None, int, float] = None) -> bool:
"""
Wait until a component predicate is true.
:param component: The name of the component
:param predicate: The name of the predicate
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: True if the predicate is true before the timeout duration, False otherwise
"""
return read_until(lambda data: data[component]['predicates'][predicate], url=self._address,
namespace='/v2/components', event='component_data', timeout=timeout) is not None

def wait_for_component_predicate(self, component: str, predicate: str,
timeout: Union[None, int, float] = None) -> bool:
"""
Wait until a component predicate is true.
:param component: The name of the component
:param predicate: The name of the predicate
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the predicate is true
:return: True if the predicate is true before the timeout duration, False otherwise
"""
return read_until(lambda data: data[component]['predicates'][predicate], url=self._address,
namespace='/v2/components', event='component_data', timeout=timeout) is not None

def wait_for_controller_predicate(self, hardware: str, controller: str, predicate: str,
timeout: Union[None, int, float] = None) -> bool:
"""
Wait until a controller predicate is true.
:param hardware: The name of the hardware interface responsible for the controller
:param controller: The name of the controller
:param predicate: The name of the predicate
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: True if the predicate is true before the timeout duration, False otherwise
"""
return read_until(lambda data: data[component]["predicates"][predicate], url=self._address,
namespace='/v2/components', event='component_data', timeout=timeout)
return read_until(lambda data: data[hardware]['controllers'][controller]['predicates'][predicate],
url=self._address, namespace='/v2/hardware', event='hardware_data',
timeout=timeout) is not None

def wait_for_condition(self, condition, timeout=None):
def wait_for_condition(self, condition, timeout=None) -> bool:
"""
Wait until a condition is true.
:param condition: The name of the condition
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the condition is true
:return: True if the condition is true before the timeout duration, False otherwise
"""
return read_until(lambda data: data[condition], url=self._address, namespace='/v2/conditions',
event='conditions', timeout=timeout)
event='conditions', timeout=timeout) is not None

0 comments on commit 72b46e9

Please sign in to comment.