From bf24f0db7d2d578ceebf57fa01b51111cb28b76f Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Fri, 6 Sep 2019 21:10:21 -0300 Subject: [PATCH] Refactor end-to-end CLI tests. Improved test coverage for: - ros2action - ros2service - ros2topic - ros2msg - ros2srv - ros2interface - ros2node - ros2pkg Signed-off-by: Michel Hidalgo --- ros2action/package.xml | 1 + .../test/fixtures/fibonacci_action_server.py | 71 ++ ros2action/test/test_cli.py | 313 +++++++++ ros2interface/package.xml | 2 + ros2interface/ros2interface/api/__init__.py | 2 + ros2interface/test/test_cli.py | 386 +++++++++-- ros2msg/package.xml | 1 + ros2msg/test/test_cli.py | 183 +++-- ros2node/package.xml | 3 + ros2node/test/fixtures/complex_node.py | 76 ++ ros2node/test/test_cli.py | 181 +++++ ros2pkg/package.xml | 1 + ros2pkg/test/test_cli.py | 183 ++++- ros2service/package.xml | 2 + ros2service/test/fixtures/echo_server.py | 54 ++ ros2service/test/test_cli.py | 314 +++++++++ ros2srv/package.xml | 1 + ros2srv/test/test_cli.py | 209 ++++-- ros2topic/package.xml | 4 + ros2topic/test/fixtures/controller_node.py | 58 ++ ros2topic/test/fixtures/listener_node.py | 54 ++ ros2topic/test/fixtures/repeater_node.py | 73 ++ ros2topic/test/fixtures/talker_node.py | 54 ++ ros2topic/test/test_cli.py | 652 ++++++++++++++++++ test_ros2cli/COLCON_IGNORE | 0 .../test/ros2action_test_configuration.py | 1 + .../test/ros2topic_test_configuration.py | 8 +- 27 files changed, 2734 insertions(+), 153 deletions(-) create mode 100644 ros2action/test/fixtures/fibonacci_action_server.py create mode 100644 ros2action/test/test_cli.py create mode 100644 ros2node/test/fixtures/complex_node.py create mode 100644 ros2node/test/test_cli.py create mode 100644 ros2service/test/fixtures/echo_server.py create mode 100644 ros2service/test/test_cli.py create mode 100644 ros2topic/test/fixtures/controller_node.py create mode 100644 ros2topic/test/fixtures/listener_node.py create mode 100644 ros2topic/test/fixtures/repeater_node.py create mode 100644 ros2topic/test/fixtures/talker_node.py create mode 100644 ros2topic/test/test_cli.py create mode 100644 test_ros2cli/COLCON_IGNORE diff --git a/ros2action/package.xml b/ros2action/package.xml index c331d4a7b..b8662d57d 100644 --- a/ros2action/package.xml +++ b/ros2action/package.xml @@ -23,6 +23,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing test_msgs diff --git a/ros2action/test/fixtures/fibonacci_action_server.py b/ros2action/test/fixtures/fibonacci_action_server.py new file mode 100644 index 000000000..19f3948f2 --- /dev/null +++ b/ros2action/test/fixtures/fibonacci_action_server.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import time + +import sys + +import rclpy +from rclpy.action import ActionServer +from rclpy.node import Node + +from test_msgs.action import Fibonacci + + +class FibonacciActionServer(Node): + + def __init__(self): + super().__init__('fibonacci_action_server') + self._action_server = ActionServer( + self, + Fibonacci, + 'fibonacci', + self.execute_callback) + + def destroy_node(self): + self._action_server.destroy() + super().destroy_node() + + def execute_callback(self, goal_handle): + feedback = Fibonacci.Feedback() + feedback.sequence = [0, 1] + + for i in range(1, goal_handle.request.order): + feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1]) + goal_handle.publish_feedback(feedback) + + goal_handle.succeed() + + result = Fibonacci.Result() + result.sequence = feedback.sequence + return result + + +def main(args=None): + rclpy.init(args=args) + + node = FibonacciActionServer() + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('server stopped cleanly') + except BaseException: + print('exception in server:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2action/test/test_cli.py b/ros2action/test/test_cli.py new file mode 100644 index 000000000..90a4cd390 --- /dev/null +++ b/ros2action/test/test_cli.py @@ -0,0 +1,313 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + +import yaml + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_action_server_executable = os.path.join( + os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py' + ) + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + ExecuteProcess( + cmd=[sys.executable, path_to_action_server_executable], + additional_env={'RMW_IMPLEMENTATION': rmw_implementation} + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env={'RMW_IMPLEMENTATION': rmw_implementation} + ) + ] + ), + ]) + + +def get_fibonacci_send_goal_output(*, order=1, with_feedback=False): + assert order > 0 + output = [ + 'Waiting for an action server to become available...', + 'Sending goal:', + ' order: {}'.format(order), + '', + re.compile('Goal accepted with ID: [a-f0-9]+'), + '', + ] + sequence = [0, 1] + for _ in range(order - 1): + sequence.append(sequence[-1] + sequence[-2]) + if with_feedback: + output.append('Feedback:') + output.extend((' ' + yaml.dump({ + 'sequence': sequence + })).splitlines()) + output.append('') + output.append('Result:'), + output.extend((' ' + yaml.dump({ + 'sequence': sequence + })).splitlines()) + output.append('') + output.append('Goal finished with status: SUCCEEDED') + return output + + +class TestROS2ActionCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_action_command(self, arguments): + action_command_action = ExecuteProcess( + cmd=['ros2', 'action', *arguments], + name='ros2action-cli', output='screen', + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + } + ) + with launch_testing.tools.launch_process( + launch_service, action_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=rmw_implementation + ) + ) as action_command: + yield action_command + cls.launch_action_command = launch_action_command + + def test_info_on_nonexistent_action(self): + with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /not_an_action', + 'Action clients: 0', + 'Action servers: 0', + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info(self): + with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ' /fibonacci_action_server' + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info_with_types(self): + with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ' /fibonacci_action_server [test_msgs/action/Fibonacci]' + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_fibonacci_info_count(self): + with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Action: /fibonacci', + 'Action clients: 0', + 'Action servers: 1', + ], + text=action_command.output, + strict=False + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list(self): + with self.launch_action_command(arguments=['list']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/fibonacci'], + text=action_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_action_command(arguments=['list', '-t']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'], + text=action_command.output, strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_action_command(arguments=['list', '-c']) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + command_output_lines = action_command.output.splitlines() + assert len(command_output_lines) == 1 + assert int(command_output_lines[0]) == 1 + + @launch_testing.markers.retry_on_failure(times=5) + def test_send_fibonacci_goal(self): + with self.launch_action_command( + arguments=[ + 'send_goal', + '/fibonacci', + 'test_msgs/action/Fibonacci', + '{order: 5}' + ], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_fibonacci_send_goal_output(order=5), + text=action_command.output, strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_send_fibonacci_goal_with_feedback(self): + with self.launch_action_command( + arguments=[ + 'send_goal', + '-f', + '/fibonacci', + 'test_msgs/action/Fibonacci', + '{order: 5}' + ], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=10) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_fibonacci_send_goal_output( + order=5, with_feedback=True + ), + text=action_command.output, strict=True + ) + + def test_show_fibonacci(self): + with self.launch_action_command( + arguments=['show', 'test_msgs/action/Fibonacci'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'int32 order', + '---', + 'int32[] sequence', + '---', + 'int32[] sequence' + ], + text=action_command.output, + strict=False + ) + + def test_show_not_a_package(self): + with self.launch_action_command( + arguments=['show', 'not_a_package/action/Fibonacci'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=action_command.output, strict=True + ) + + # TODO(hidmic): make 'ros2 action show' fail accordingly + # def test_show_not_an_action_ns(self): + # with self.launch_action_command( + # arguments=['show', 'test_msgs/foo/Fibonacci'], + # ) as action_command: + # assert action_command.wait_for_shutdown(timeout=2) + # assert action_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown action type'], + # text=action_command.output, strict=True + # ) + + def test_show_not_an_action_typename(self): + with self.launch_action_command( + arguments=['show', 'test_msgs/action/NotAnActionTypeName'], + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown action type'], + text=action_command.output, strict=True + ) + + def test_show_not_an_action_type(self): + with self.launch_action_command( + arguments=['show', 'not_an_action_type'] + ) as action_command: + assert action_command.wait_for_shutdown(timeout=2) + assert action_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed action type is invalid'], + text=action_command.output, strict=True + ) diff --git a/ros2interface/package.xml b/ros2interface/package.xml index 3af3b8989..fa500ed6a 100644 --- a/ros2interface/package.xml +++ b/ros2interface/package.xml @@ -19,8 +19,10 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs + test_msgs ament_python diff --git a/ros2interface/ros2interface/api/__init__.py b/ros2interface/ros2interface/api/__init__.py index f7cb0f4ee..2c193854c 100644 --- a/ros2interface/ros2interface/api/__init__.py +++ b/ros2interface/ros2interface/api/__init__.py @@ -41,6 +41,8 @@ def get_interfaces(package_name): def get_interface_path(parts): prefix_path = has_resource('packages', parts[0]) + if not prefix_path: + raise LookupError('Unknown package {}'.format(parts[0])) joined = '/'.join(parts) if len(parts[-1].rsplit('.', 1)) == 1: joined += '.idl' diff --git a/ros2interface/test/test_cli.py b/ros2interface/test/test_cli.py index d161cc11f..07d3ff1c2 100644 --- a/ros2interface/test/test_cli.py +++ b/ros2interface/test/test_cli.py @@ -12,42 +12,350 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'interface', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_msgs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'interface', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - message_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in message_types) - count += len(message_types) - - if package_name != 'std_msgs': - continue - for message_name in [t[len(package_name) + 1:] for t in message_types]: - show_cmd = [ - 'ros2', 'interface', 'show', package_name + '/' + message_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if message_name == 'String': - assert show_result.stdout.rstrip() == b'string data' - - package_cmd = ['ros2', 'interface', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'interface', 'show', 'std_msgs/_not_existing_message_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import itertools +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +some_messages_from_std_msgs = [ + 'std_msgs/msg/Bool', + 'std_msgs/msg/Float32', + 'std_msgs/msg/Float64', +] + +some_services_from_std_srvs = [ + 'std_srvs/srv/Empty', + 'std_srvs/srv/SetBool', + 'std_srvs/srv/Trigger', +] + +some_actions_from_test_msgs = [ + 'test_msgs/action/Fibonacci' +] + +some_interfaces = ( + some_messages_from_std_msgs + + some_services_from_std_srvs + + some_actions_from_test_msgs +) + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +class TestROS2MsgCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_interface_command(self, arguments): + interface_command_action = ExecuteProcess( + cmd=['ros2', 'interface', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2interface-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, interface_command_action, proc_info, proc_output + ) as interface_command: + yield interface_command + cls.launch_interface_command = launch_interface_command + + def test_list_interfaces(self): + with self.launch_interface_command(arguments=['list']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + filter_ = launch_testing.tools.basic_output_filter( + filtered_prefixes=['Messages:', 'Services:', 'Actions:'] + ) + output_lines = filter_(interface_command.output).splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/(msg|srv|action)/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert launch_testing.tools.expect_output( + expected_lines=some_interfaces, + lines=output_lines, + strict=False + ) + + def test_list_messages(self): + with self.launch_interface_command(arguments=['list', '-m']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Messages:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/msg/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + assert launch_testing.tools.expect_output( + expected_lines=some_messages_from_std_msgs, + lines=output_lines, + strict=False + ) + + def test_list_services(self): + with self.launch_interface_command(arguments=['list', '-s']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Services:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/srv/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + assert launch_testing.tools.expect_output( + expected_lines=some_services_from_std_srvs, + lines=output_lines, + strict=False + ) + + def test_list_actions(self): + with self.launch_interface_command(arguments=['list', '-a']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['Actions:'], itertools.repeat( + re.compile(r'\s*[A-z0-9_]+/action/[A-z0-9_]+'), len(output_lines) - 1 + ) + ), + lines=output_lines, + strict=True + ) + assert launch_testing.tools.expect_output( + expected_lines=some_actions_from_test_msgs, + lines=output_lines, + strict=False + ) + + def test_package_on_nonexistent_package(self): + with self.launch_interface_command( + arguments=['package', 'not_a_package'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package not_a_package'], + text=interface_command.output, + strict=True + ) + + def test_package_on_std_msgs(self): + with self.launch_interface_command( + arguments=['package', 'std_msgs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'std_msgs/msg/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + + def test_package_on_std_srvs(self): + with self.launch_interface_command( + arguments=['package', 'std_srvs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'std_srvs/srv/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(srv in output_lines for srv in some_services_from_std_srvs) + + def test_package_on_test_msgs(self): + with self.launch_interface_command( + arguments=['package', 'test_msgs'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert launch_testing.tools.expect_output( + expected_lines=itertools.repeat( + re.compile(r'test_msgs/(msg|srv|action)/[A-z0-9_]+'), len(output_lines) + ), + lines=output_lines, + strict=True + ) + assert all(action in output_lines for action in some_actions_from_test_msgs) + + def test_packages(self): + with self.launch_interface_command(arguments=['packages']) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' in output_lines + assert 'std_srvs' in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_messages(self): + with self.launch_interface_command( + arguments=['packages', '-m'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' in output_lines + assert 'std_srvs' not in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_services(self): + with self.launch_interface_command( + arguments=['packages', '-s'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' not in output_lines + assert 'std_srvs' in output_lines + assert 'test_msgs' in output_lines + + def test_packages_with_actions(self): + with self.launch_interface_command( + arguments=['packages', '-a'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = interface_command.output.splitlines() + assert 'std_msgs' not in output_lines + assert 'std_srvs' not in output_lines + assert 'test_msgs' in output_lines + + def test_show_message(self): + with self.launch_interface_command( + arguments=['show', 'std_msgs/msg/String'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'module std_msgs {', + ' module msg {', + ' struct String {', + ' string data;', + ' };', + ' };', + '};' + ], + text=interface_command.output, + strict=False + ) + + def test_show_service(self): + with self.launch_interface_command( + arguments=['show', 'std_srvs/srv/SetBool'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'module std_srvs {', + ' module srv {', + ' struct SetBool_Request {', + ' boolean data;', + ' };', + ' struct SetBool_Response {', + ' boolean success;', + ' string message;', + ' };', + ' };', + '};' + ], + text=interface_command.output, + strict=False + ) + + def test_show_action(self): + with self.launch_interface_command( + arguments=['show', 'test_msgs/action/Fibonacci'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'module test_msgs {', + ' module action {', + ' struct Fibonacci_Goal {', + ' int32 order;', + ' };', + ' struct Fibonacci_Result {', + ' sequence sequence;', + ' };', + ' struct Fibonacci_Feedback {', + ' sequence sequence;', + ' };', + ' };', + '};' + ], + text=interface_command.output, + strict=False + ) + + def test_show_not_a_package(self): + with self.launch_interface_command( + arguments=['show', 'not_a_package/msg/NotAMessageTypeName'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package not_a_package'], + text=interface_command.output, + strict=True + ) + + def test_show_not_an_interface(self): + with self.launch_interface_command( + arguments=['show', 'std_msgs/msg/NotAMessageTypeName'] + ) as interface_command: + assert interface_command.wait_for_shutdown(timeout=2) + assert interface_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=[re.compile( + r"Could not find the interface '.+NotAMessageTypeName\.idl'" + )], + text=interface_command.output, + strict=True + ) diff --git a/ros2msg/package.xml b/ros2msg/package.xml index 14b64ee27..bec55bfef 100644 --- a/ros2msg/package.xml +++ b/ros2msg/package.xml @@ -18,6 +18,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs diff --git a/ros2msg/test/test_cli.py b/ros2msg/test/test_cli.py index 8bd92f7d2..142464e79 100644 --- a/ros2msg/test/test_cli.py +++ b/ros2msg/test/test_cli.py @@ -1,4 +1,4 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. +# Copyright 2019 Open Source Robotics Foundation, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,47 +12,140 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'msg', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_msgs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'msg', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - message_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in message_types) - count += len(message_types) - - if package_name != 'std_msgs': - continue - for message_name in [t[len(package_name) + 1:] for t in message_types]: - show_cmd = [ - 'ros2', 'msg', 'show', package_name + '/' + message_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if message_name == 'String': - assert show_result.stdout.rstrip() == b'string data' - - list_cmd = ['ros2', 'msg', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - message_types = list_result.stdout.decode().splitlines() - assert len(message_types) == count - - package_cmd = ['ros2', 'msg', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'msg', 'show', 'std_msgs/_not_existing_message_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +some_messages_from_std_msgs = [ + 'std_msgs/msg/Bool', + 'std_msgs/msg/Float32', + 'std_msgs/msg/Float64', +] + + +class TestROS2MsgCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_msg_command(self, arguments): + msg_command_action = ExecuteProcess( + cmd=['ros2', 'msg', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2msg-cli', output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, msg_command_action, proc_info, proc_output + ) as msg_command: + yield msg_command + cls.launch_msg_command = launch_msg_command + + def test_list_messages(self): + with self.launch_msg_command(arguments=['list']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = msg_command.output.splitlines() + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + assert all(re.match(r'.*/msg/.*', line) is not None for line in output_lines) + + def test_package_messages(self): + with self.launch_msg_command(arguments=['package', 'std_msgs']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = msg_command.output.splitlines() + assert all(msg in output_lines for msg in some_messages_from_std_msgs) + assert all(re.match(r'std_msgs/msg/.*', line) is not None for line in output_lines) + + def test_not_a_package_messages(self): + with self.launch_msg_command(arguments=['package', 'not_a_package']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=msg_command.output, strict=True + ) + + def test_list_packages_with_messages(self): + with self.launch_msg_command(arguments=['packages']) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + assert 'std_msgs' in msg_command.output.splitlines() + + def test_show_message(self): + with self.launch_msg_command( + arguments=['show', 'std_msgs/msg/String'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['string data'], + text=msg_command.output, strict=False + ) + + def test_show_not_a_message_typename(self): + with self.launch_msg_command( + arguments=['show', 'std_msgs/msg/NotAMessageTypeName'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown message name'], + text=msg_command.output, strict=True + ) + + # TODO(hidmic): make 'ros2 msg show' fail accordingly + # def test_show_not_a_message_ns(self): + # with self.launch_msg_command( + # arguments=['show', 'std_msgs/foo/String'] + # ) as msg_command: + # assert msg_command.wait_for_shutdown(timeout=10) + # assert msg_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown message name'], + # text=msg_command.output, strict=True + # ) + + def test_show_not_a_package(self): + with self.launch_msg_command( + arguments=['show', 'not_a_package/msg/String'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=msg_command.output, strict=True + ) + + def test_show_not_a_message_type(self): + with self.launch_msg_command( + arguments=['show', 'not_a_message_type'] + ) as msg_command: + assert msg_command.wait_for_shutdown(timeout=10) + assert msg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed message type is invalid'], + text=msg_command.output, strict=True + ) diff --git a/ros2node/package.xml b/ros2node/package.xml index ef8a056e0..b6748f6e2 100644 --- a/ros2node/package.xml +++ b/ros2node/package.xml @@ -16,6 +16,9 @@ ament_pep257 ament_xmllint python3-pytest + rclpy + ros_testing + test_msgs ament_python diff --git a/ros2node/test/fixtures/complex_node.py b/ros2node/test/fixtures/complex_node.py new file mode 100644 index 000000000..d715e218b --- /dev/null +++ b/ros2node/test/fixtures/complex_node.py @@ -0,0 +1,76 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.action import ActionServer +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + +from test_msgs.action import Fibonacci +from test_msgs.msg import Arrays +from test_msgs.msg import Strings +from test_msgs.srv import BasicTypes + + +class ComplexNode(Node): + + def __init__(self): + super().__init__('complex_node') + self.publisher = self.create_publisher(Arrays, 'arrays', qos_profile_system_default) + self.subscription = self.create_subscription( + Strings, 'strings', lambda msg: None, qos_profile_system_default + ) + self.server = self.create_service(BasicTypes, 'basic', lambda req, res: res) + self.action_server = ActionServer( + self, Fibonacci, 'fibonacci', self.action_callback + ) + self.timer = self.create_timer(1.0, self.pub_callback) + + def destroy_node(self): + self.timer.destroy() + self.publisher.destroy() + self.subscription.destroy() + self.server.destroy() + self.action_server.destroy() + super().destroy_node() + + def pub_callback(self): + self.publisher.publish(Arrays()) + + def action_callback(self, goal_handle): + goal_handle.succeed() + return Fibonacci.Result() + + +def main(args=None): + rclpy.init(args=args) + + node = ComplexNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('node stopped cleanly') + except BaseException: + print('exception in node:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2node/test/test_cli.py b/ros2node/test/test_cli.py new file mode 100644 index 000000000..a1f6e254c --- /dev/null +++ b/ros2node/test/test_cli.py @@ -0,0 +1,181 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import itertools +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_complex_node_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'complex_node.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_complex_node_script], + node_name='complex_node', + additional_env=additional_env + ), + Node( + node_executable=sys.executable, + arguments=[path_to_complex_node_script], + node_name='_hidden_complex_node', + additional_env=additional_env + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2NodeCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_node_command(self, arguments): + node_command_action = ExecuteProcess( + cmd=['ros2', 'node', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2node-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, node_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + # ignore launch_ros and ros2cli daemon nodes + filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'], + filtered_rmw_implementation=rmw_implementation + ) + ) as node_command: + yield node_command + cls.launch_node_command = launch_node_command + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_nodes(self): + with self.launch_node_command(arguments=['list']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/complex_node'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_nodes(self): + with self.launch_node_command(arguments=['list', '-a']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/_hidden_complex_node', + '/complex_node' + ], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_node_count(self): + with self.launch_node_command(arguments=['list', '-c']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = node_command.output.splitlines() + assert len(output_lines) == 1 + # Fixture nodes that are not hidden plus launch_ros node. + assert int(output_lines[0]) == 2 + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_nodes_count(self): + with self.launch_node_command(arguments=['list', '-c', '-a']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = node_command.output.splitlines() + assert len(output_lines) == 1 + # All fixture nodes plus launch_ros and ros2cli daemon nodes. + assert int(output_lines[0]) == 4 + + @launch_testing.markers.retry_on_failure(times=5) + def test_info_node(self): + with self.launch_node_command(arguments=['info', '/complex_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + # TODO(hidmic): only optionally show hidden topics and services + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain([ + '/complex_node', + ' Subscribers:', + ' /strings: test_msgs/msg/Strings', + ' Publishers:', + ' /arrays: test_msgs/msg/Arrays', + ' /parameter_events: rcl_interfaces/msg/ParameterEvent', + ' /rosout: rcl_interfaces/msg/Log', + ' Services:', + ' /basic: test_msgs/srv/BasicTypes', + ], itertools.repeat(re.compile( + r'\s*/complex_node/.*parameter.*: rcl_interfaces/srv/.*Parameter.*' + ), 6), [ + ' Actions Servers:', + ' /fibonacci: test_msgs/action/Fibonacci', + ' Actions Clients:', + '' + ]), + text=node_command.output, strict=False + ), 'Output does not match:\n' + node_command.output diff --git a/ros2pkg/package.xml b/ros2pkg/package.xml index 8071f4c22..cb5899f04 100644 --- a/ros2pkg/package.xml +++ b/ros2pkg/package.xml @@ -21,6 +21,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing ament_python diff --git a/ros2pkg/test/test_cli.py b/ros2pkg/test/test_cli.py index 0a71854fa..291e14416 100644 --- a/ros2pkg/test/test_cli.py +++ b/ros2pkg/test/test_cli.py @@ -12,26 +12,175 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib import os -import subprocess +import tempfile +import unittest +import xml.etree.ElementTree as ET +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction -def test_cli(): - list_cmd = ['ros2', 'pkg', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - package_names = list_result.stdout.decode().splitlines() +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools - # explicit dependencies of this package will for sure be available - assert 'ros2cli' in package_names +import pytest - prefix_cmd = ['ros2', 'pkg', 'prefix', 'ros2cli'] - prefix_result = subprocess.run( - prefix_cmd, stdout=subprocess.PIPE, check=True) - prefix_path = prefix_result.stdout.decode().splitlines() - assert len(prefix_path) == 1 - assert os.path.isdir(prefix_path[0]) - prefix_cmd = ['ros2', 'pkg', 'prefix', '_not_existing_package_name'] - prefix_result = subprocess.run( - prefix_cmd, stdout=subprocess.PIPE) - assert prefix_result.returncode +some_cli_packages = [ + 'ros2cli', + 'ros2pkg' +] + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +class TestROS2PkgCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_pkg_command(self, arguments, **kwargs): + pkg_command_action = ExecuteProcess( + cmd=['ros2', 'pkg', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2pkg-cli', + output='screen', + **kwargs + ) + with launch_testing.tools.launch_process( + launch_service, pkg_command_action, proc_info, proc_output + ) as pkg_command: + yield pkg_command + cls.launch_pkg_command = launch_pkg_command + + def test_list_packages(self): + with self.launch_pkg_command(arguments=['list']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = pkg_command.output.splitlines() + assert all(pkg in output_lines for pkg in some_cli_packages) + + def test_package_prefix(self): + with self.launch_pkg_command(arguments=['prefix', 'ros2cli']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = pkg_command.output.splitlines() + assert len(output_lines) == 1 + prefix_path = output_lines[0] + assert os.path.isdir(prefix_path) + + def test_not_a_package_prefix(self): + with self.launch_pkg_command(arguments=['prefix', 'not_a_package']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Package not found'], + text=pkg_command.output, + strict=True + ) + + def test_xml(self): + with self.launch_pkg_command(arguments=['xml', 'ros2cli']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + root = ET.XML(pkg_command.output) + assert root.tag == 'package' + assert root.find('name').text == 'ros2cli' + + def test_not_a_package_xml(self): + with self.launch_pkg_command(arguments=['xml', 'not_a_package']) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=2) + assert pkg_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Package not found'], + text=pkg_command.output, + strict=True + ) + + def test_create_package(self): + with tempfile.TemporaryDirectory() as tmpdir: + with self.launch_pkg_command( + arguments=[ + 'create', 'a_test_package', + '--package-format', '3', + '--description', 'A test package dummy description', + '--license', 'Apache License 2.0', + '--build-type', 'ament_cmake', + '--dependencies', 'ros2pkg', + '--maintainer-email', 'nobody@nowhere.com', + '--maintainer-name', 'Nobody', + '--node-name', 'test_node', + '--library-name', 'test_library' + ], cwd=tmpdir + ) as pkg_command: + assert pkg_command.wait_for_shutdown(timeout=5) + assert pkg_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'going to create a new package', + 'package name: a_test_package', + 'destination directory: ' + tmpdir, + 'package format: 3', + 'version: 0.0.0', + 'description: A test package dummy description', + "maintainer: ['Nobody ']", + "licenses: ['Apache License 2.0']", + 'build type: ament_cmake', + "dependencies: ['ros2pkg']", + 'node_name: test_node', + 'library_name: test_library', + 'creating folder ./a_test_package', + 'creating ./a_test_package/package.xml', + 'creating source and include folder', + 'creating folder ./a_test_package/src', + 'creating folder ./a_test_package/include/a_test_package', + 'creating ./a_test_package/CMakeLists.txt', + 'creating ./a_test_package/src/test_node.cpp', + 'creating ./a_test_package/include/a_test_package/test_library.hpp', + 'creating ./a_test_package/src/test_library.cpp', + 'creating ./a_test_package/include/a_test_package/visibility_control.h', + ], + text=pkg_command.output, + strict=True + ) + # Check layout + assert os.path.isdir(os.path.join(tmpdir, 'a_test_package')) + assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'package.xml')) + assert os.path.isfile(os.path.join(tmpdir, 'a_test_package', 'CMakeLists.txt')) + assert os.path.isfile( + os.path.join(tmpdir, 'a_test_package', 'src', 'test_node.cpp') + ) + assert os.path.isfile( + os.path.join(tmpdir, 'a_test_package', 'src', 'test_library.cpp') + ) + assert os.path.isfile(os.path.join( + tmpdir, 'a_test_package', 'include', 'a_test_package', 'test_library.hpp' + )) + assert os.path.isfile(os.path.join( + tmpdir, 'a_test_package', 'include', 'a_test_package', 'visibility_control.h' + )) + # Check package.xml + tree = ET.parse(os.path.join(tmpdir, 'a_test_package', 'package.xml')) + root = tree.getroot() + assert root.tag == 'package' + assert root.attrib['format'] == '3' + assert root.find('name').text == 'a_test_package' + assert root.find('description').text == 'A test package dummy description' + assert root.find('maintainer').text == 'Nobody' + assert root.find('maintainer').attrib['email'] == 'nobody@nowhere.com' + assert root.find('license').text == 'Apache License 2.0' + assert root.find('depend').text == 'ros2pkg' + assert root.find('.//build_type').text == 'ament_cmake' diff --git a/ros2service/package.xml b/ros2service/package.xml index 49eb2ba31..cbad47503 100644 --- a/ros2service/package.xml +++ b/ros2service/package.xml @@ -21,6 +21,8 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing + test_msgs ament_python diff --git a/ros2service/test/fixtures/echo_server.py b/ros2service/test/fixtures/echo_server.py new file mode 100644 index 000000000..ca3584d7c --- /dev/null +++ b/ros2service/test/fixtures/echo_server.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node + +from test_msgs.srv import BasicTypes + + +class EchoServer(Node): + + def __init__(self): + super().__init__('echo_server') + self.server = self.create_service(BasicTypes, 'echo', self.callback) + + def callback(self, request, response): + for field_name in request.get_fields_and_field_types(): + setattr(response, field_name, getattr(request, field_name)) + return response + + +def main(args=None): + rclpy.init(args=args) + + node = EchoServer() + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('server stopped cleanly') + except BaseException: + print('exception in server:', file=sys.stderr) + raise + finally: + # Destroy the node explicitly + # (optional - Done automatically when node is garbage collected) + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2service/test/test_cli.py b/ros2service/test/test_cli.py new file mode 100644 index 000000000..c1bbb5a77 --- /dev/null +++ b/ros2service/test/test_cli.py @@ -0,0 +1,314 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import itertools +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + +from test_msgs.srv import BasicTypes + + +def get_echo_call_output(**kwargs): + request = BasicTypes.Request() + for field_name, field_value in kwargs.items(): + setattr(request, field_name, field_value) + response = BasicTypes.Response() + for field_name, field_value in kwargs.items(): + setattr(response, field_name, field_value) + return [ + 'requester: making request: ' + repr(request), + '', + 'response:', + repr(response), + '' + ] + + +# TODO(hidmic): check Opensplice service types +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_echo_server_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'echo_server.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_echo_server_script], + node_name='echo_server', + node_namespace='my_ns', + additional_env=additional_env, + ), + Node( + node_executable=sys.executable, + arguments=[path_to_echo_server_script], + node_name='_hidden_echo_server', + node_namespace='my_ns', + remappings=[('echo', '_echo')], + additional_env=additional_env, + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2ServiceCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_service_command(self, arguments): + service_command_action = ExecuteProcess( + cmd=['ros2', 'service', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2service-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, service_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_prefixes=[ + 'waiting for service to become available...', + '/launch_ros' # cope with launch_ros internal node. + ], + filtered_rmw_implementation=rmw_implementation + ) + ) as service_command: + yield service_command + cls.launch_service_command = launch_service_command + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_services(self): + with self.launch_service_command(arguments=['list']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/echo'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_hidden(self): + with self.launch_service_command( + arguments=['--include-hidden-services', 'list'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/_echo'], + itertools.repeat(re.compile( + r'/my_ns/_hidden_echo_server/.*parameter.*' + ), 6), + ['/my_ns/echo'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_service_command(arguments=['list', '-t']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=itertools.chain( + ['/my_ns/echo [test_msgs/srv/BasicTypes]'], + itertools.repeat(re.compile( + r'/my_ns/echo_server/.*parameter.*' + r' \[rcl_interfaces/srv/.*Parameter.*\]' + ), 6) + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_service_command(arguments=['list', '-c']) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = service_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 7 + 6 # cope with launch_ros internal node. + + @launch_testing.markers.retry_on_failure(times=5) + def test_find(self): + with self.launch_service_command( + arguments=['find', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/my_ns/echo'], + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_hidden(self): + with self.launch_service_command( + arguments=['find', '--include-hidden-services', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/my_ns/_echo', '/my_ns/echo'], + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_count(self): + with self.launch_service_command( + arguments=['find', '-c', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = service_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 1 + + def test_find_not_a_service_type(self): + with self.launch_service_command( + arguments=['find', 'not_a_service_type'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert service_command.output == '' + + def test_type(self): + with self.launch_service_command( + arguments=['type', '/my_ns/echo'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['test_msgs/srv/BasicTypes'], + text=service_command.output, + strict=True + ) + + def test_type_on_not_a_service(self): + with self.launch_service_command( + arguments=['type', '/not_a_service'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == 1 + assert service_command.output == '' + + @launch_testing.markers.retry_on_failure(times=5) + def test_call_no_args(self): + with self.launch_service_command( + arguments=['call', '/my_ns/echo', 'test_msgs/srv/BasicTypes'] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_echo_call_output(), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_call(self): + with self.launch_service_command( + arguments=[ + 'call', + '/my_ns/echo', + 'test_msgs/srv/BasicTypes', + '{bool_value: false, int32_value: -1, float64_value: 0.1, string_value: bazbar}' + ] + ) as service_command: + assert service_command.wait_for_shutdown(timeout=10) + assert service_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=get_echo_call_output( + bool_value=False, + int32_value=-1, + float64_value=0.1, + string_value='bazbar' + ), + text=service_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_repeated_call(self): + with self.launch_service_command( + arguments=[ + 'call', + '-r', '1', + '/my_ns/echo', + 'test_msgs/srv/BasicTypes', + '{bool_value: true, int32_value: 1, float64_value: 1.0, string_value: foobar}' + ], + ) as service_command: + assert service_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=2 * get_echo_call_output( + bool_value=True, int32_value=1, float64_value=1.0, string_value='foobar' + ) + ), timeout=10) diff --git a/ros2srv/package.xml b/ros2srv/package.xml index 928fe3413..30cdc8672 100644 --- a/ros2srv/package.xml +++ b/ros2srv/package.xml @@ -18,6 +18,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing std_msgs std_srvs diff --git a/ros2srv/test/test_cli.py b/ros2srv/test/test_cli.py index de0906840..2c9e0edd9 100644 --- a/ros2srv/test/test_cli.py +++ b/ros2srv/test/test_cli.py @@ -1,4 +1,4 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. +# Copyright 2019 Open Source Robotics Foundation, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,47 +12,166 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess - - -def test_cli(): - packages_cmd = ['ros2', 'srv', 'packages'] - packages_result = subprocess.run(packages_cmd, stdout=subprocess.PIPE, check=True) - package_names = packages_result.stdout.decode().splitlines() - - # explicit dependencies of this package will for sure be available - assert 'std_srvs' in package_names - - count = 0 - for package_name in package_names: - package_cmd = ['ros2', 'srv', 'package', package_name] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE, check=True) - service_types = package_result.stdout.decode().splitlines() - assert all(t.startswith(package_name + '/') for t in service_types) - count += len(service_types) - - if package_name != 'std_srvs': - continue - for service_name in [t[len(package_name) + 1:] for t in service_types]: - show_cmd = [ - 'ros2', 'srv', 'show', package_name + '/' + service_name] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE, check=True) - if service_name == 'Empty': - assert show_result.stdout.rstrip() == b'---' - - list_cmd = ['ros2', 'srv', 'list'] - list_result = subprocess.run(list_cmd, stdout=subprocess.PIPE, check=True) - service_types = list_result.stdout.decode().splitlines() - assert len(service_types) == count - - package_cmd = ['ros2', 'srv', 'package', '_not_existing_package_name'] - package_result = subprocess.run( - package_cmd, stdout=subprocess.PIPE) - assert package_result.returncode - - show_cmd = ['ros2', 'srv', 'show', 'std_srvs/_not_existing_service_name'] - show_result = subprocess.run( - show_cmd, stdout=subprocess.PIPE) - assert show_result.returncode +import contextlib +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(ready_fn): + return LaunchDescription([OpaqueFunction(function=lambda context: ready_fn())]) + + +some_services_from_std_srvs = [ + 'std_srvs/srv/Empty', + 'std_srvs/srv/SetBool', + 'std_srvs/srv/Trigger', +] + + +class TestROS2SrvCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output + ): + @contextlib.contextmanager + def launch_srv_command(self, arguments): + srv_command_action = ExecuteProcess( + cmd=['ros2', 'srv', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2srv-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, srv_command_action, proc_info, proc_output + ) as srv_command: + yield srv_command + cls.launch_srv_command = launch_srv_command + + def test_list_service_types(self): + with self.launch_srv_command(arguments=['list']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = srv_command.output.splitlines() + assert all(srv in output_lines for srv in some_services_from_std_srvs) + assert all(re.match(r'.*/srv/.*', line) is not None for line in output_lines) + + def test_list_service_types_in_a_package(self): + with self.launch_srv_command(arguments=['package', 'std_srvs']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = srv_command.output.splitlines() + assert all(srv in output_lines for srv in some_services_from_std_srvs) + assert all(re.match(r'std_srvs/srv/.*', line) is not None for line in output_lines) + + def test_list_service_types_in_not_a_package(self): + with self.launch_srv_command(arguments=['package', 'not_a_package']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=srv_command.output + ) + + def test_list_packages_with_service_types(self): + with self.launch_srv_command(arguments=['packages']) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert 'std_srvs' in srv_command.output.splitlines() + + def test_show_service_type(self): + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/SetBool'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'bool data', + '---', + 'bool success', + 'string message' + ], + text=srv_command.output, + strict=False + ) + + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/Trigger'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '---', + 'bool success', + 'string message' + ], + text=srv_command.output, + strict=False + ) + + def test_show_not_a_service_typename(self): + with self.launch_srv_command( + arguments=['show', 'std_srvs/srv/NotAServiceTypeName'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown service name'], + text=srv_command.output, + strict=True + ) + + # TODO(hidmic): make 'ros2 srv show' more robust + # def test_show_not_a_service_ns(self): + # with self.launch_srv_command( + # arguments=['show', 'std_srvs/foo/Empty'] + # ) as srv_command: + # assert srv_command.wait_for_shutdown(timeout=10) + # assert srv_command.exit_code == 1 + # assert launch_testing.tools.expect_output( + # expected_lines=['Unknown service name'], + # text=srv_command.output, + # strict=True + # ) + + def test_show_not_a_package(self): + with self.launch_srv_command( + arguments=['show', 'not_a_package/srv/Empty'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Unknown package name'], + text=srv_command.output, + strict=True + ) + + def test_show_not_a_service_type(self): + with self.launch_srv_command( + arguments=['show', 'not_a_service_type'] + ) as srv_command: + assert srv_command.wait_for_shutdown(timeout=10) + assert srv_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['The passed service type is invalid'], + text=srv_command.output, + strict=True + ) diff --git a/ros2topic/package.xml b/ros2topic/package.xml index 5d0ee366c..f3149fd50 100644 --- a/ros2topic/package.xml +++ b/ros2topic/package.xml @@ -22,7 +22,11 @@ ament_flake8 ament_pep257 ament_xmllint + geometry_msgs python3-pytest + ros_testing + std_msgs + test_msgs ament_python diff --git a/ros2topic/test/fixtures/controller_node.py b/ros2topic/test/fixtures/controller_node.py new file mode 100644 index 000000000..1dd926597 --- /dev/null +++ b/ros2topic/test/fixtures/controller_node.py @@ -0,0 +1,58 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from geometry_msgs.msg import TwistStamped + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + + +class ControllerNode(Node): + + def __init__(self): + super().__init__('controller') + self.pub = self.create_publisher( + TwistStamped, 'cmd_vel', qos_profile_system_default + ) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + msg = TwistStamped() + msg.header.stamp = self.get_clock().now().to_msg() + msg.twist.linear.x = 1.0 + self.pub.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + + node = ControllerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('controller stopped cleanly') + except BaseException: + print('exception in controller:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/fixtures/listener_node.py b/ros2topic/test/fixtures/listener_node.py new file mode 100644 index 000000000..8ee4fddfa --- /dev/null +++ b/ros2topic/test/fixtures/listener_node.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default + +from std_msgs.msg import String + + +class ListenerNode(Node): + + def __init__(self): + super().__init__('listener') + self.sub = self.create_subscription( + String, 'chatter', self.callback, qos_profile_system_default + ) + + def callback(self, msg): + self.get_logger().info('I heard: [%s]' % msg.data) + + +def main(args=None): + rclpy.init(args=args) + + node = ListenerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('listener stopped cleanly') + except BaseException: + print('exception in listener:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/fixtures/repeater_node.py b/ros2topic/test/fixtures/repeater_node.py new file mode 100644 index 000000000..8fd4a41a6 --- /dev/null +++ b/ros2topic/test/fixtures/repeater_node.py @@ -0,0 +1,73 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import sys + +import rclpy +from rclpy.node import Node +from rclpy.qos import qos_profile_system_default +from rclpy.utilities import remove_ros_args + +from ros2topic.api import import_message_type + + +class RepeaterNode(Node): + + def __init__(self, message_type): + super().__init__('repeater_node') + self.message_type = message_type + self.pub = self.create_publisher( + self.message_type, '~/output', qos_profile_system_default + ) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + self.pub.publish(self.message_type()) + + +def message_type(message_typename): + return import_message_type('~/output', message_typename) + + +def parse_arguments(args=None): + parser = argparse.ArgumentParser() + parser.add_argument( + 'message_type', type=message_type, + help='Message type for the repeater to publish.' + ) + return parser.parse_args(args=remove_ros_args(args)) + + +def main(args=None): + parsed_args = parse_arguments(args=args) + + rclpy.init(args=args) + + node = RepeaterNode(message_type=parsed_args.message_type) + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('repeater stopped cleanly') + except BaseException: + print('exception in repeater:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main(args=sys.argv[1:]) diff --git a/ros2topic/test/fixtures/talker_node.py b/ros2topic/test/fixtures/talker_node.py new file mode 100644 index 000000000..a29f6ecdc --- /dev/null +++ b/ros2topic/test/fixtures/talker_node.py @@ -0,0 +1,54 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +import rclpy +from rclpy.node import Node + +from std_msgs.msg import String + + +class TalkerNode(Node): + + def __init__(self): + super().__init__('talker_node') + self.count = 1 + self.pub = self.create_publisher(String, 'chatter', 10) + self.tmr = self.create_timer(1.0, self.callback) + + def callback(self): + self.pub.publish(String(data='Hello World: {0}'.format(self.count))) + self.count += 1 + + +def main(args=None): + rclpy.init(args=args) + + node = TalkerNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('talker stopped cleanly') + except BaseException: + print('exception in talker:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py new file mode 100644 index 000000000..228283a64 --- /dev/null +++ b/ros2topic/test/test_cli.py @@ -0,0 +1,652 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import math +import os +import re +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures') + additional_env = { + 'RMW_IMPLEMENTATION': rmw_implementation, 'PYTHONUNBUFFERED': '1' + } + + path_to_talker_node_script = os.path.join(path_to_fixtures, 'talker_node.py') + path_to_listener_node_script = os.path.join(path_to_fixtures, 'listener_node.py') + + hidden_talker_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_talker_node_script], + remappings=[('chatter', '_hidden_chatter')], + additional_env=additional_env + ) + talker_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_talker_node_script], + additional_env=additional_env + ) + listener_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_listener_node_script], + remappings=[('chatter', 'chit_chatter')], + additional_env=additional_env + ) + + path_to_repeater_node_script = os.path.join(path_to_fixtures, 'repeater_node.py') + + array_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_repeater_node_script, 'test_msgs/msg/Arrays'], + node_name='array_repeater', + remappings=[('/array_repeater/output', '/arrays')], + output='screen', + additional_env=additional_env + ) + defaults_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_repeater_node_script, 'test_msgs/msg/Defaults'], + node_name='defaults_repeater', + remappings=[('/defaults_repeater/output', '/defaults')], + additional_env=additional_env, + ) + bounded_sequences_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[ + path_to_repeater_node_script, 'test_msgs/msg/BoundedSequences' + ], + node_name='bounded_sequences_repeater', + remappings=[('/bounded_sequences_repeater/output', '/bounded_sequences')], + additional_env=additional_env + ) + unbounded_sequences_repeater_node_action = Node( + node_executable=sys.executable, + arguments=[ + path_to_repeater_node_script, 'test_msgs/msg/UnboundedSequences' + ], + node_name='unbounded_sequences_repeater', + remappings=[('/unbounded_sequences_repeater/output', '/unbounded_sequences')], + additional_env=additional_env + ) + + path_to_controller_node_script = os.path.join(path_to_fixtures, 'controller_node.py') + + cmd_vel_controller_node_action = Node( + node_executable=sys.executable, + arguments=[path_to_controller_node_script], + additional_env=additional_env + ) + + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add talker/listener pair. + talker_node_action, + listener_node_action, + # Add hidden talker. + hidden_talker_node_action, + # Add topic repeaters. + array_repeater_node_action, + defaults_repeater_node_action, + bounded_sequences_repeater_node_action, + unbounded_sequences_repeater_node_action, + # Add stamped data publisher. + cmd_vel_controller_node_action, + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]), locals() + + +class TestROS2TopicCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation, + listener_node_action + ): + rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter( + filtered_patterns=['WARNING: topic .* does not appear to be published yet'], + filtered_rmw_implementation=rmw_implementation + ) + + @contextlib.contextmanager + def launch_topic_command(self, arguments): + topic_command_action = ExecuteProcess( + cmd=['ros2', 'topic', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2topic-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, topic_command_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) as topic_command: + yield topic_command + cls.launch_topic_command = launch_topic_command + + cls.listener_node = launch_testing.tools.ProcessProxy( + listener_node_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_topics(self): + with self.launch_topic_command(arguments=['list']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/arrays', + '/bounded_sequences', + '/chatter', + '/chit_chatter', + '/cmd_vel', + '/defaults', + '/parameter_events', + '/rosout', + '/unbounded_sequences', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_topics(self): + with self.launch_topic_command( + arguments=['list', '--include-hidden-topics'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/_hidden_chatter', + '/arrays', + '/bounded_sequences', + '/chatter', + '/chit_chatter', + '/cmd_vel', + '/defaults', + '/parameter_events', + '/rosout', + '/unbounded_sequences', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_with_types(self): + with self.launch_topic_command(arguments=['list', '-t']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + '/arrays [test_msgs/msg/Arrays]', + '/bounded_sequences [test_msgs/msg/BoundedSequences]', + '/chatter [std_msgs/msg/String]', + '/chit_chatter [std_msgs/msg/String]', + '/cmd_vel [geometry_msgs/msg/TwistStamped]', + '/defaults [test_msgs/msg/Defaults]', + '/parameter_events [rcl_interfaces/msg/ParameterEvent]', + '/rosout [rcl_interfaces/msg/Log]', + '/unbounded_sequences [test_msgs/msg/UnboundedSequences]', + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_list_count(self): + with self.launch_topic_command(arguments=['list', '-c']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + output_lines = topic_command.output.splitlines() + assert len(output_lines) == 1 + assert int(output_lines[0]) == 9 + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_info(self): + with self.launch_topic_command(arguments=['info', '/chatter']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Topic: /chatter', + 'Publisher count: 1', + 'Subscriber count: 0' + ], + text=topic_command.output, + strict=True + ) + + def test_info_on_unknown_topic(self): + with self.launch_topic_command(arguments=['info', '/unknown_topic']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Topic: /unknown_topic', + 'Publisher count: 0', + 'Subscriber count: 0' + ], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_type(self): + with self.launch_topic_command(arguments=['type', '/chatter']) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['std_msgs/msg/String'], + text=topic_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_hidden_topic_type(self): + with self.launch_topic_command( + arguments=['type', '/_hidden_chatter'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=10) + assert topic_command.exit_code == 1 + assert topic_command.output == '' + + @launch_testing.markers.retry_on_failure(times=5) + def test_find_topic_type(self): + with self.launch_topic_command( + arguments=['find', 'rcl_interfaces/msg/Log'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=2) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['/rosout'], text=topic_command.output, strict=True + ) + + def test_find_not_a_topic_typename(self): + with self.launch_topic_command( + arguments=['find', 'rcl_interfaces/msg/NotAMessageTypeName'] + ) as topic_command: + assert topic_command.wait_for_shutdown(timeout=2) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + assert not topic_command.output + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '/chatter'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r"data: 'Hello World: \d+'"), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_str_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--no-str', '/chatter'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r"data: '>'"), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_csv_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--csv', '/defaults'] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "True,b'2',100,1.125,1.125,-50,200,-1000,2000,-30000,60000,-40000000,50000000" + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_array_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/arrays'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + "basic_types_values: ''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=False + ), timeout=5), 'Output does not match: ' + topic_command.output + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_seq_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/unbounded_sequences'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + "basic_types_values: ''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=True + ), timeout=5) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_no_arr_topic_echo_on_bounded_seq_message(self): + with self.launch_topic_command( + arguments=['echo', '--no-arr', '/bounded_sequences'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + "bool_values: ''", + "byte_values: ''", + "char_values: ''", + "float32_values: ''", + "float64_values: ''", + "int8_values: ''", + "uint8_values: ''", + "int16_values: ''", + "uint16_values: ''", + "int32_values: ''", + "uint32_values: ''", + "int64_values: ''", + "uint64_values: ''", + "string_values: ''", + 'basic_types_values: ' + "''", + "constants_values: ''", + "defaults_values: ''", + "bool_values_default: ''", + "byte_values_default: ''", + "char_values_default: ''", + "float32_values_default: ''", + "float64_values_default: ''", + "int8_values_default: ''", + "uint8_values_default: ''", + "int16_values_default: ''", + "uint16_values_default: ''", + "int32_values_default: ''", + "uint32_values_default: ''", + "int64_values_default: ''", + "uint64_values_default: ''", + "string_values_default: ''", + 'alignment_check: 0', + '---' + ], strict=True + ), timeout=5) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_truncate_length_topic_echo(self): + with self.launch_topic_command( + arguments=['echo', '--truncate-length', '5', '/chatter'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'data: Hello...'), + '---' + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + def test_topic_pub(self): + with self.launch_topic_command( + arguments=['pub', '/chit_chatter', 'std_msgs/msg/String', '{data: foo}'], + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='foo')", + '' + ], strict=True + ), timeout=10) + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [foo]' + ] * 3, strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + def test_topic_pub_once(self): + with self.launch_topic_command( + arguments=[ + 'pub', '--once', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: bar}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='bar')", + '' + ], strict=True + ), timeout=5) + assert topic_command.wait_for_shutdown(timeout=5) + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [bar]' + ], strict=True + ), timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + + def test_topic_pub_print_every_two(self): + with self.launch_topic_command( + arguments=[ + 'pub', + '-p', '2', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: fizz}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #2: std_msgs.msg.String(data='fizz')", + '', + "publishing #4: std_msgs.msg.String(data='fizz')", + '' + ], strict=True + ), timeout=6), 'Output does not match: ' + topic_command.output + self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + '[INFO] [listener]: I heard: [fizz]' + ], strict=True + ), timeout=5) + assert topic_command.wait_for_shutdown(timeout=5) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_delay(self): + average_delay_line_pattern = re.compile(r'average delay: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command(arguments=['delay', '/cmd_vel']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_delay_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + head_line = topic_command.output.splitlines()[0] + average_delay = float(average_delay_line_pattern.match(head_line).group(1)) + assert math.isclose(average_delay, 0.0, abs_tol=10e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_hz(self): + average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command(arguments=['hz', '/chatter']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_rate_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) + + head_line = topic_command.output.splitlines()[0] + average_rate = float(average_rate_line_pattern.match(head_line).group(1)) + assert math.isclose(average_rate, 1., rel_tol=1e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_filtered_topic_hz(self): + average_rate_line_pattern = re.compile(r'average rate: (\d+.\d{3})') + stats_line_pattern = re.compile( + r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' + ) + with self.launch_topic_command( + arguments=[ + 'hz', + '--filter', + 'int(m.data.rpartition(\":\")[-1]) % 2 == 0', + '/chatter' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + average_rate_line_pattern, stats_line_pattern + ], strict=True + ), timeout=10), 'Output does not match: ' + topic_command.output + assert topic_command.wait_for_shutdown(timeout=5) + + head_line = topic_command.output.splitlines()[0] + average_rate = float(average_rate_line_pattern.match(head_line).group(1)) + assert math.isclose(average_rate, 0.5, rel_tol=1e-3) + + @launch_testing.markers.retry_on_failure(times=5) + def test_topic_bw(self): + with self.launch_topic_command(arguments=['bw', '/defaults']) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'Subscribed to [/defaults]', + re.compile(r'average: \d{2}\.\d{2}B/s'), + re.compile( + r'\s*mean: \d{2}\.\d{2}B/s min: \d{2}\.\d{2}B/s' + r' max: \d{2}\.\d{2}B/s window: \d+' + ) + ], strict=True + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=5) diff --git a/test_ros2cli/COLCON_IGNORE b/test_ros2cli/COLCON_IGNORE new file mode 100644 index 000000000..e69de29bb diff --git a/test_ros2cli/test/ros2action_test_configuration.py b/test_ros2cli/test/ros2action_test_configuration.py index a6f9f5e16..a640ce41a 100644 --- a/test_ros2cli/test/ros2action_test_configuration.py +++ b/test_ros2cli/test/ros2action_test_configuration.py @@ -195,3 +195,4 @@ def get_test_configurations(rmw_implementation): exit_codes=[1] ), ] + diff --git a/test_ros2cli/test/ros2topic_test_configuration.py b/test_ros2cli/test/ros2topic_test_configuration.py index c9dd60a68..43257aa37 100644 --- a/test_ros2cli/test/ros2topic_test_configuration.py +++ b/test_ros2cli/test/ros2topic_test_configuration.py @@ -155,12 +155,6 @@ def get_test_configurations(rmw_implementation): fixture_actions=[get_talker_node_action()], expected_output=None, ), - CLITestConfiguration( - command='topic', - arguments=['find', 'rcl_interfaces/msg/NotAMessageType'], - fixture_actions=[get_talker_node_action()], - expected_output=None, - ), CLITestConfiguration( command='topic', arguments=['echo', '/my_ns/chatter'], @@ -385,7 +379,7 @@ def get_test_configurations(rmw_implementation): ), CLITestConfiguration( command='topic', - arguments=['pub', '--once', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test}'], + arguments=['pub', '--once', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test_once}'], expected_output=[ 'publisher: beginning loop', re.compile(r"publishing #1: std_msgs\.msg\.String\(data='test'\)"),