Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions controller_manager/test/test_ros2_control_node_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from launch_testing.actions import ReadyToTest
import launch_testing.markers
import launch_ros.actions
from launch_ros.actions import Node

import rclpy
from controller_manager.test_utils import (
Expand All @@ -46,6 +47,7 @@
from controller_manager.launch_utils import generate_controllers_spawner_launch_description



# Executes the given launch file and checks if all nodes can be started
@pytest.mark.launch_test
def generate_test_description():
Expand Down Expand Up @@ -102,6 +104,63 @@ def setUp(self):
def tearDown(self):
self.node.destroy_node()

# ------------------------------------------------------------------
# Helper methods
# ------------------------------------------------------------------
def _extract_actions(self, result):
"""Return a list of launch actions, regardless of type."""
if isinstance(result, list):
return result
elif hasattr(result, "entities"): # LaunchDescription in newer ROS2
return list(result.entities)
else:
return [result]

def _assert_launch_result(self, result, min_len=0):
"""Verify that result is list- or LaunchDescription-like."""
self.assertTrue(
isinstance(result, (list, LaunchDescription)),
f"Unexpected result type: {type(result)}",
)
actions = self._extract_actions(result)
self.assertGreaterEqual(len(actions), min_len)
for act in actions:
self.assertTrue(
hasattr(act, "execute") or hasattr(act, "visit"),
f"Invalid action type: {type(act)}",
)
return actions

# ------------------------------------------------------------------
# Launch utility tests
# ------------------------------------------------------------------
def test_generate_controllers_spawner_from_list(self):
controllers = ["test_controller_1", "test_controller_2"]
result = generate_controllers_spawner_launch_description(controllers)
actions = self._assert_launch_result(result, min_len=1)
self.assertTrue(actions is not None and len(actions) > 0)

def test_generate_controllers_spawner_from_dict(self):
"""Ensure dict-based API works with string param files (old-style)."""
controllers = {
"ctrl_A": ["/tmp/dummy.yaml"],
"ctrl_B": ["/tmp/dummy.yaml"],
}
result = generate_controllers_spawner_launch_description_from_dict(controllers)
actions = self._extract_actions(result)
self.assertIsInstance(result, LaunchDescription)
self.assertEqual(len(actions), 3)
self.assertIsInstance(actions[-1], Node)

def test_generate_load_controller_launch_description(self):
"""Test load controller description with valid single string params."""
controllers = ["test_controller_load"]
result = generate_load_controller_launch_description(controllers)
actions = self._extract_actions(result)
self.assertIsInstance(result, LaunchDescription)
self.assertEqual(len(actions), 3)
self.assertIsInstance(actions[-1], Node)

def test_node_start(self):
check_node_running(self.node, "controller_manager")

Expand Down