diff --git a/MissionPlanning/BehaviorTree/behavior_tree.py b/MissionPlanning/BehaviorTree/behavior_tree.py new file mode 100644 index 0000000000..59f4c713f1 --- /dev/null +++ b/MissionPlanning/BehaviorTree/behavior_tree.py @@ -0,0 +1,690 @@ +""" +Behavior Tree + +author: Wang Zheng (@Aglargil) + +Ref: + +- [Behavior Tree](https://en.wikipedia.org/wiki/Behavior_tree_(artificial_intelligence,_robotics_and_control)) +""" + +import time +import xml.etree.ElementTree as ET +from enum import Enum + + +class Status(Enum): + SUCCESS = "success" + FAILURE = "failure" + RUNNING = "running" + + +class NodeType(Enum): + CONTROL_NODE = "ControlNode" + ACTION_NODE = "ActionNode" + DECORATOR_NODE = "DecoratorNode" + + +class Node: + """ + Base class for all nodes in a behavior tree. + """ + + def __init__(self, name): + self.name = name + self.status = None + + def tick(self) -> Status: + """ + Tick the node. + + Returns: + Status: The status of the node. + """ + raise ValueError("Node is not implemented") + + def tick_and_set_status(self) -> Status: + """ + Tick the node and set the status. + + Returns: + Status: The status of the node. + """ + self.status = self.tick() + return self.status + + def reset(self): + """ + Reset the node. + """ + self.status = None + + def reset_children(self): + """ + Reset the children of the node. + """ + pass + + +class ControlNode(Node): + """ + Base class for all control nodes in a behavior tree. + + Control nodes manage the execution flow of their child nodes according to specific rules. + They typically have multiple children and determine which children to execute and in what order. + """ + + def __init__(self, name): + super().__init__(name) + self.children = [] + self.type = NodeType.CONTROL_NODE + + def not_set_children_raise_error(self): + if len(self.children) == 0: + raise ValueError("Children are not set") + + def reset_children(self): + for child in self.children: + child.reset() + + +class SequenceNode(ControlNode): + """ + Executes child nodes in sequence until one fails or all succeed. + + Returns: + - Returns FAILURE if any child returns FAILURE + - Returns SUCCESS when all children have succeeded + - Returns RUNNING when a child is still running or when moving to the next child + + Example: + .. code-block:: xml + + + + + + """ + + def __init__(self, name): + super().__init__(name) + self.current_child_index = 0 + + def tick(self) -> Status: + self.not_set_children_raise_error() + + if self.current_child_index >= len(self.children): + self.reset_children() + return Status.SUCCESS + status = self.children[self.current_child_index].tick_and_set_status() + if status == Status.FAILURE: + self.reset_children() + return Status.FAILURE + elif status == Status.SUCCESS: + self.current_child_index += 1 + return Status.RUNNING + elif status == Status.RUNNING: + return Status.RUNNING + else: + raise ValueError("Unknown status") + + +class SelectorNode(ControlNode): + """ + Executes child nodes in sequence until one succeeds or all fail. + + Returns: + - Returns SUCCESS if any child returns SUCCESS + - Returns FAILURE when all children have failed + - Returns RUNNING when a child is still running or when moving to the next child + + Examples: + .. code-block:: xml + + + + + + """ + + def __init__(self, name): + super().__init__(name) + self.current_child_index = 0 + + def tick(self) -> Status: + self.not_set_children_raise_error() + + if self.current_child_index >= len(self.children): + self.reset_children() + return Status.FAILURE + status = self.children[self.current_child_index].tick_and_set_status() + if status == Status.SUCCESS: + self.reset_children() + return Status.SUCCESS + elif status == Status.FAILURE: + self.current_child_index += 1 + return Status.RUNNING + elif status == Status.RUNNING: + return Status.RUNNING + else: + raise ValueError("Unknown status") + + +class WhileDoElseNode(ControlNode): + """ + Conditional execution node with three parts: condition, do, and optional else. + + Returns: + First executes the condition node (child[0]) + If condition succeeds, executes do node (child[1]) and returns RUNNING + If condition fails, executes else node (child[2]) if present and returns result of else node + If condition fails and there is no else node, returns SUCCESS + + Example: + .. code-block:: xml + + + + + + + """ + + def __init__(self, name): + super().__init__(name) + + def tick(self) -> Status: + if len(self.children) != 3 and len(self.children) != 2: + raise ValueError("WhileDoElseNode must have exactly 3 or 2 children") + + condition_node = self.children[0] + do_node = self.children[1] + else_node = self.children[2] if len(self.children) == 3 else None + + condition_status = condition_node.tick_and_set_status() + if condition_status == Status.SUCCESS: + do_node.tick_and_set_status() + return Status.RUNNING + elif condition_status == Status.FAILURE: + if else_node is not None: + else_status = else_node.tick_and_set_status() + if else_status == Status.SUCCESS: + self.reset_children() + return Status.SUCCESS + elif else_status == Status.FAILURE: + self.reset_children() + return Status.FAILURE + elif else_status == Status.RUNNING: + return Status.RUNNING + else: + raise ValueError("Unknown status") + else: + self.reset_children() + return Status.SUCCESS + else: + raise ValueError("Unknown status") + + +class ActionNode(Node): + """ + Base class for all action nodes in a behavior tree. + + Action nodes are responsible for performing specific tasks or actions. + They do not have children and are typically used to execute logic or operations. + """ + + def __init__(self, name): + super().__init__(name) + self.type = NodeType.ACTION_NODE + + +class SleepNode(ActionNode): + """ + Sleep node that sleeps for a specified duration. + + Returns: + Returns SUCCESS after the specified duration has passed + Returns RUNNING if the duration has not yet passed + + Example: + .. code-block:: xml + + + """ + + def __init__(self, name, duration): + super().__init__(name) + self.duration = duration + self.start_time = None + + def tick(self) -> Status: + if self.start_time is None: + self.start_time = time.time() + if time.time() - self.start_time > self.duration: + return Status.SUCCESS + return Status.RUNNING + + +class EchoNode(ActionNode): + """ + Echo node that prints a message to the console. + + Returns: + Returns SUCCESS after the message has been printed + + Example: + .. code-block:: xml + + + """ + + def __init__(self, name, message): + super().__init__(name) + self.message = message + + def tick(self) -> Status: + print(self.name, self.message) + return Status.SUCCESS + + +class DecoratorNode(Node): + """ + Base class for all decorator nodes in a behavior tree. + + Decorator nodes modify the behavior of their child node. + They must have a single child and can alter the status of the child node. + """ + + def __init__(self, name): + super().__init__(name) + self.type = NodeType.DECORATOR_NODE + self.child = None + + def not_set_child_raise_error(self): + if self.child is None: + raise ValueError("Child is not set") + + def reset_children(self): + self.child.reset() + + +class InverterNode(DecoratorNode): + """ + Inverter node that inverts the status of its child node. + + Returns: + - Returns SUCCESS if the child returns FAILURE + - Returns FAILURE if the child returns SUCCESS + - Returns RUNNING if the child returns RUNNING + + Examples: + .. code-block:: xml + + + + + """ + + def __init__(self, name): + super().__init__(name) + + def tick(self) -> Status: + self.not_set_child_raise_error() + status = self.child.tick_and_set_status() + return Status.SUCCESS if status == Status.FAILURE else Status.FAILURE + + +class TimeoutNode(DecoratorNode): + """ + Timeout node that fails if the child node takes too long to execute + + Returns: + - FAILURE: If the timeout duration has been exceeded + - Child's status: Otherwise, passes through the status of the child node + + Example: + .. code-block:: xml + + + + + """ + + def __init__(self, name, timeout): + super().__init__(name) + self.timeout = timeout + self.start_time = None + + def tick(self) -> Status: + self.not_set_child_raise_error() + if self.start_time is None: + self.start_time = time.time() + if time.time() - self.start_time > self.timeout: + return Status.FAILURE + print(f"{self.name} is running") + return self.child.tick_and_set_status() + + +class DelayNode(DecoratorNode): + """ + Delay node that delays the execution of its child node for a specified duration. + + Returns: + - Returns RUNNING if the duration has not yet passed + - Returns child's status after the duration has passed + + Example: + .. code-block:: xml + + + + + """ + + def __init__(self, name, delay): + super().__init__(name) + self.delay = delay + self.start_time = None + + def tick(self) -> Status: + self.not_set_child_raise_error() + if self.start_time is None: + self.start_time = time.time() + if time.time() - self.start_time > self.delay: + return self.child.tick_and_set_status() + return Status.RUNNING + + +class ForceSuccessNode(DecoratorNode): + """ + ForceSuccess node that always returns SUCCESS. + + Returns: + - Returns RUNNING if the child returns RUNNING + - Returns SUCCESS if the child returns SUCCESS or FAILURE + """ + + def __init__(self, name): + super().__init__(name) + + def tick(self) -> Status: + self.not_set_child_raise_error() + status = self.child.tick_and_set_status() + if status == Status.FAILURE: + return Status.SUCCESS + return status + + +class ForceFailureNode(DecoratorNode): + """ + ForceFailure node that always returns FAILURE. + + Returns: + - Returns RUNNING if the child returns RUNNING + - Returns FAILURE if the child returns SUCCESS or FAILURE + """ + + def __init__(self, name): + super().__init__(name) + + def tick(self) -> Status: + self.not_set_child_raise_error() + status = self.child.tick_and_set_status() + if status == Status.SUCCESS: + return Status.FAILURE + return status + + +class BehaviorTree: + """ + Behavior tree class that manages the execution of a behavior tree. + """ + + def __init__(self, root): + self.root = root + + def tick(self): + """ + Tick once on the behavior tree. + """ + self.root.tick_and_set_status() + + def reset(self): + """ + Reset the behavior tree. + """ + self.root.reset() + + def tick_while_running(self, interval=None, enable_print=True): + """ + Tick the behavior tree while it is running. + + Args: + interval (float, optional): The interval between ticks. Defaults to None. + enable_print (bool, optional): Whether to print the behavior tree. Defaults to True. + """ + while self.root.tick_and_set_status() == Status.RUNNING: + if enable_print: + self.print_tree() + if interval is not None: + time.sleep(interval) + if enable_print: + self.print_tree() + + def to_text(self, root, indent=0): + """ + Recursively convert the behavior tree to a text representation. + """ + current_text = "" + if root.status == Status.RUNNING: + # yellow + current_text = "\033[93m" + root.name + "\033[0m" + elif root.status == Status.SUCCESS: + # green + current_text = "\033[92m" + root.name + "\033[0m" + elif root.status == Status.FAILURE: + # red + current_text = "\033[91m" + root.name + "\033[0m" + else: + current_text = root.name + if root.type == NodeType.CONTROL_NODE: + current_text = " " * indent + "[" + current_text + "]\n" + for child in root.children: + current_text += self.to_text(child, indent + 2) + elif root.type == NodeType.DECORATOR_NODE: + current_text = " " * indent + "(" + current_text + ")\n" + current_text += self.to_text(root.child, indent + 2) + elif root.type == NodeType.ACTION_NODE: + current_text = " " * indent + "<" + current_text + ">\n" + return current_text + + def print_tree(self): + """ + Print the behavior tree. + + Node print format: + Action: + Decorator: (Decorator) + Control: [Control] + + Node status colors: + Yellow: RUNNING + Green: SUCCESS + Red: FAILURE + """ + text = self.to_text(self.root) + text = text.strip() + print("\033[94m" + "Behavior Tree" + "\033[0m") + print(text) + print("\033[94m" + "Behavior Tree" + "\033[0m") + + +class BehaviorTreeFactory: + """ + Factory class for creating behavior trees from XML strings. + """ + + def __init__(self): + self.node_builders = {} + # Control nodes + self.register_node_builder( + "Sequence", + lambda node: SequenceNode(node.attrib.get("name", SequenceNode.__name__)), + ) + self.register_node_builder( + "Selector", + lambda node: SelectorNode(node.attrib.get("name", SelectorNode.__name__)), + ) + self.register_node_builder( + "WhileDoElse", + lambda node: WhileDoElseNode( + node.attrib.get("name", WhileDoElseNode.__name__) + ), + ) + # Decorator nodes + self.register_node_builder( + "Inverter", + lambda node: InverterNode(node.attrib.get("name", InverterNode.__name__)), + ) + self.register_node_builder( + "Timeout", + lambda node: TimeoutNode( + node.attrib.get("name", SelectorNode.__name__), + float(node.attrib["sec"]), + ), + ) + self.register_node_builder( + "Delay", + lambda node: DelayNode( + node.attrib.get("name", DelayNode.__name__), + float(node.attrib["sec"]), + ), + ) + self.register_node_builder( + "ForceSuccess", + lambda node: ForceSuccessNode( + node.attrib.get("name", ForceSuccessNode.__name__) + ), + ) + self.register_node_builder( + "ForceFailure", + lambda node: ForceFailureNode( + node.attrib.get("name", ForceFailureNode.__name__) + ), + ) + # Action nodes + self.register_node_builder( + "Sleep", + lambda node: SleepNode( + node.attrib.get("name", SleepNode.__name__), + float(node.attrib["sec"]), + ), + ) + self.register_node_builder( + "Echo", + lambda node: EchoNode( + node.attrib.get("name", EchoNode.__name__), + node.attrib["message"], + ), + ) + + def register_node_builder(self, node_name, builder): + """ + Register a builder for a node + + Args: + node_name (str): The name of the node. + builder (function): The builder function. + + Example: + .. code-block:: python + + factory = BehaviorTreeFactory() + factory.register_node_builder( + "MyNode", + lambda node: MyNode( + node.attrib.get("name", MyNode.__name__), + node.attrib["my_param"], + ), + ) + """ + self.node_builders[node_name] = builder + + def build_node(self, node): + """ + Build a node from an XML element. + + Args: + node (Element): The XML element to build the node from. + + Returns: + BehaviorTree Node: the built node + """ + if node.tag in self.node_builders: + root = self.node_builders[node.tag](node) + if root.type == NodeType.CONTROL_NODE: + if len(node) <= 0: + raise ValueError(f"{root.name} Control node must have children") + for child in node: + root.children.append(self.build_node(child)) + elif root.type == NodeType.DECORATOR_NODE: + if len(node) != 1: + raise ValueError( + f"{root.name} Decorator node must have exactly one child" + ) + root.child = self.build_node(node[0]) + elif root.type == NodeType.ACTION_NODE: + if len(node) != 0: + raise ValueError(f"{root.name} Action node must have no children") + return root + else: + raise ValueError(f"Unknown node type: {node.tag}") + + def build_tree(self, xml_string): + """ + Build a behavior tree from an XML string. + + Args: + xml_string (str): The XML string containing the behavior tree. + + Returns: + BehaviorTree: The behavior tree. + """ + xml_tree = ET.fromstring(xml_string) + root = self.build_node(xml_tree) + return BehaviorTree(root) + + def build_tree_from_file(self, file_path): + """ + Build a behavior tree from a file. + + Args: + file_path (str): The path to the file containing the behavior tree. + + Returns: + BehaviorTree: The behavior tree. + """ + with open(file_path) as file: + xml_string = file.read() + return self.build_tree(xml_string) + + +xml_string = """ + + + + + + + + """ + + +def main(): + factory = BehaviorTreeFactory() + tree = factory.build_tree(xml_string) + tree.tick_while_running() + + +if __name__ == "__main__": + main() diff --git a/MissionPlanning/BehaviorTree/robot_behavior_case.py b/MissionPlanning/BehaviorTree/robot_behavior_case.py new file mode 100644 index 0000000000..6c39aa76b2 --- /dev/null +++ b/MissionPlanning/BehaviorTree/robot_behavior_case.py @@ -0,0 +1,247 @@ +""" +Robot Behavior Tree Case + +This file demonstrates how to use a behavior tree to control robot behavior. +""" + +from behavior_tree import ( + BehaviorTreeFactory, + Status, + ActionNode, +) +import time +import random +import os + + +class CheckBatteryNode(ActionNode): + """ + Node to check robot battery level + + If battery level is below threshold, returns FAILURE, otherwise returns SUCCESS + """ + + def __init__(self, name, threshold=20): + super().__init__(name) + self.threshold = threshold + self.battery_level = 100 # Initial battery level is 100% + + def tick(self): + # Simulate battery level decreasing + self.battery_level -= random.randint(1, 5) + print(f"Current battery level: {self.battery_level}%") + + if self.battery_level <= self.threshold: + return Status.FAILURE + return Status.SUCCESS + + +class ChargeBatteryNode(ActionNode): + """ + Node to charge the robot's battery + """ + + def __init__(self, name, charge_rate=10): + super().__init__(name) + self.charge_rate = charge_rate + self.charging_time = 0 + + def tick(self): + # Simulate charging process + if self.charging_time == 0: + print("Starting to charge...") + + self.charging_time += 1 + charge_amount = self.charge_rate * self.charging_time + + if charge_amount >= 100: + print("Charging complete! Battery level: 100%") + self.charging_time = 0 + return Status.SUCCESS + else: + print(f"Charging in progress... Battery level: {min(charge_amount, 100)}%") + return Status.RUNNING + + +class MoveToPositionNode(ActionNode): + """ + Node to move to a specified position + """ + + def __init__(self, name, position, move_duration=2): + super().__init__(name) + self.position = position + self.move_duration = move_duration + self.start_time = None + + def tick(self): + if self.start_time is None: + self.start_time = time.time() + print(f"Starting movement to position {self.position}") + + elapsed_time = time.time() - self.start_time + + if elapsed_time >= self.move_duration: + print(f"Arrived at position {self.position}") + self.start_time = None + return Status.SUCCESS + else: + print( + f"Moving to position {self.position}... {int(elapsed_time / self.move_duration * 100)}% complete" + ) + return Status.RUNNING + + +class DetectObstacleNode(ActionNode): + """ + Node to detect obstacles + """ + + def __init__(self, name, obstacle_probability=0.3): + super().__init__(name) + self.obstacle_probability = obstacle_probability + + def tick(self): + # Use random probability to simulate obstacle detection + if random.random() < self.obstacle_probability: + print("Obstacle detected!") + return Status.SUCCESS + else: + print("No obstacle detected") + return Status.FAILURE + + +class AvoidObstacleNode(ActionNode): + """ + Node to avoid obstacles + """ + + def __init__(self, name, avoid_duration=1.5): + super().__init__(name) + self.avoid_duration = avoid_duration + self.start_time = None + + def tick(self): + if self.start_time is None: + self.start_time = time.time() + print("Starting obstacle avoidance...") + + elapsed_time = time.time() - self.start_time + + if elapsed_time >= self.avoid_duration: + print("Obstacle avoidance complete") + self.start_time = None + return Status.SUCCESS + else: + print("Avoiding obstacle...") + return Status.RUNNING + + +class PerformTaskNode(ActionNode): + """ + Node to perform a specific task + """ + + def __init__(self, name, task_name, task_duration=3): + super().__init__(name) + self.task_name = task_name + self.task_duration = task_duration + self.start_time = None + + def tick(self): + if self.start_time is None: + self.start_time = time.time() + print(f"Starting task: {self.task_name}") + + elapsed_time = time.time() - self.start_time + + if elapsed_time >= self.task_duration: + print(f"Task complete: {self.task_name}") + self.start_time = None + return Status.SUCCESS + else: + print( + f"Performing task: {self.task_name}... {int(elapsed_time / self.task_duration * 100)}% complete" + ) + return Status.RUNNING + + +def create_robot_behavior_tree(): + """ + Create robot behavior tree + """ + + factory = BehaviorTreeFactory() + + # Register custom nodes + factory.register_node_builder( + "CheckBattery", + lambda node: CheckBatteryNode( + node.attrib.get("name", "CheckBattery"), + int(node.attrib.get("threshold", "20")), + ), + ) + + factory.register_node_builder( + "ChargeBattery", + lambda node: ChargeBatteryNode( + node.attrib.get("name", "ChargeBattery"), + int(node.attrib.get("charge_rate", "10")), + ), + ) + + factory.register_node_builder( + "MoveToPosition", + lambda node: MoveToPositionNode( + node.attrib.get("name", "MoveToPosition"), + node.attrib.get("position", "Unknown Position"), + float(node.attrib.get("move_duration", "2")), + ), + ) + + factory.register_node_builder( + "DetectObstacle", + lambda node: DetectObstacleNode( + node.attrib.get("name", "DetectObstacle"), + float(node.attrib.get("obstacle_probability", "0.3")), + ), + ) + + factory.register_node_builder( + "AvoidObstacle", + lambda node: AvoidObstacleNode( + node.attrib.get("name", "AvoidObstacle"), + float(node.attrib.get("avoid_duration", "1.5")), + ), + ) + + factory.register_node_builder( + "PerformTask", + lambda node: PerformTaskNode( + node.attrib.get("name", "PerformTask"), + node.attrib.get("task_name", "Unknown Task"), + float(node.attrib.get("task_duration", "3")), + ), + ) + # Read XML from file + xml_path = os.path.join(os.path.dirname(__file__), "robot_behavior_tree.xml") + return factory.build_tree_from_file(xml_path) + + +def main(): + """ + Main function: Create and run the robot behavior tree + """ + print("Creating robot behavior tree...") + tree = create_robot_behavior_tree() + + print("\nStarting robot behavior tree execution...\n") + # Run for a period of time or until completion + + tree.tick_while_running(interval=0.01) + + print("\nBehavior tree execution complete!") + + +if __name__ == "__main__": + main() diff --git a/MissionPlanning/BehaviorTree/robot_behavior_tree.xml b/MissionPlanning/BehaviorTree/robot_behavior_tree.xml new file mode 100644 index 0000000000..0bca76a3ff --- /dev/null +++ b/MissionPlanning/BehaviorTree/robot_behavior_tree.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst b/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst new file mode 100644 index 0000000000..ae3e16da81 --- /dev/null +++ b/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst @@ -0,0 +1,104 @@ +Behavior Tree +------------- + +Behavior Tree is a modular, hierarchical decision model that is widely used in robot control, and game development. +It present some similarities to hierarchical state machines with the key difference that the main building block of a behavior is a task rather than a state. +Behavior Tree have been shown to generalize several other control architectures (https://ieeexplore.ieee.org/document/7790863) + +Core Concepts +~~~~~~~~~~~~~ + +Control Node +++++++++++++ + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.ControlNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.SequenceNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.SelectorNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.WhileDoElseNode + +Action Node +++++++++++++ + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.ActionNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.EchoNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.SleepNode + +Decorator Node +++++++++++++++ + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.DecoratorNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.InverterNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.TimeoutNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.DelayNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.ForceSuccessNode + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.ForceFailureNode + +Behavior Tree Factory ++++++++++++++++++++++ + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.BehaviorTreeFactory + :members: + +Behavior Tree ++++++++++++++ + +.. autoclass:: MissionPlanning.BehaviorTree.behavior_tree.BehaviorTree + :members: + +Example +~~~~~~~ + +Visualize the behavior tree by `xml-tree-visual `_. + +.. image:: ./robot_behavior_case.svg + +Print the behavior tree + +.. code-block:: text + + Behavior Tree + [Robot Main Controller] + [Battery Management] + (Low Battery Detection) + + + + [Patrol Task] + + [Move to Position A] + + [Obstacle Handling A] + [Obstacle Present] + + + + + [Move to Position B] + (Short Wait) + + + (Limited Time Obstacle Handling) + [Obstacle Present] + + + + [Conditional Move to C] + + [Perform Position C Task] + + (Ensure Completion) + + + + + Behavior Tree diff --git a/docs/modules/13_mission_planning/behavior_tree/robot_behavior_case.svg b/docs/modules/13_mission_planning/behavior_tree/robot_behavior_case.svg new file mode 100644 index 0000000000..a3d43aed52 --- /dev/null +++ b/docs/modules/13_mission_planning/behavior_tree/robot_behavior_case.svg @@ -0,0 +1,22 @@ +Selectorname: Robot Main ControllerSequencename: Battery ManagementSequencename: Patrol TaskInvertername: Low Battery DetectionEchoname: Low Battery Warningmessage: Battery level low!Charging neededChargeBatteryname: Charge Batterycharge_rate: 20Echoname: Start Taskmessage: Starting patrol taskSequencename: Move to Position ASequencename: Move to Position BWhileDoElsename: Conditional Move to CEchoname: Complete Patrolmessage: Patrol taskcompleted, returning tocharging stationMoveToPositionname: Return to ChargingStationposition: Charging Stationmove_duration: 4CheckBatteryname: Check Batterythreshold: 30MoveToPositionname: Move to Aposition: Amove_duration: 2Selectorname: Obstacle Handling APerformTaskname: Position A Tasktask_name: Check Device Statustask_duration: 2Delayname: Short Waitsec: 1MoveToPositionname: Move to Bposition: Bmove_duration: 3Timeoutname: Limited Time ObstacleHandlingsec: 2PerformTaskname: Position B Tasktask_name: Data Collectiontask_duration: 2.5CheckBatteryname: Check Sufficient Batterythreshold: 50Sequencename: Perform Position C TaskEchoname: Skip Position Cmessage: Insufficient power,skipping position C taskSequencename: Obstacle PresentEchoname: No Obstaclemessage: Path clearEchoname: Prepare Movementmessage: Preparing to move tonext positionSequencename: Obstacle PresentMoveToPositionname: Move to Cposition: Cmove_duration: 2.5ForceSuccessname: Ensure CompletionDetectObstaclename: Detect Obstacleobstacle_probability: 0.3AvoidObstaclename: Avoid Obstacleavoid_duration: 1.5DetectObstaclename: Detect Obstacleobstacle_probability: 0.4AvoidObstaclename: Avoid Obstacleavoid_duration: 1.8PerformTaskname: Position C Tasktask_name: EnvironmentMonitoringtask_duration: 2 \ No newline at end of file diff --git a/docs/modules/13_mission_planning/mission_planning_main.rst b/docs/modules/13_mission_planning/mission_planning_main.rst index 385e62f68e..c35eacd8d5 100644 --- a/docs/modules/13_mission_planning/mission_planning_main.rst +++ b/docs/modules/13_mission_planning/mission_planning_main.rst @@ -10,3 +10,4 @@ Mission planning includes tools such as finite state machines and behavior trees :caption: Contents state_machine/state_machine + behavior_tree/behavior_tree diff --git a/tests/test_behavior_tree.py b/tests/test_behavior_tree.py new file mode 100644 index 0000000000..0898690448 --- /dev/null +++ b/tests/test_behavior_tree.py @@ -0,0 +1,207 @@ +import pytest +import conftest + +from MissionPlanning.BehaviorTree.behavior_tree import ( + BehaviorTreeFactory, + Status, + ActionNode, +) + + +def test_sequence_node1(): + xml_string = """ + + + + + + + + """ + bt_factory = BehaviorTreeFactory() + bt = bt_factory.build_tree(xml_string) + bt.tick() + assert bt.root.status == Status.RUNNING + assert bt.root.children[0].status == Status.SUCCESS + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + bt.tick() + bt.tick() + assert bt.root.status == Status.FAILURE + assert bt.root.children[0].status is None + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + + +def test_sequence_node2(): + xml_string = """ + + + + + + + + """ + bt_factory = BehaviorTreeFactory() + bt = bt_factory.build_tree(xml_string) + bt.tick_while_running() + assert bt.root.status == Status.SUCCESS + assert bt.root.children[0].status is None + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + + +def test_selector_node1(): + xml_string = """ + + + + + + + + """ + bt_factory = BehaviorTreeFactory() + bt = bt_factory.build_tree(xml_string) + bt.tick() + assert bt.root.status == Status.RUNNING + assert bt.root.children[0].status == Status.FAILURE + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + bt.tick() + assert bt.root.status == Status.SUCCESS + assert bt.root.children[0].status is None + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + + +def test_selector_node2(): + xml_string = """ + + + + + + + + + """ + bt_factory = BehaviorTreeFactory() + bt = bt_factory.build_tree(xml_string) + bt.tick_while_running() + assert bt.root.status == Status.FAILURE + assert bt.root.children[0].status is None + assert bt.root.children[1].status is None + + +def test_while_do_else_node(): + xml_string = """ + + + + + + """ + + class CountNode(ActionNode): + def __init__(self, name, count_threshold): + super().__init__(name) + self.count = 0 + self.count_threshold = count_threshold + + def tick(self): + self.count += 1 + if self.count >= self.count_threshold: + return Status.FAILURE + else: + return Status.SUCCESS + + bt_factory = BehaviorTreeFactory() + bt_factory.register_node_builder( + "Count", + lambda node: CountNode( + node.attrib.get("name", CountNode.__name__), + int(node.attrib["count_threshold"]), + ), + ) + bt = bt_factory.build_tree(xml_string) + bt.tick() + assert bt.root.status == Status.RUNNING + assert bt.root.children[0].status == Status.SUCCESS + assert bt.root.children[1].status is Status.SUCCESS + assert bt.root.children[2].status is None + bt.tick() + assert bt.root.status == Status.RUNNING + assert bt.root.children[0].status == Status.SUCCESS + assert bt.root.children[1].status is Status.SUCCESS + assert bt.root.children[2].status is None + bt.tick() + assert bt.root.status == Status.SUCCESS + assert bt.root.children[0].status is None + assert bt.root.children[1].status is None + assert bt.root.children[2].status is None + + +def test_node_children(): + # ControlNode Must have children + xml_string = """ + + + """ + bt_factory = BehaviorTreeFactory() + with pytest.raises(ValueError): + bt_factory.build_tree(xml_string) + + # DecoratorNode Must have child + xml_string = """ + + + """ + with pytest.raises(ValueError): + bt_factory.build_tree(xml_string) + + # DecoratorNode Must have only one child + xml_string = """ + + + + + """ + with pytest.raises(ValueError): + bt_factory.build_tree(xml_string) + + # ActionNode Must have no children + xml_string = """ + + + + """ + with pytest.raises(ValueError): + bt_factory.build_tree(xml_string) + + # WhileDoElse Must have exactly 2 or 3 children + xml_string = """ + + + + """ + with pytest.raises(ValueError): + bt = bt_factory.build_tree(xml_string) + bt.tick() + + xml_string = """ + + + + + + + """ + with pytest.raises(ValueError): + bt = bt_factory.build_tree(xml_string) + bt.tick() + + +if __name__ == "__main__": + conftest.run_this_test(__file__)