Skip to content

Commit

Permalink
ros2node: Add verb get_type_description
Browse files Browse the repository at this point in the history
Adds the verb `get_type_description` which calls the given node's
~/get_type_description service and prints out the response.
This verb is basically a shorthand for
`ros2 service call <node>/get_type_description ...`.
  • Loading branch information
achim-k committed Apr 4, 2023
1 parent 08fdf98 commit 6e738a5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 0 deletions.
1 change: 1 addition & 0 deletions ros2node/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<depend>ros2cli</depend>

<exec_depend>rclpy</exec_depend>
<exec_depend>rosidl_runtime_py</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
64 changes: 64 additions & 0 deletions ros2node/ros2node/verb/get_type_description.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2023 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from ros2cli.node import NODE_NAME_PREFIX
from ros2node.api import NodeNameCompleter
from ros2node.verb import VerbExtension
from rosidl_runtime_py import message_to_yaml
from type_description_interfaces.srv import GetTypeDescription


class GetTypeDescriptionVerb(VerbExtension):
"""Call a node's ~/get_type_description service and print the response."""

def add_arguments(self, parser, cli_name):
argument = parser.add_argument(
'node_name',
help='Node name to request information')
argument.completer = NodeNameCompleter()
argument = parser.add_argument(
'type_name',
help='ROS interface type name, in PACKAGE/NAMESPACE/TYPENAME format')
argument = parser.add_argument(
'type_hash',
help='REP-2011 RIHS hash string')
argument = parser.add_argument(
'--include_type_sources', default=False, action='store_true',
help='Whether to return the original idl/msg/etc. source file(s) in the response')

def main(self, *, args):
rclpy.init()
node = rclpy.create_node(NODE_NAME_PREFIX + '_requester_%s_%s' %
('type_description_interfaces', 'GetTypeDescription'))

service_name = args.node_name + '/get_type_description'
cli = node.create_client(GetTypeDescription, service_name)

if not cli.service_is_ready():
print(f'waiting for service {service_name} to become available...')
cli.wait_for_service()

request = GetTypeDescription.Request(type_name=args.type_name,
type_hash=args.type_hash,
include_type_sources=args.include_type_sources)
future = cli.call_async(request)
rclpy.spin_until_future_complete(node, future)
if future.result() is not None:
print(message_to_yaml(future.result()))
else:
raise RuntimeError('Exception while calling service: %r' % future.exception())

node.destroy_node()
rclpy.shutdown()
1 change: 1 addition & 0 deletions ros2node/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'ros2node.verb': [
'info = ros2node.verb.info:InfoVerb',
'list = ros2node.verb.list:ListVerb',
'get_type_description = ros2node.verb.get_type_description:GetTypeDescriptionVerb',
],
}
)
41 changes: 41 additions & 0 deletions ros2node/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,44 @@ def test_info_hidden_node_hidden_flag(self):
]),
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_get_type_description_invalid_hash(self):
arguments = ['get_type_description', '/complex_node', 'some_type', 'some_hash']
with self.launch_node_command(arguments=arguments) as node_command:
assert node_command.wait_for_shutdown(timeout=10)
assert node_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=itertools.chain([
'successful: false',
re.compile(r'failure_reason: .*'),
'type_description:',
' type_description:',
' type_name: ''',
' fields: []',
' referenced_type_descriptions: []',
'type_sources: []',
'extra_information: []',
]),
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_get_type_description_valid_hash(self):
arguments = [
'get_type_description',
'/complex_node',
'test_msgs.msg.Arrays',
# Type hash below is from test_msgs/share/test_msgs/msg/Arrays.json
'RIHS01_8a321b80dae8c44dfd1ce558eaf1b1cd5ebbe5950863ed675836535511a7f91a'
]
with self.launch_node_command(arguments=arguments) as node_command:
assert node_command.wait_for_shutdown(timeout=10)
assert node_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=itertools.chain([
'successful: true',
"failure_reason: ''",
]),
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output

0 comments on commit 6e738a5

Please sign in to comment.