From 7031edbfdbc3f3155dc45a8d0cb43cf220e12941 Mon Sep 17 00:00:00 2001 From: Vanessa Hassouna Date: Tue, 21 Nov 2023 16:16:28 +0100 Subject: [PATCH] [failure_handling] rewrote failure handling, added test cases --- src/pycram/designator.py | 8 +++ src/pycram/failure_handling.py | 93 ++++++++++++++++++++++++++++++++++ test/test_failure_handling.py | 57 +++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 src/pycram/failure_handling.py create mode 100644 test/test_failure_handling.py diff --git a/src/pycram/designator.py b/src/pycram/designator.py index 20f1beb48..71d0601d9 100644 --- a/src/pycram/designator.py +++ b/src/pycram/designator.py @@ -540,6 +540,14 @@ def ground(self) -> Action: """Fill all missing parameters and chose plan to execute. """ raise NotImplementedError(f"{type(self)}.ground() is not implemented.") + def __iter__(self): + """ + Iterate through all possible actions fitting this description + + :yield: A resolved action designator + """ + yield self.ground() + class LocationDesignatorDescription(DesignatorDescription): """ diff --git a/src/pycram/failure_handling.py b/src/pycram/failure_handling.py new file mode 100644 index 000000000..5aab138c8 --- /dev/null +++ b/src/pycram/failure_handling.py @@ -0,0 +1,93 @@ +from .designator import DesignatorDescription +from .plan_failures import PlanFailure + +class FailureHandling: + """ + Base class for failure handling mechanisms in automated systems or workflows. + + This class provides a structure for implementing different strategies to handle + failures that may occur during the execution of a plan or process. It is designed + to be extended by subclasses that implement specific failure handling behaviors. + + Attributes: + designator_description (DesignatorDescription): An instance of DesignatorDescription. + + Methods: + perform(): Abstract method + """ + + def __init__(self, designator_description:DesignatorDescription): + """ + Initializes a new instance of the FailureHandling class. + + Args: + designator_description (DesignatorDescription): The description or context + of the task or process for which the failure handling is being set up. + """ + self.designator_description = designator_description + + def perform(self): + """ + Abstract method to perform the failure handling mechanism. + + This method should be overridden in subclasses to implement the specific + behavior for handling failures. + + Raises: + NotImplementedError: If the method is not implemented in a subclass. + """ + raise NotImplementedError() + +class Retry(FailureHandling): + """ + A subclass of FailureHandling that implements a retry mechanism. + + This class represents a specific failure handling strategy where the system + attempts to retry a failed action a certain number of times before giving up. + + Attributes: + max_tries (int): The maximum number of attempts to retry the action. + + Inherits: + All attributes and methods from the FailureHandling class. + + Overrides: + perform(): Implements the retry logic. + """ + + def __init__(self, designator_description:DesignatorDescription, max_tries:int=3): + """ + Initializes a new instance of the Retry class. + + Args: + designator_description (DesignatorDescription): The description or context + of the task or process for which the retry mechanism is being set up. + max_tries (int, optional): The maximum number of attempts to retry. Defaults to 3. + """ + super().__init__(designator_description) + self.max_tries = max_tries + + def perform(self): + """ + Implementation of the retry mechanism. + + This method attempts to perform the action specified in the designator_description. + If the action fails, it is retried up to max_tries times. If all attempts fail, + the last exception is raised. + + Raises: + PlanFailure: If all retry attempts fail. + """ + tries = 0 + for action in iter(self.designator_description): + tries += 1 + try: + action.perform() + break + except PlanFailure as e: + if tries >= self.max_tries: + raise e + + + + diff --git a/test/test_failure_handling.py b/test/test_failure_handling.py new file mode 100644 index 000000000..ab0a8491b --- /dev/null +++ b/test/test_failure_handling.py @@ -0,0 +1,57 @@ +import unittest + +import roslaunch + +from pycram.bullet_world import BulletWorld, Object +from pycram.designator import ActionDesignatorDescription +from pycram.designators.action_designator import ParkArmsAction +from pycram.enums import ObjectType, Arms +from pycram.failure_handling import Retry +from pycram.plan_failures import PlanFailure +from pycram.process_module import ProcessModule, simulated_robot +from pycram.robot_descriptions import robot_description + + +# start ik_and_description.launch +class DummyActionDesignator(ActionDesignatorDescription): + class Action(ActionDesignatorDescription.Action): + def perform(self): + raise PlanFailure("Dummy action failed") + + def __iter__(self): + for _ in range(100): + yield self.Action() + + +class FailureHandlingTest(unittest.TestCase): + world: BulletWorld + process: roslaunch.scriptapi.ROSLaunch + + @classmethod + def setUpClass(cls): + cls.world = BulletWorld("DIRECT") + cls.robot = Object(robot_description.name, ObjectType.ROBOT, robot_description.name + ".urdf") + ProcessModule.execution_delay = True + + def setUp(self): + self.world.reset_bullet_world() + + def test_retry_with_success(self): + with simulated_robot: + Retry(ParkArmsAction([Arms.LEFT]), max_tries=5).perform() + + def test_retry_with_failure(self): + with simulated_robot: + with self.assertRaises(PlanFailure): + Retry(DummyActionDesignator(), max_tries=5).perform() + + def tearDown(self): + self.world.reset_bullet_world() + + @classmethod + def tearDownClass(cls): + cls.world.exit() + + +if __name__ == '__main__': + unittest.main()