diff --git a/examples/improving_actions.md b/examples/improving_actions.md index e3e4cecf0..cdd5eabfc 100644 --- a/examples/improving_actions.md +++ b/examples/improving_actions.md @@ -5,7 +5,7 @@ jupyter: extension: .md format_name: markdown format_version: '1.3' - jupytext_version: 1.16.2 + jupytext_version: 1.16.4 kernelspec: display_name: Python 3 language: python @@ -33,6 +33,7 @@ import pandas as pd import sqlalchemy.orm import plotly +from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit plotly.offline.init_notebook_mode() import plotly.graph_objects as go @@ -90,7 +91,7 @@ fpa = MoveAndPickUp(milk_description, arms=[Arms.LEFT, Arms.RIGHT], grasps=[Grasp.FRONT.value, Grasp.LEFT.value, Grasp.RIGHT.value, Grasp.TOP.value]) print(world.current_world) p_xy = fpa.policy.marginal([fpa.variables.relative_x, fpa.variables.relative_y]) -fig = go.Figure(p_xy.root.plot(), p_xy.root.plotly_layout()) +fig = go.Figure(p_xy.plot(), p_xy.plotly_layout()) fig.update_layout(title="Marginal View of relative x and y position of the robot with respect to the object.") fig.show() ``` @@ -128,7 +129,7 @@ variables = infer_variables_from_dataframe(samples, scale_continuous_types=False min_likelihood_improvement = 0.) model = JPT(variables, min_samples_leaf=25) model.fit(samples) -model = model.probabilistic_circuit +model = ProbabilisticCircuit.from_other(model) print(model) ``` diff --git a/requirements.txt b/requirements.txt index d4a612b1d..8cabe8391 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,6 @@ gTTS~=2.5.3 dm_control trimesh deprecated -probabilistic_model>=5.1.0 -random_events>=3.0.7 +probabilistic_model>=6.0.2 +random_events>=3.1.2 sympy diff --git a/src/pycram/costmaps.py b/src/pycram/costmaps.py index 69abb8f34..77d1c0586 100644 --- a/src/pycram/costmaps.py +++ b/src/pycram/costmaps.py @@ -10,27 +10,23 @@ import tf from matplotlib import colors from nav_msgs.msg import OccupancyGrid, MapMetaData -from probabilistic_model.probabilistic_circuit.nx.distributions import UniformDistribution -from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit, ProductUnit +from probabilistic_model.probabilistic_circuit.nx.helper import uniform_measure_of_event +from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit from random_events.interval import Interval, reals, closed_open, closed from random_events.product_algebra import Event, SimpleEvent from random_events.variable import Continuous +from tf.transformations import quaternion_from_matrix from typing_extensions import Tuple, List, Optional, Iterator -from .ros.ros_tools import wait_for_message -from .datastructures.dataclasses import AxisAlignedBoundingBox -from .datastructures.pose import Pose +from .datastructures.dataclasses import AxisAlignedBoundingBox, BoxVisualShape, Color +from .datastructures.pose import Pose, Transform from .datastructures.world import UseProspectionWorld from .datastructures.world import World from .description import Link from .local_transformer import LocalTransformer +from .ros.ros_tools import wait_for_message from .world_concepts.world_object import Object -from .datastructures.pose import Pose, Transform -from .datastructures.world import World -from .datastructures.dataclasses import AxisAlignedBoundingBox, BoxVisualShape, Color -from tf.transformations import quaternion_from_matrix - @dataclass class Rectangle: @@ -827,7 +823,7 @@ def check_valid_area_exists(self): assert self.valid_area is not None, ("The map has to be created before semantics can be applied. " "Call 'generate_map first'") - def left(self, margin = 0.) -> Event: + def left(self, margin=0.) -> Event: """ Create an event left of the origins Y-Coordinate. :param margin: The margin of the events left bound. @@ -841,7 +837,7 @@ def left(self, margin = 0.) -> Event: {self.x: reals(), self.y: random_events.interval.open(left, y_origin)}).as_composite_set() return event - def right(self, margin = 0.) -> Event: + def right(self, margin=0.) -> Event: """ Create an event right of the origins Y-Coordinate. :param margin: The margin of the events right bound. @@ -854,7 +850,7 @@ def right(self, margin = 0.) -> Event: event = SimpleEvent({self.x: reals(), self.y: closed_open(y_origin, right)}).as_composite_set() return event - def top(self, margin = 0.) -> Event: + def top(self, margin=0.) -> Event: """ Create an event above the origins X-Coordinate. :param margin: The margin of the events upper bound. @@ -868,7 +864,7 @@ def top(self, margin = 0.) -> Event: {self.x: random_events.interval.closed_open(x_origin, top), self.y: reals()}).as_composite_set() return event - def bottom(self, margin = 0.) -> Event: + def bottom(self, margin=0.) -> Event: """ Create an event below the origins X-Coordinate. :param margin: The margin of the events lower bound. @@ -915,14 +911,8 @@ def generate_map(self) -> None: self.original_valid_area = self.valid_area.simple_sets[0] def as_distribution(self) -> ProbabilisticCircuit: - p_xy = ProductUnit() - u_x = UniformDistribution(self.x, self.original_valid_area[self.x].simple_sets[0]) - u_y = UniformDistribution(self.y, self.original_valid_area[self.y].simple_sets[0]) - p_xy.add_subcircuit(u_x) - p_xy.add_subcircuit(u_y) - - conditional, _ = p_xy.conditional(self.valid_area) - return conditional.probabilistic_circuit + model = uniform_measure_of_event(self.valid_area) + return model def sample_to_pose(self, sample: np.ndarray) -> Pose: """ diff --git a/src/pycram/designators/action_designator.py b/src/pycram/designators/action_designator.py index 2c77c29fe..23aad0f5d 100644 --- a/src/pycram/designators/action_designator.py +++ b/src/pycram/designators/action_designator.py @@ -1073,7 +1073,41 @@ class MoveAndPickUpPerformable(ActionAbstract): The grasp to use """ + @with_tree def perform(self): NavigateActionPerformable(self.standing_position).perform() FaceAtPerformable(self.object_designator.pose).perform() PickUpActionPerformable(self.object_designator, self.arm, self.grasp).perform() + + +@dataclass +class MoveAndPlacePerformable(ActionAbstract): + """ + Navigate to `standing_position`, then turn towards the object and pick it up. + """ + + standing_position: Pose + """ + The pose to stand before trying to pick up the object + """ + + object_designator: ObjectDesignatorDescription.Object + """ + The object to pick up + """ + + target_location: Pose + """ + The location to place the object. + """ + + arm: Arms + """ + The arm to use + """ + + @with_tree + def perform(self): + NavigateActionPerformable(self.standing_position).perform() + FaceAtPerformable(self.target_location).perform() + PlaceActionPerformable(self.object_designator, self.arm, self.target_location).perform() diff --git a/src/pycram/designators/specialized_designators/probabilistic/probabilistic_action.py b/src/pycram/designators/specialized_designators/probabilistic/probabilistic_action.py index 606969672..cd48cd228 100644 --- a/src/pycram/designators/specialized_designators/probabilistic/probabilistic_action.py +++ b/src/pycram/designators/specialized_designators/probabilistic/probabilistic_action.py @@ -1,25 +1,28 @@ -import numpy as np +import json +from dataclasses import fields, dataclass + +import random_events import tqdm -from probabilistic_model.probabilistic_circuit.nx.distributions import GaussianDistribution, SymbolicDistribution -from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit, \ - ProductUnit -from probabilistic_model.utils import MissingDict -from random_events.interval import * +from probabilistic_model.probabilistic_circuit.nx.helper import fully_factorized +from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit +from probabilistic_model.probabilistic_model import ProbabilisticModel +from random_events.interval import SimpleInterval from random_events.product_algebra import Event, SimpleEvent from random_events.set import SetElement -from random_events.variable import Symbolic, Continuous +from random_events.variable import Symbolic, Continuous, Variable +from sortedcontainers import SortedSet from sqlalchemy import select from typing_extensions import Optional, List, Iterator -from ...action_designator import MoveAndPickUpPerformable, ActionAbstract +from ...action_designator import MoveAndPickUpPerformable, ActionAbstract, MoveAndPlacePerformable from ....costmaps import OccupancyCostmap, VisibilityCostmap from ....datastructures.enums import Arms as EArms, Grasp as EGrasp, TaskStatus from ....datastructures.pose import Pose from ....datastructures.world import World from ....designator import ActionDesignatorDescription, ObjectDesignatorDescription -from ....local_transformer import LocalTransformer -from ....orm.views import PickUpWithContextView from ....failures import ObjectUnreachable, PlanFailure +from ....local_transformer import LocalTransformer +from ....orm.views import PickUpWithContextView, PlaceWithContextView class Grasp(SetElement): @@ -38,7 +41,7 @@ class Arms(SetElement): class ProbabilisticAction: """ - Abstract class for probabilistic performables. + Abstract class for performables that have a probabilistic parametrization. """ @dataclass @@ -65,96 +68,45 @@ class Variables: """ def __init__(self, policy: Optional[ProbabilisticCircuit] = None): + self.variables = self.Variables() if policy is None: policy = self.default_policy() self.policy = policy - self.variables = self.Variables(*self.policy.variables) - - def default_policy(self) -> ProbabilisticCircuit: - """ - Create a default policy for the action. - - :return: The default policy for the action - """ - raise NotImplementedError def sample_to_action(self, sample: List) -> ActionAbstract: """ Convert a sample from the policy to a performable action. - :param sample: The sample - :return: The action + + :param sample: The sample. + :return: The action. """ raise NotImplementedError - -class GaussianCostmapModel: - """ - Class that generates a Gaussian Costmap around the center of an object. The costmap cuts out a square in the middle - that has side lengths given by ``distance_to_center``. - """ - - distance_to_center: float - """ - The side length of the cut out square. - """ - - variance: float - """ - The variance of the distributions involved - """ - - relative_x = Continuous("relative_x") - relative_y = Continuous("relative_y") - grasp = Symbolic("grasp", Grasp) - arm = Symbolic("arm", Arms) - - def __init__(self, distance_to_center: float = 0.2, variance: float = 0.5): - self.distance_to_center = distance_to_center - self.variance = variance - - def center_event(self) -> Event: + def all_variables(self) -> SortedSet: """ - Create an event that describes the center of the map. + :return: All variables of the action. """ - return SimpleEvent({self.relative_x: open(-self.distance_to_center, self.distance_to_center), - self.relative_y: open(-self.distance_to_center, - self.distance_to_center)}).as_composite_set() + return SortedSet([getattr(self.variables, field.name) for field in fields(self.variables) + if issubclass(field.type, Variable)]) - def create_model_with_center(self) -> ProbabilisticCircuit: + def default_policy(self) -> ProbabilisticCircuit: """ - Create a fully factorized gaussian at the center of the map. + :return: The default policy for the action. """ - centered_model = ProductUnit() - centered_model.add_subcircuit(GaussianDistribution(self.relative_x, 0., np.sqrt(self.variance))) - centered_model.add_subcircuit(GaussianDistribution(self.relative_y, 0., np.sqrt(self.variance))) - - grasp_probabilities = MissingDict(float, {int(element): 1 / len(self.grasp.domain.simple_sets) for element in - self.grasp.domain.simple_sets}) - - centered_model.add_subcircuit(SymbolicDistribution(self.grasp, grasp_probabilities)) + means = {v: 0 for v in self.all_variables() if v.is_numeric} + variances = {v: 1 for v in self.all_variables() if v.is_numeric} + model = fully_factorized(self.all_variables(), means, variances) + return model - arm_probabilities = MissingDict(float, {int(element): 1 / len(self.arm.domain.simple_sets) for element in - self.arm.domain.simple_sets}) - centered_model.add_subcircuit(SymbolicDistribution(self.arm, arm_probabilities)) - return centered_model.probabilistic_circuit - - def create_model(self) -> ProbabilisticCircuit: + def evidence_from_belief_state(self) -> Event: """ - Create a gaussian model that assumes mass everywhere besides the center square. - - :return: The probabilistic circuit + :return: the evidence from the belief state that influences the distribution. """ - centered_model = self.create_model_with_center() - outer_event = self.center_event().complement() - limiting_event = SimpleEvent({self.relative_x: open(-2, 2), - self.relative_y: open(-2, 2)}).as_composite_set() - event = outer_event & limiting_event - # go.Figure(event.plot()).show() - result, _ = centered_model.conditional(event) - return result + raise NotImplementedError class MoveAndPickUp(ActionDesignatorDescription, ProbabilisticAction): + @dataclass class Variables: arm: Symbolic = Symbolic("arm", Arms) @@ -162,7 +114,7 @@ class Variables: relative_x: Continuous = Continuous("relative_x") relative_y: Continuous = Continuous("relative_y") - variables: Variables # Type hint variables + variables: Variables sample_amount: int = 20 """ @@ -192,21 +144,14 @@ def __init__(self, object_designator: ObjectDesignatorDescription.Object, arms: self.arms = arms self.grasps = grasps - def default_policy(self) -> ProbabilisticCircuit: - return GaussianCostmapModel().create_model() - def sample_to_action(self, sample: List) -> MoveAndPickUpPerformable: - """ - Convert a sample from the underlying distribution to a performable action. - :param sample: The sample - :return: action - """ arm, grasp, relative_x, relative_y = sample position = [relative_x, relative_y, 0.] pose = Pose(position, frame=self.object_designator.world_object.tf_frame) standing_position = LocalTransformer().transform_pose(pose, "map") standing_position.position.z = 0 - action = MoveAndPickUpPerformable(standing_position, self.object_designator, EArms[Arms(int(arm)).name], EGrasp(int(grasp))) + action = MoveAndPickUpPerformable(standing_position, self.object_designator, EArms[Arms(int(arm)).name], + EGrasp(int(grasp))) return action def events_from_occupancy_and_visibility_costmap(self) -> Event: @@ -217,7 +162,7 @@ def events_from_occupancy_and_visibility_costmap(self) -> Event: """ # create occupancy and visibility costmap for the target object - ocm = OccupancyCostmap(distance_to_obstacle=0.3, from_ros=False, size=200, resolution=0.1, + ocm = OccupancyCostmap(distance_to_obstacle=0.4, from_ros=False, size=200, resolution=0.1, origin=self.object_designator.pose) vcm = VisibilityCostmap(min_height=1.27, max_height=1.69, size=200, resolution=0.1, origin=self.object_designator.pose) @@ -226,8 +171,8 @@ def events_from_occupancy_and_visibility_costmap(self) -> Event: # convert rectangles to events events = [] for rectangle in mcm.partitioning_rectangles(): - event = SimpleEvent({self.variables.relative_x: open(rectangle.x_lower, rectangle.x_upper), - self.variables.relative_y: open(rectangle.y_lower, rectangle.y_upper)}) + event = SimpleEvent({self.variables.relative_x: random_events.interval.open(rectangle.x_lower, rectangle.x_upper), + self.variables.relative_y: random_events.interval.open(rectangle.y_lower, rectangle.y_upper)}) events.append(event) return Event(*events) @@ -258,8 +203,9 @@ def iter_with_mode(self) -> Iterator[MoveAndPickUpPerformable]: Generate performables by sampling from the mode of the policy conditioned on visibility and occupancy. """ model = self.ground_model() - modes, _ = model.mode() - model = self.ground_model(model, modes) + + modes, log_max = model.log_mode(check_determinism=False) + model, _ = model.conditional(modes) samples = model.sample(self.sample_amount) for sample in samples: @@ -322,3 +268,160 @@ def batch_rollout(self): # reset world World.current_world.reset_world() + + +class MoveAndPlace(ActionDesignatorDescription, ProbabilisticAction): + + @dataclass + class Variables: + relative_x: Continuous = Continuous("relative_x") + relative_y: Continuous = Continuous("relative_y") + + variables: Variables + + sample_amount: int = 20 + """ + The amount of samples that should be drawn from the policy when iterating over it. + """ + + object_designator: ObjectDesignatorDescription.Object + """ + The object designator that should be picked up. + """ + + target_location: Pose + """ + The position to place the object. + """ + + arms: List[Arms] + """ + The arms that can be used for the pick up. + """ + + def __init__(self, object_designator: ObjectDesignatorDescription.Object, + target_location: Pose, policy: Optional[ProbabilisticCircuit] = None): + ActionDesignatorDescription.__init__(self) + ProbabilisticAction.__init__(self, policy) + self.object_designator = object_designator + self.target_location = target_location + + def sample_to_action(self, sample: List) -> MoveAndPlacePerformable: + relative_x, relative_y = sample + position = [relative_x + self.target_location.position.x, relative_y + self.target_location.position.y, 0.] + pose = Pose(position, frame=self.target_location.frame) + standing_position = LocalTransformer().transform_pose(pose, "map") + standing_position.position.z = 0 + action = MoveAndPlacePerformable(standing_position, self.object_designator, + self.target_location, EArms.LEFT) + return action + + def events_from_occupancy_and_visibility_costmap(self) -> Event: + """ + Create events from the occupancy and visibility costmap. + + :return: The events that can be used as evidence for the model. + """ + + # create occupancy and visibility costmap for the target object + ocm = OccupancyCostmap(distance_to_obstacle=0.4, from_ros=False, size=200, resolution=0.1, + origin=self.target_location) + vcm = VisibilityCostmap(min_height=1.27, max_height=1.69, + size=200, resolution=0.1, origin=self.target_location) + mcm = ocm + vcm + + # convert rectangles to events + events = [] + for rectangle in mcm.partitioning_rectangles(): + event = SimpleEvent({self.variables.relative_x: random_events.interval.open(rectangle.x_lower, rectangle.x_upper), + self.variables.relative_y: random_events.interval.open(rectangle.y_lower, rectangle.y_upper)}) + events.append(event) + return Event(*events) + + def ground_model(self, model: Optional[ProbabilisticCircuit] = None, + event: Optional[Event] = None) -> ProbabilisticCircuit: + """ + Ground the model to the current evidence. + + :param model: The model that should be grounded. If None, the policy is used. + :param event: The events that should be used as evidence. If None, the occupancy costmap is used. + :return: The grounded model + """ + + if model is None: + model = self.policy + if event is None: + event = self.events_from_occupancy_and_visibility_costmap() + + result, probability = model.conditional(event) + + if probability == 0: + raise ObjectUnreachable("No possible locations found") + + return result + + def iter_with_mode(self) -> Iterator[MoveAndPlacePerformable]: + """ + Generate performables by sampling from the mode of the policy conditioned on visibility and occupancy. + """ + model = self.ground_model() + modes, _ = model.log_mode(check_determinism=False) + model = self.ground_model(model, modes) + samples = model.sample(self.sample_amount) + + for sample in samples: + action = self.sample_to_action(sample) + yield action + + def __iter__(self) -> Iterator[MoveAndPickUpPerformable]: + """ + Generate performables by sampling from the policy conditioned on visibility and occupancy. + """ + model = self.ground_model(self.policy, self.events_from_occupancy_and_visibility_costmap()) + samples = model.sample(self.sample_amount) + likelihoods = model.likelihood(samples) + + # sort samples by likelihood + samples = [x for _, x in sorted(zip(likelihoods, samples), key=lambda pair: pair[0], reverse=True)] + + for sample in samples: + action = self.sample_to_action(sample) + yield action + + def iterate_without_occupancy_costmap(self) -> Iterator[MoveAndPickUpPerformable]: + """ + Generate performables by sampling from the policy without conditioning on visibility and occupancy. + """ + samples = self.policy.sample(self.sample_amount) + for sample in samples: + action = self.sample_to_action(sample) + yield action + + @staticmethod + def query_for_database(): + view = PlaceWithContextView + query = (select(view.arm, view.relative_x, view.relative_y).where(view.status == TaskStatus.SUCCEEDED)) + return query + + def batch_rollout(self): + + # initialize statistics + successful_tries = 0 + total_tries = 0 + + # create the progress bar + progress_bar = tqdm.tqdm(iter(self.iterate_without_occupancy_costmap()), total=self.sample_amount) + + for action in progress_bar: + total_tries += 1 + try: + action.perform() + successful_tries += 1 + except PlanFailure as p: + pass + + # update progress bar + progress_bar.set_postfix({"Success Probability": successful_tries / total_tries}) + + # reset world + World.current_world.reset_world() diff --git a/src/pycram/orm/views.py b/src/pycram/orm/views.py index 999a4aabe..60760ab89 100644 --- a/src/pycram/orm/views.py +++ b/src/pycram/orm/views.py @@ -3,7 +3,7 @@ import sqlalchemy.orm from sqlalchemy import table, inspect, event, select, engine, MetaData, Select, TableClause, ExecutableDDLElement from sqlalchemy.ext.compiler import compiles -from .action_designator import PickUpAction +from .action_designator import PickUpAction, PlaceAction from .base import Position, RobotState, Pose, Base, Quaternion from .object_designator import Object from .tasktree import TaskTreeNode @@ -148,3 +148,50 @@ class PickUpWithContextView(base): quaternion_w: Mapped[float] = __table__.c.w obj_type: Mapped[str] = __table__.c.obj_type status: Mapped[str] = __table__.c.status + + +class PlaceWithContextView(base): + """ + View for pickup performables with context. + """ + + __robot_position: Position = sqlalchemy.orm.aliased(Position, flat=True) + """ + 3D Vector of robot position + """ + + __robot_pose: Pose = sqlalchemy.orm.aliased(Pose, flat=True) + """ + Complete robot pose + """ + + __object_position: Position = sqlalchemy.orm.aliased(Position, flat=True) + """ + 3D Vector for object position + """ + + __table__ = view("PlaceWithContextView", Base.metadata, + (select(PlaceAction.id, PlaceAction.arm, RobotState.torso_height, + (__robot_position.x-__object_position.x).label("relative_x"), + (__robot_position.y-__object_position.y).label("relative_y"), Quaternion.x, Quaternion.y, + Quaternion.z, Quaternion.w, Object.obj_type, TaskTreeNode.status) + .join(TaskTreeNode.action.of_type(PlaceAction)) + .join(PlaceAction.robot_state) + .join(__robot_pose, RobotState.pose) + .join(__robot_position, __robot_pose.position) + .join(Pose.orientation) + .join(PlaceAction.object) + .join(Object.pose) + .join(__object_position, Pose.position))) + + id: Mapped[int] = __table__.c.id + arm: Mapped[str] = __table__.c.arm + torso_height: Mapped[float] = __table__.c.torso_height + relative_x: Mapped[float] = column_property(__table__.c.relative_x) + relative_y: Mapped[float] = column_property(__table__.c.relative_y) + quaternion_x: Mapped[float] = __table__.c.x + quaternion_y: Mapped[float] = __table__.c.y + quaternion_z: Mapped[float] = __table__.c.z + quaternion_w: Mapped[float] = __table__.c.w + obj_type: Mapped[str] = __table__.c.obj_type + status: Mapped[str] = __table__.c.status diff --git a/test/test_move_and_pick_up.py b/test/test_move_and_pick_up.py index 2c4268950..1a2008278 100644 --- a/test/test_move_and_pick_up.py +++ b/test/test_move_and_pick_up.py @@ -2,28 +2,20 @@ import unittest import numpy as np +from random_events.variable import Continuous, Symbolic +from sortedcontainers import SortedSet from bullet_world_testcase import BulletWorldTestCase from pycram.datastructures.enums import ObjectType, Arms, Grasp from pycram.designator import ObjectDesignatorDescription from pycram.designators.action_designator import MoveTorsoActionPerformable from pycram.designators.specialized_designators.probabilistic.probabilistic_action import (MoveAndPickUp, - GaussianCostmapModel) + Arms as PMArms, + Grasp as PMGrasp) from pycram.failures import PlanFailure from pycram.process_module import simulated_robot -class GaussianCostmapModelTestCase(unittest.TestCase): - - def test_create_model(self): - gcm = GaussianCostmapModel() - model = gcm.create_model() - self.assertEqual(model.probability(gcm.center_event()), 0) - self.assertEqual(len(model.variables), 4) - # p_xy = model.marginal([gcm.relative_x, gcm.relative_y]) - # go.Figure(p_xy.plot(), p_xy.plotly_layout()).show() - - class MoveAndPickUpTestCase(BulletWorldTestCase): @classmethod @@ -32,6 +24,15 @@ def setUpClass(cls): np.random.seed(69) random.seed(69) + def test_variables(self): + object_designator = ObjectDesignatorDescription(types=[ObjectType.MILK]).resolve() + move_and_pick = MoveAndPickUp(object_designator, arms=[Arms.LEFT, Arms.RIGHT], + grasps=[Grasp.FRONT, Grasp.LEFT, Grasp.RIGHT, Grasp.TOP]) + result = SortedSet([Symbolic("arm", PMArms), Symbolic("grasp", PMGrasp), + Continuous("relative_x"), Continuous("relative_y")]) + all_variables = move_and_pick.all_variables() + self.assertEqual(all_variables, result) + def test_grounding(self): object_designator = ObjectDesignatorDescription(types=[ObjectType.MILK]).resolve() move_and_pick = MoveAndPickUp(object_designator, arms=[Arms.LEFT, Arms.RIGHT], @@ -42,12 +43,12 @@ def test_grounding(self): self.assertTrue(event.is_disjoint()) self.assertIsNotNone(model) - def test_move_and_pick_up(self): + def test_move_and_pick_up_with_mode(self): object_designator = ObjectDesignatorDescription(types=[ObjectType.MILK]).resolve() move_and_pick = MoveAndPickUp(object_designator, arms=[Arms.LEFT, Arms.RIGHT], grasps=[Grasp.FRONT, Grasp.LEFT, Grasp.RIGHT, Grasp.TOP]) with simulated_robot: - for action in move_and_pick: + for action in move_and_pick.iter_with_mode(): try: MoveTorsoActionPerformable(0.3).perform() action.perform() diff --git a/test/test_move_and_place.py b/test/test_move_and_place.py new file mode 100644 index 000000000..74cec82c4 --- /dev/null +++ b/test/test_move_and_place.py @@ -0,0 +1,44 @@ +import random +import unittest + +import numpy as np + +from bullet_world_testcase import BulletWorldTestCase +from pycram.datastructures.enums import ObjectType, Arms, Grasp +from pycram.datastructures.pose import Pose +from pycram.designator import ObjectDesignatorDescription +from pycram.designators.action_designator import MoveTorsoActionPerformable, NavigateActionPerformable, \ + PickUpActionPerformable +from pycram.designators.specialized_designators.probabilistic.probabilistic_action import (MoveAndPlace) +from pycram.failures import PlanFailure +from pycram.process_module import simulated_robot + + +class MoveAndPlaceTestCase(BulletWorldTestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + np.random.seed(69) + random.seed(69) + + def test_with_mode(self): + object_designator = ObjectDesignatorDescription(types=[ObjectType.MILK]).resolve() + target_location = Pose([1.3, 1, 0.9], [0, 0, 0, 1]) + designator = MoveAndPlace(object_designator, target_location) + + with simulated_robot: + NavigateActionPerformable(Pose([0.6, 0.4, 0], [0, 0, 0, 1])).perform() + MoveTorsoActionPerformable(0.3).perform() + PickUpActionPerformable(object_designator, Arms.LEFT, Grasp.FRONT).perform() + with simulated_robot: + for action in designator: + try: + action.perform() + return # Success + except PlanFailure as e: + continue + raise AssertionError("No action performed successfully.") + +if __name__ == '__main__': + unittest.main()