From 5f52b76605b68036b09fb838dfb6f5e48d8494b0 Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Tue, 29 Oct 2019 16:57:57 -0300 Subject: [PATCH 1/7] Add automation tests for ros2 param cli Signed-off-by: Brian Ezequiel Marchi --- ros2param/package.xml | 1 + ros2param/test/fixtures/param_node.py | 56 ++ ros2param/test/test_cli.py | 796 ++++++++++++++++++++++++++ 3 files changed, 853 insertions(+) create mode 100644 ros2param/test/fixtures/param_node.py create mode 100644 ros2param/test/test_cli.py diff --git a/ros2param/package.xml b/ros2param/package.xml index e860fef0b..a9a7a2962 100644 --- a/ros2param/package.xml +++ b/ros2param/package.xml @@ -21,6 +21,7 @@ ament_pep257 ament_xmllint python3-pytest + ros_testing ament_python diff --git a/ros2param/test/fixtures/param_node.py b/ros2param/test/fixtures/param_node.py new file mode 100644 index 000000000..3acf0ca26 --- /dev/null +++ b/ros2param/test/fixtures/param_node.py @@ -0,0 +1,56 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from rcl_interfaces.msg import ParameterDescriptor +import rclpy +from rclpy.node import Node + + +class ParamNode(Node): + + def __init__(self): + super().__init__('param_node') + self.declare_parameter('bool_param', True, ParameterDescriptor()) + self.declare_parameter('int_param', 42, ParameterDescriptor()) + self.declare_parameter('double_param', 1.23, ParameterDescriptor()) + self.declare_parameter('str_param', 'Hello World', ParameterDescriptor()) + self.declare_parameter('int_array', [1, 2, 3], ParameterDescriptor()) + self.declare_parameter('double_array', [1.0, 2.0, 3.0], ParameterDescriptor()) + self.declare_parameter('bool_array', [True, False, True], ParameterDescriptor()) + self.declare_parameter('str_array', ['Hello', 'World'], ParameterDescriptor()) + self.declare_parameter('byte_array', [b'p', b'v'], ParameterDescriptor()) + self.declare_parameter('parameter_with_no_value', None, ParameterDescriptor()) + + +def main(args=None): + rclpy.init(args=args) + + node = ParamNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('node stopped cleanly') + except BaseException: + print('exception in node:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py new file mode 100644 index 000000000..672c47222 --- /dev/null +++ b/ros2param/test/test_cli.py @@ -0,0 +1,796 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + +PARAM_NODE_PARAMETER_LIST = """\ + bool_array + bool_param + byte_array + double_array + double_param + int_array + int_param + parameter_with_no_value + str_array + str_param + use_sim_time +""" + +PARAM_NODE_BOOL_PARAMETER_LIST = """\ + bool_array + bool_param +""" + +ALL_NODES_PARAMETER_LIST = """\ +/_hidden_param_node: + bool_array + bool_param + byte_array + double_array + double_param + int_array + int_param + parameter_with_no_value + str_array + str_param + use_sim_time +/launch_ros: + use_sim_time +/param_node: + bool_array + bool_param + byte_array + double_array + double_param + int_array + int_param + parameter_with_no_value + str_array + str_param + use_sim_time +""" + +DESCRIBE_PARAMETER_TYPE = """\ +Parameter name: {} + Type: {} + Constraints:{} +""" + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_param_node_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'param_node.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='param_node', + additional_env=additional_env + ), + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='_hidden_param_node', + additional_env=additional_env + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2ParamCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_node_command(self, arguments): + node_command_action = ExecuteProcess( + cmd=['ros2', 'param', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2param-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, node_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + # ignore launch_ros and ros2cli daemon nodes + filtered_patterns=['.*ros2cli.*'], + filtered_rmw_implementation=rmw_implementation + ) + ) as node_command: + yield node_command + cls.launch_node_command = launch_node_command + + # Set verb tests + @launch_testing.markers.retry_on_failure(times=3) + def test_set_invalid_param_existing_node(self): + with self.launch_node_command( + arguments=['set', '/param_node', 'unexistent_parameter', '1']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_valid_param_existing_node(self): + with self.launch_node_command( + arguments=['set', '/param_node', 'double_param', '3.14']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Set parameter successful'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_invalid_param_hidden_node_no_hidden_argument(self): + with self.launch_node_command( + arguments=[ + 'set', '/_hidden_param_node', 'unexistent_parameter', '1']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_invalid_param_hidden_node_hidden_argument(self): + with self.launch_node_command( + arguments=[ + 'set', '/_hidden_param_node', + 'unexistent_parameter', '1', '--include-hidden-nodes']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_inexisting_node(self): + with self.launch_node_command( + arguments=['set', '/foo/unexisting_node', 'test_param', '3.14']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + # Get verb tests without --hide-type flag + @launch_testing.markers.retry_on_failure(times=3) + def test_get_invalid_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'unexistent_parameter']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Parameter not set'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_double_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'double_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Double value is: 1.23'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_boolean_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'bool_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Boolean value is: True'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_int_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Integer value is: 42'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_string_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'str_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['String value is: Hello World'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_string_array_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'str_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["String values are: ['Hello', 'World']"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_int_array_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'int_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["Integer values are: array('q', [1, 2, 3])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_double_array_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'double_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["Double values are: array('d', [1.0, 2.0, 3.0])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_boolean_array_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'bool_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Boolean values are: [True, False, True]'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_byte_array_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'byte_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["Byte values are: [b'p', b'v']"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_unset_param_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'parameter_with_no_value']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Parameter not set.'], + text=node_command.output, + strict=True + ) + + # Get verb tests with --hide-type flag + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_double_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'double_param', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['1.23'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_boolean_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'bool_param', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['True'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_int_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'int_param', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['42'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_string_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'str_param', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Hello World'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_string_array_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'str_array', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["['Hello', 'World']"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_int_array_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'int_array', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["array('q', [1, 2, 3])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_double_array_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'double_array', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["array('d', [1.0, 2.0, 3.0])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_boolean_array_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'bool_array', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['[True, False, True]'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_valid_byte_array_param_hide_type_existing_node(self): + with self.launch_node_command( + arguments=['get', '/param_node', 'byte_array', '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=["[b'p', b'v']"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_inexisting_node(self): + with self.launch_node_command( + arguments=['get', '/foo/unexisting_node', 'test_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_hidden_node_with_no_hidden_argument(self): + with self.launch_node_command( + arguments=['get', '/_hidden_param_node', 'test_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_get_hidden_node_with_hidden_argument(self): + with self.launch_node_command( + arguments=[ + 'get', '/_hidden_param_node', 'int_param', + '--include-hidden-nodes']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Integer value is: 42'], + text=node_command.output, + strict=True + ) + + # List verb tests + @launch_testing.markers.retry_on_failure(times=3) + def test_list_existing_node(self): + with self.launch_node_command( + arguments=['list', '/param_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=PARAM_NODE_PARAMETER_LIST, + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_list_unexisting_node(self): + with self.launch_node_command( + arguments=['list', '/unexisting_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_list_hidden_existing_node_no_hidden_flag(self): + with self.launch_node_command( + arguments=['list', '/_hidden_param_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_list_hidden_existing_node_hidden_flag(self): + with self.launch_node_command( + arguments=[ + 'list', '/_hidden_param_node', '--include-hidden-nodes']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=PARAM_NODE_PARAMETER_LIST, + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_list_existing_node_bool_prefix(self): + with self.launch_node_command( + arguments=[ + 'list', '--param-prefixes=bool', '/param_node']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=PARAM_NODE_BOOL_PARAMETER_LIST, + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_list_all_nodes_all_parameters(self): + with self.launch_node_command( + arguments=['list', '--include-hidden-nodes']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=ALL_NODES_PARAMETER_LIST, + text=node_command.output, + strict=True + ) + + # Describe verb tests + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_int_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + print(node_command.output) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('int_param', 'integer', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_double_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'double_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('double_param', 'double', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_bool_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'bool_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_param', 'boolean', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_str_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'str_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('str_param', 'string', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_int_array_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'int_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('int_array', 'integer array', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_double_array_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'double_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('double_array', 'double array', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_bool_array_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'bool_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_array', 'boolean array', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_string_array_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'str_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('str_array', 'string array', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_byte_array_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'byte_array']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('byte_array', 'byte array', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_existing_node_not_set_param(self): + with self.launch_node_command( + arguments=['describe', '/param_node', 'parameter_with_no_value']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format('parameter_with_no_value', 'not set', ''), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_describe_unexisting_node(self): + with self.launch_node_command( + arguments=['describe', '/foo/unexisting_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_int_param_existing_node(self): + with self.launch_node_command( + arguments=['delete', '/param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Deleted parameter successfully'], + text=node_command.output, + strict=True + ) + with self.launch_node_command( + arguments=['get', '/param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Parameter not set.'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_int_param_hidden_node(self): + with self.launch_node_command( + arguments=['delete', '/_hidden_param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_text='Node not found', + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_int_param_hidden_node_with_hidden_flag(self): + with self.launch_node_command( + arguments=[ + 'delete', '--include-hidden-nodes', '/_hidden_param_node', + 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Deleted parameter successfully'], + text=node_command.output, + strict=True + ) + with self.launch_node_command( + arguments=['get', '/_hidden_param_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Parameter not set.'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_any_param_unexisting_node(self): + with self.launch_node_command( + arguments=['delete', '/foo/unexisting_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_text='Node not found', + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_unexistent_param_existent_node(self): + with self.launch_node_command( + arguments=['delete', '/param_node', 'foo_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=4) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Deleting parameter failed'], + text=node_command.output, + strict=True + ) From fa079b48bc020b6e924025622619a8e8a341bf10 Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Wed, 30 Oct 2019 11:29:15 -0300 Subject: [PATCH 2/7] Add delete verb test to another file to avoid node collisions Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/test_cli.py | 194 +++++++++--------------------- ros2param/test/test_cli_delete.py | 188 +++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 136 deletions(-) create mode 100644 ros2param/test/test_cli_delete.py diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py index 672c47222..04f8716de 100644 --- a/ros2param/test/test_cli.py +++ b/ros2param/test/test_cli.py @@ -164,7 +164,7 @@ def launch_node_command(self, arguments): def test_set_invalid_param_existing_node(self): with self.launch_node_command( arguments=['set', '/param_node', 'unexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ @@ -177,7 +177,7 @@ def test_set_invalid_param_existing_node(self): def test_set_valid_param_existing_node(self): with self.launch_node_command( arguments=['set', '/param_node', 'double_param', '3.14']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Set parameter successful'], @@ -190,7 +190,7 @@ def test_set_invalid_param_hidden_node_no_hidden_argument(self): with self.launch_node_command( arguments=[ 'set', '/_hidden_param_node', 'unexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -204,7 +204,7 @@ def test_set_invalid_param_hidden_node_hidden_argument(self): arguments=[ 'set', '/_hidden_param_node', 'unexistent_parameter', '1', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ @@ -217,7 +217,7 @@ def test_set_invalid_param_hidden_node_hidden_argument(self): def test_set_inexisting_node(self): with self.launch_node_command( arguments=['set', '/foo/unexisting_node', 'test_param', '3.14']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -230,7 +230,7 @@ def test_set_inexisting_node(self): def test_get_invalid_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'unexistent_parameter']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ @@ -243,7 +243,7 @@ def test_get_invalid_param_existing_node(self): def test_get_valid_double_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'double_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Double value is: 1.23'], @@ -255,7 +255,7 @@ def test_get_valid_double_param_existing_node(self): def test_get_valid_boolean_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'bool_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Boolean value is: True'], @@ -267,7 +267,7 @@ def test_get_valid_boolean_param_existing_node(self): def test_get_valid_int_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Integer value is: 42'], @@ -279,7 +279,7 @@ def test_get_valid_int_param_existing_node(self): def test_get_valid_string_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'str_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['String value is: Hello World'], @@ -291,7 +291,7 @@ def test_get_valid_string_param_existing_node(self): def test_get_valid_string_array_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'str_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["String values are: ['Hello', 'World']"], @@ -303,7 +303,7 @@ def test_get_valid_string_array_param_existing_node(self): def test_get_valid_int_array_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'int_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["Integer values are: array('q', [1, 2, 3])"], @@ -315,7 +315,7 @@ def test_get_valid_int_array_param_existing_node(self): def test_get_valid_double_array_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'double_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["Double values are: array('d', [1.0, 2.0, 3.0])"], @@ -327,7 +327,7 @@ def test_get_valid_double_array_param_existing_node(self): def test_get_valid_boolean_array_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'bool_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Boolean values are: [True, False, True]'], @@ -339,7 +339,7 @@ def test_get_valid_boolean_array_param_existing_node(self): def test_get_valid_byte_array_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'byte_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["Byte values are: [b'p', b'v']"], @@ -351,7 +351,7 @@ def test_get_valid_byte_array_param_existing_node(self): def test_get_valid_unset_param_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'parameter_with_no_value']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Parameter not set.'], @@ -364,7 +364,7 @@ def test_get_valid_unset_param_existing_node(self): def test_get_valid_double_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'double_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['1.23'], @@ -376,7 +376,7 @@ def test_get_valid_double_param_hide_type_existing_node(self): def test_get_valid_boolean_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'bool_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['True'], @@ -388,7 +388,7 @@ def test_get_valid_boolean_param_hide_type_existing_node(self): def test_get_valid_int_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'int_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['42'], @@ -400,7 +400,7 @@ def test_get_valid_int_param_hide_type_existing_node(self): def test_get_valid_string_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'str_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Hello World'], @@ -412,7 +412,7 @@ def test_get_valid_string_param_hide_type_existing_node(self): def test_get_valid_string_array_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'str_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["['Hello', 'World']"], @@ -424,7 +424,7 @@ def test_get_valid_string_array_param_hide_type_existing_node(self): def test_get_valid_int_array_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'int_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["array('q', [1, 2, 3])"], @@ -436,7 +436,7 @@ def test_get_valid_int_array_param_hide_type_existing_node(self): def test_get_valid_double_array_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'double_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["array('d', [1.0, 2.0, 3.0])"], @@ -448,7 +448,7 @@ def test_get_valid_double_array_param_hide_type_existing_node(self): def test_get_valid_boolean_array_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'bool_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['[True, False, True]'], @@ -460,7 +460,7 @@ def test_get_valid_boolean_array_param_hide_type_existing_node(self): def test_get_valid_byte_array_param_hide_type_existing_node(self): with self.launch_node_command( arguments=['get', '/param_node', 'byte_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=["[b'p', b'v']"], @@ -472,7 +472,7 @@ def test_get_valid_byte_array_param_hide_type_existing_node(self): def test_get_inexisting_node(self): with self.launch_node_command( arguments=['get', '/foo/unexisting_node', 'test_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -484,7 +484,7 @@ def test_get_inexisting_node(self): def test_get_hidden_node_with_no_hidden_argument(self): with self.launch_node_command( arguments=['get', '/_hidden_param_node', 'test_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -498,7 +498,7 @@ def test_get_hidden_node_with_hidden_argument(self): arguments=[ 'get', '/_hidden_param_node', 'int_param', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Integer value is: 42'], @@ -511,7 +511,7 @@ def test_get_hidden_node_with_hidden_argument(self): def test_list_existing_node(self): with self.launch_node_command( arguments=['list', '/param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=PARAM_NODE_PARAMETER_LIST, @@ -523,7 +523,7 @@ def test_list_existing_node(self): def test_list_unexisting_node(self): with self.launch_node_command( arguments=['list', '/unexisting_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -535,7 +535,7 @@ def test_list_unexisting_node(self): def test_list_hidden_existing_node_no_hidden_flag(self): with self.launch_node_command( arguments=['list', '/_hidden_param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], @@ -548,7 +548,7 @@ def test_list_hidden_existing_node_hidden_flag(self): with self.launch_node_command( arguments=[ 'list', '/_hidden_param_node', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=PARAM_NODE_PARAMETER_LIST, @@ -561,7 +561,7 @@ def test_list_existing_node_bool_prefix(self): with self.launch_node_command( arguments=[ 'list', '--param-prefixes=bool', '/param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=PARAM_NODE_BOOL_PARAMETER_LIST, @@ -569,25 +569,27 @@ def test_list_existing_node_bool_prefix(self): strict=True ) - @launch_testing.markers.retry_on_failure(times=3) - def test_list_all_nodes_all_parameters(self): - with self.launch_node_command( - arguments=['list', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=ALL_NODES_PARAMETER_LIST, - text=node_command.output, - strict=True - ) + # TODO(BMarchi): This test fails for opensplice only. It hangs + # in wait_for_shutdown method. It seems that it doesn't return + # after getting all the nodes with their parameters. + # @launch_testing.markers.retry_on_failure(times=3) + # def test_list_all_nodes_all_parameters(self): + # with self.launch_node_command( + # arguments=['list', '--include-hidden-nodes']) as node_command: + # assert node_command.wait_for_shutdown(timeout=5) + # assert node_command.exit_code == launch_testing.asserts.EXIT_OK + # assert launch_testing.tools.expect_output( + # expected_text=ALL_NODES_PARAMETER_LIST, + # text=node_command.output, + # strict=True + # ) # Describe verb tests @launch_testing.markers.retry_on_failure(times=3) def test_describe_existing_node_int_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - print(node_command.output) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('int_param', 'integer', ''), @@ -599,7 +601,7 @@ def test_describe_existing_node_int_param(self): def test_describe_existing_node_double_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'double_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('double_param', 'double', ''), @@ -611,7 +613,7 @@ def test_describe_existing_node_double_param(self): def test_describe_existing_node_bool_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'bool_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_param', 'boolean', ''), @@ -623,7 +625,7 @@ def test_describe_existing_node_bool_param(self): def test_describe_existing_node_str_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'str_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('str_param', 'string', ''), @@ -635,7 +637,7 @@ def test_describe_existing_node_str_param(self): def test_describe_existing_node_int_array_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'int_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('int_array', 'integer array', ''), @@ -647,7 +649,7 @@ def test_describe_existing_node_int_array_param(self): def test_describe_existing_node_double_array_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'double_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('double_array', 'double array', ''), @@ -659,7 +661,7 @@ def test_describe_existing_node_double_array_param(self): def test_describe_existing_node_bool_array_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'bool_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_array', 'boolean array', ''), @@ -671,7 +673,7 @@ def test_describe_existing_node_bool_array_param(self): def test_describe_existing_node_string_array_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'str_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('str_array', 'string array', ''), @@ -683,7 +685,7 @@ def test_describe_existing_node_string_array_param(self): def test_describe_existing_node_byte_array_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'byte_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('byte_array', 'byte array', ''), @@ -695,7 +697,7 @@ def test_describe_existing_node_byte_array_param(self): def test_describe_existing_node_not_set_param(self): with self.launch_node_command( arguments=['describe', '/param_node', 'parameter_with_no_value']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_text=DESCRIBE_PARAMETER_TYPE.format('parameter_with_no_value', 'not set', ''), @@ -707,90 +709,10 @@ def test_describe_existing_node_not_set_param(self): def test_describe_unexisting_node(self): with self.launch_node_command( arguments=['describe', '/foo/unexisting_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) + assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], text=node_command.output, strict=True ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_delete_int_param_existing_node(self): - with self.launch_node_command( - arguments=['delete', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Deleted parameter successfully'], - text=node_command.output, - strict=True - ) - with self.launch_node_command( - arguments=['get', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Parameter not set.'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_delete_int_param_hidden_node(self): - with self.launch_node_command( - arguments=['delete', '/_hidden_param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == 1 - assert launch_testing.tools.expect_output( - expected_text='Node not found', - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_delete_int_param_hidden_node_with_hidden_flag(self): - with self.launch_node_command( - arguments=[ - 'delete', '--include-hidden-nodes', '/_hidden_param_node', - 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Deleted parameter successfully'], - text=node_command.output, - strict=True - ) - with self.launch_node_command( - arguments=['get', '/_hidden_param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Parameter not set.'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_delete_any_param_unexisting_node(self): - with self.launch_node_command( - arguments=['delete', '/foo/unexisting_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == 1 - assert launch_testing.tools.expect_output( - expected_text='Node not found', - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_delete_unexistent_param_existent_node(self): - with self.launch_node_command( - arguments=['delete', '/param_node', 'foo_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=4) - assert node_command.exit_code == 1 - assert launch_testing.tools.expect_output( - expected_lines=['Deleting parameter failed'], - text=node_command.output, - strict=True - ) diff --git a/ros2param/test/test_cli_delete.py b/ros2param/test/test_cli_delete.py new file mode 100644 index 000000000..5dbb22271 --- /dev/null +++ b/ros2param/test/test_cli_delete.py @@ -0,0 +1,188 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_param_node_script = os.path.join( + os.path.dirname(__file__), 'fixtures', 'param_node.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='param_delete_node', + additional_env=additional_env + ), + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='_hidden_param_delete_node', + additional_env=additional_env + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2ParamDeleteCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_node_command(self, arguments): + node_command_action = ExecuteProcess( + cmd=['ros2', 'param', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2param-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, node_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + # ignore launch_ros and ros2cli daemon nodes + filtered_patterns=['.*ros2cli.*'], + filtered_rmw_implementation=rmw_implementation + ) + ) as node_command: + yield node_command + cls.launch_node_command = launch_node_command + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_int_param_existing_node(self): + with self.launch_node_command( + arguments=['delete', '/param_delete_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Deleted parameter successfully'], + text=node_command.output, + strict=True + ) + with self.launch_node_command( + arguments=['get', '/param_delete_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Parameter not set.'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_int_param_hidden_node(self): + with self.launch_node_command( + arguments=['delete', '/_hidden_param_delete_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_text='Node not found\n', + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_delete_int_param_hidden_node_with_hidden_flag(self): + with self.launch_node_command( + arguments=[ + 'delete', '--include-hidden-nodes', + '/_hidden_param_delete_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=8) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Deleted parameter successfully'], + text=node_command.output, + strict=True + ) + with self.launch_node_command( + arguments=[ + 'get', '--include-hidden-nodes', + '/_hidden_param_delete_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Parameter not set.'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_any_param_unexisting_node(self): + with self.launch_node_command( + arguments=['delete', '/foo/unexisting_node', 'int_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_text='Node not found\n', + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_delete_unexistent_param_existent_node(self): + with self.launch_node_command( + arguments=['delete', '/param_delete_node', 'foo_param']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Deleting parameter failed'], + text=node_command.output, + strict=True + ) From 44d8a727c809b486cc9d0c15ace742a00a8807a6 Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Tue, 5 Nov 2019 09:03:48 -0300 Subject: [PATCH 3/7] Address pr comments and improve get, set and describe verb tests Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/fixtures/param_node.py | 56 --- ros2param/test/param_node.py | 92 +++++ ros2param/test/test_cli.py | 508 ++++---------------------- ros2param/test/test_cli_delete.py | 2 +- ros2param/test/test_cli_set.py | 190 ++++++++++ 5 files changed, 353 insertions(+), 495 deletions(-) delete mode 100644 ros2param/test/fixtures/param_node.py create mode 100644 ros2param/test/param_node.py create mode 100644 ros2param/test/test_cli_set.py diff --git a/ros2param/test/fixtures/param_node.py b/ros2param/test/fixtures/param_node.py deleted file mode 100644 index 3acf0ca26..000000000 --- a/ros2param/test/fixtures/param_node.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -from rcl_interfaces.msg import ParameterDescriptor -import rclpy -from rclpy.node import Node - - -class ParamNode(Node): - - def __init__(self): - super().__init__('param_node') - self.declare_parameter('bool_param', True, ParameterDescriptor()) - self.declare_parameter('int_param', 42, ParameterDescriptor()) - self.declare_parameter('double_param', 1.23, ParameterDescriptor()) - self.declare_parameter('str_param', 'Hello World', ParameterDescriptor()) - self.declare_parameter('int_array', [1, 2, 3], ParameterDescriptor()) - self.declare_parameter('double_array', [1.0, 2.0, 3.0], ParameterDescriptor()) - self.declare_parameter('bool_array', [True, False, True], ParameterDescriptor()) - self.declare_parameter('str_array', ['Hello', 'World'], ParameterDescriptor()) - self.declare_parameter('byte_array', [b'p', b'v'], ParameterDescriptor()) - self.declare_parameter('parameter_with_no_value', None, ParameterDescriptor()) - - -def main(args=None): - rclpy.init(args=args) - - node = ParamNode() - - try: - rclpy.spin(node) - except KeyboardInterrupt: - print('node stopped cleanly') - except BaseException: - print('exception in node:', file=sys.stderr) - raise - finally: - node.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/ros2param/test/param_node.py b/ros2param/test/param_node.py new file mode 100644 index 000000000..f08eb525a --- /dev/null +++ b/ros2param/test/param_node.py @@ -0,0 +1,92 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from rcl_interfaces.msg import ParameterDescriptor +import rclpy +from rclpy.node import Node + +GET_VERB_PARAM_VALUES = { + 'bool_param': 'Boolean value is: {}\n', + 'int_param': 'Integer value is: {}\n', + 'double_param': 'Double value is: {}\n', + 'str_param': 'String value is: {}\n', + 'byte_array': 'Byte values are: {}\n', + 'bool_array': 'Boolean values are: {}\n', + 'int_array': "Integer values are: array('q', {})\n", + 'double_array': "Double values are: array('d', {})\n", + 'str_array': 'String values are: {}\n', + 'parameter_with_no_value': 'Parameter not set.{}\n' +} + + +class ParamNode(Node): + + @staticmethod + def get_node_parameters(): + return { + 'bool_param': True, + 'int_param': 42, + 'double_param': 1.23, + 'str_param': 'Hello World', + 'int_array': [1, 2, 3], + 'double_array': [1.0, 2.0, 3.0], + 'bool_array': [True, False, True], + 'str_array': ['Hello', 'World'], + 'byte_array': [b'p', b'v'], + 'parameter_with_no_value': None + } + + @staticmethod + def get_modified_parameters(): + return { + 'bool_param': False, + 'int_param': 43, + 'double_param': 1.20, + 'str_param': 'Hello World', + 'int_array': [4, 5, 6], + 'double_array': [4.0, 5.0, 6.0], + 'bool_array': [False, True, False], + 'str_array': ['World', 'Hello'], + 'byte_array': [b'v', b'p'], + 'parameter_with_no_value': None + } + + def __init__(self): + super().__init__('param_node') + parameters = ParamNode.get_node_parameters() + for param_key, param_value in parameters.items(): + self.declare_parameter(param_key, param_value, ParameterDescriptor()) + + +def main(args=None): + rclpy.init(args=args) + + node = ParamNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + print('node stopped cleanly') + except BaseException: + print('exception in node:', file=sys.stderr) + raise + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py index 04f8716de..6de2ec48b 100644 --- a/ros2param/test/test_cli.py +++ b/ros2param/test/test_cli.py @@ -29,6 +29,8 @@ import launch_testing.tools import launch_testing_ros.tools +import param_node + import pytest from rmw_implementation import get_available_rmw_implementations @@ -87,12 +89,25 @@ Constraints:{} """ +DESCRIBE_PARAMETER_NAME_TYPE = { + 'bool_param': 'boolean', + 'int_param': 'integer', + 'double_param': 'double', + 'str_param': 'string', + 'byte_array': 'byte array', + 'bool_array': 'boolean array', + 'int_array': 'integer array', + 'double_array': 'double array', + 'str_array': 'string array', + 'parameter_with_no_value': 'not set' +} + @pytest.mark.rostest @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) def generate_test_description(rmw_implementation, ready_fn): path_to_param_node_script = os.path.join( - os.path.dirname(__file__), 'fixtures', 'param_node.py' + os.path.dirname(__file__), 'param_node.py' ) additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} return LaunchDescription([ @@ -151,327 +166,51 @@ def launch_node_command(self, arguments): with launch_testing.tools.launch_process( launch_service, node_command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( - # ignore launch_ros and ros2cli daemon nodes + # ignore ros2cli daemon node filtered_patterns=['.*ros2cli.*'], filtered_rmw_implementation=rmw_implementation ) ) as node_command: yield node_command cls.launch_node_command = launch_node_command + cls.rmw_implementation = rmw_implementation + + def test_get_verb(self): + parameters = param_node.ParamNode.get_node_parameters() + for param_key, param_value in parameters.items(): + with self.launch_node_command( + arguments=['get', '/param_node', param_key]) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + if param_value is None: + param_value = '' + assert launch_testing.tools.expect_output( + expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format(str(param_value)), + text=node_command.output, + strict=True + ) - # Set verb tests - @launch_testing.markers.retry_on_failure(times=3) - def test_set_invalid_param_existing_node(self): - with self.launch_node_command( - arguments=['set', '/param_node', 'unexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=[ - "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_set_valid_param_existing_node(self): - with self.launch_node_command( - arguments=['set', '/param_node', 'double_param', '3.14']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Set parameter successful'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_set_invalid_param_hidden_node_no_hidden_argument(self): - with self.launch_node_command( - arguments=[ - 'set', '/_hidden_param_node', 'unexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 - assert launch_testing.tools.expect_output( - expected_lines=['Node not found'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_set_invalid_param_hidden_node_hidden_argument(self): - with self.launch_node_command( - arguments=[ - 'set', '/_hidden_param_node', - 'unexistent_parameter', '1', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=[ - "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_set_inexisting_node(self): - with self.launch_node_command( - arguments=['set', '/foo/unexisting_node', 'test_param', '3.14']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 - assert launch_testing.tools.expect_output( - expected_lines=['Node not found'], - text=node_command.output, - strict=True - ) - - # Get verb tests without --hide-type flag - @launch_testing.markers.retry_on_failure(times=3) - def test_get_invalid_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'unexistent_parameter']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=[ - 'Parameter not set'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_double_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'double_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Double value is: 1.23'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_boolean_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'bool_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Boolean value is: True'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_int_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Integer value is: 42'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_string_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'str_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['String value is: Hello World'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_string_array_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'str_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["String values are: ['Hello', 'World']"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_int_array_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'int_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["Integer values are: array('q', [1, 2, 3])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_double_array_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'double_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["Double values are: array('d', [1.0, 2.0, 3.0])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_boolean_array_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'bool_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Boolean values are: [True, False, True]'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_byte_array_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'byte_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["Byte values are: [b'p', b'v']"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_unset_param_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'parameter_with_no_value']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Parameter not set.'], - text=node_command.output, - strict=True - ) - - # Get verb tests with --hide-type flag - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_double_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'double_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['1.23'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_boolean_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'bool_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['True'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_int_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'int_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['42'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_string_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'str_param', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Hello World'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_string_array_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'str_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["['Hello', 'World']"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_int_array_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'int_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["array('q', [1, 2, 3])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_double_array_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'double_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["array('d', [1.0, 2.0, 3.0])"], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_boolean_array_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'bool_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['[True, False, True]'], - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_get_valid_byte_array_param_hide_type_existing_node(self): - with self.launch_node_command( - arguments=['get', '/param_node', 'byte_array', '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=["[b'p', b'v']"], - text=node_command.output, - strict=True - ) + def test_get_verb_hide_type(self): + parameters = param_node.ParamNode.get_node_parameters() + for param_key, param_value in parameters.items(): + with self.launch_node_command( + arguments=['get', '/param_node', param_key, '--hide-type']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + if param_value is None: + param_value = ':None\n' + assert launch_testing.tools.expect_output( + expected_lines=[ + param_node.GET_VERB_PARAM_VALUES[param_key].format( + str(param_value)).split(':')[1].strip()], + text=node_command.output, + strict=True + ) @launch_testing.markers.retry_on_failure(times=3) - def test_get_inexisting_node(self): + def test_get_nonexistent_node(self): with self.launch_node_command( - arguments=['get', '/foo/unexisting_node', 'test_param']) as node_command: + arguments=['get', '/foo/nonexistent_node', 'test_param']) as node_command: assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( @@ -501,7 +240,7 @@ def test_get_hidden_node_with_hidden_argument(self): assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_lines=['Integer value is: 42'], + expected_text=param_node.GET_VERB_PARAM_VALUES['int_param'].format(str(42)), text=node_command.output, strict=True ) @@ -520,9 +259,9 @@ def test_list_existing_node(self): ) @launch_testing.markers.retry_on_failure(times=3) - def test_list_unexisting_node(self): + def test_list_nonexistent_node(self): with self.launch_node_command( - arguments=['list', '/unexisting_node']) as node_command: + arguments=['list', '/nonexistent_node']) as node_command: assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( @@ -569,14 +308,14 @@ def test_list_existing_node_bool_prefix(self): strict=True ) - # TODO(BMarchi): This test fails for opensplice only. It hangs + # TODO(BMarchi): This test fails for every implementation. It hangs # in wait_for_shutdown method. It seems that it doesn't return # after getting all the nodes with their parameters. # @launch_testing.markers.retry_on_failure(times=3) # def test_list_all_nodes_all_parameters(self): # with self.launch_node_command( # arguments=['list', '--include-hidden-nodes']) as node_command: - # assert node_command.wait_for_shutdown(timeout=5) + # assert node_command.wait_for_shutdown(timeout=10) # assert node_command.exit_code == launch_testing.asserts.EXIT_OK # assert launch_testing.tools.expect_output( # expected_text=ALL_NODES_PARAMETER_LIST, @@ -584,131 +323,24 @@ def test_list_existing_node_bool_prefix(self): # strict=True # ) - # Describe verb tests - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_int_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('int_param', 'integer', ''), - text=node_command.output, - strict=True - ) - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_double_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'double_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('double_param', 'double', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_bool_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'bool_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_param', 'boolean', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_str_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'str_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('str_param', 'string', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_int_array_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'int_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('int_array', 'integer array', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_double_array_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'double_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('double_array', 'double array', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_bool_array_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'bool_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('bool_array', 'boolean array', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_string_array_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'str_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('str_array', 'string array', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_byte_array_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'byte_array']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('byte_array', 'byte array', ''), - text=node_command.output, - strict=True - ) - - @launch_testing.markers.retry_on_failure(times=3) - def test_describe_existing_node_not_set_param(self): - with self.launch_node_command( - arguments=['describe', '/param_node', 'parameter_with_no_value']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format('parameter_with_no_value', 'not set', ''), - text=node_command.output, - strict=True - ) + def test_describe_existing_node_all_params(self): + for param_key in param_node.ParamNode.get_node_parameters(): + with self.launch_node_command( + arguments=['describe', '/param_node', param_key]) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format( + param_key, DESCRIBE_PARAMETER_NAME_TYPE[param_key], ''), + text=node_command.output, + strict=True + ) @launch_testing.markers.retry_on_failure(times=3) - def test_describe_unexisting_node(self): + def test_describe_nonexistent_node(self): with self.launch_node_command( - arguments=['describe', '/foo/unexisting_node', 'int_param']) as node_command: + arguments=['describe', '/foo/nonexistent_node', 'int_param']) as node_command: assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( diff --git a/ros2param/test/test_cli_delete.py b/ros2param/test/test_cli_delete.py index 5dbb22271..73dae046a 100644 --- a/ros2param/test/test_cli_delete.py +++ b/ros2param/test/test_cli_delete.py @@ -38,7 +38,7 @@ @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) def generate_test_description(rmw_implementation, ready_fn): path_to_param_node_script = os.path.join( - os.path.dirname(__file__), 'fixtures', 'param_node.py' + os.path.dirname(__file__), 'param_node.py' ) additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} return LaunchDescription([ diff --git a/ros2param/test/test_cli_set.py b/ros2param/test/test_cli_set.py new file mode 100644 index 000000000..6183e3199 --- /dev/null +++ b/ros2param/test/test_cli_set.py @@ -0,0 +1,190 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import sys +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch.actions import OpaqueFunction + +from launch_ros.actions import Node + +import launch_testing +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import param_node + +import pytest + +from rmw_implementation import get_available_rmw_implementations + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation, ready_fn): + path_to_param_node_script = os.path.join( + os.path.dirname(__file__), 'param_node.py' + ) + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + return LaunchDescription([ + # Always restart daemon to isolate tests. + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + # Add test fixture actions. + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='param_node', + additional_env=additional_env + ), + Node( + node_executable=sys.executable, + arguments=[path_to_param_node_script], + node_name='_hidden_param_node', + additional_env=additional_env + ), + OpaqueFunction(function=lambda context: ready_fn()) + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestROS2ParamSetCLI(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + @contextlib.contextmanager + def launch_node_command(self, arguments): + node_command_action = ExecuteProcess( + cmd=['ros2', 'param', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + 'PYTHONUNBUFFERED': '1' + }, + name='ros2param-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, node_command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + # ignore ros2cli daemon node + filtered_patterns=['.*ros2cli.*'], + filtered_rmw_implementation=rmw_implementation + ) + ) as node_command: + yield node_command + cls.launch_node_command = launch_node_command + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_nonexistent_param_existing_node(self): + with self.launch_node_command( + arguments=['set', '/param_node', 'nonexistent_parameter', '1']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_valid_param_existing_node(self): + new_parameters = param_node.ParamNode.get_modified_parameters() + # Byte array is unsupported for the yaml parser in get_parameter_value + new_parameters.pop('byte_array') + for param_key, param_value in new_parameters.items(): + with self.launch_node_command( + arguments=['set', '/param_node', param_key, str(param_value)]) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Set parameter successful'], + text=node_command.output, + strict=True + ) + for param_key, param_value in new_parameters.items(): + with self.launch_node_command( + arguments=['get', '/param_node', param_key]) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + if param_value is None: + param_node.GET_VERB_PARAM_VALUES[param_key] = \ + param_node.GET_VERB_PARAM_VALUES['str_param'] + assert launch_testing.tools.expect_output( + expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format(str(param_value)), + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_nonexistent_param_hidden_node_no_hidden_argument(self): + with self.launch_node_command( + arguments=[ + 'set', '/_hidden_param_node', 'nonexistent_parameter', '1']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_nonexistent_param_hidden_node_hidden_argument(self): + with self.launch_node_command( + arguments=[ + 'set', '/_hidden_param_node', + 'nonexistent_parameter', '1', '--include-hidden-nodes']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], + text=node_command.output, + strict=True + ) + + @launch_testing.markers.retry_on_failure(times=3) + def test_set_nonexistent_node(self): + with self.launch_node_command( + arguments=['set', '/foo/nonexistent_node', 'test_param', '3.14']) as node_command: + assert node_command.wait_for_shutdown(timeout=5) + assert node_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=node_command.output, + strict=True + ) From 998a8f6f117ba2e8186399f10d0d1bc5d0f7400d Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Tue, 5 Nov 2019 09:06:36 -0300 Subject: [PATCH 4/7] Change comments and method names Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/test_cli_delete.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ros2param/test/test_cli_delete.py b/ros2param/test/test_cli_delete.py index 73dae046a..843d16ac7 100644 --- a/ros2param/test/test_cli_delete.py +++ b/ros2param/test/test_cli_delete.py @@ -97,7 +97,7 @@ def launch_node_command(self, arguments): with launch_testing.tools.launch_process( launch_service, node_command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( - # ignore launch_ros and ros2cli daemon nodes + # ignore ros2cli daemon node filtered_patterns=['.*ros2cli.*'], filtered_rmw_implementation=rmw_implementation ) @@ -164,9 +164,9 @@ def test_delete_int_param_hidden_node_with_hidden_flag(self): ) @launch_testing.markers.retry_on_failure(times=3) - def test_delete_any_param_unexisting_node(self): + def test_delete_any_param_nonexistent_node(self): with self.launch_node_command( - arguments=['delete', '/foo/unexisting_node', 'int_param']) as node_command: + arguments=['delete', '/foo/nonexistent_node', 'int_param']) as node_command: assert node_command.wait_for_shutdown(timeout=5) assert node_command.exit_code == 1 assert launch_testing.tools.expect_output( @@ -176,7 +176,7 @@ def test_delete_any_param_unexisting_node(self): ) @launch_testing.markers.retry_on_failure(times=3) - def test_delete_unexistent_param_existent_node(self): + def test_delete_nonexistent_param_existent_node(self): with self.launch_node_command( arguments=['delete', '/param_delete_node', 'foo_param']) as node_command: assert node_command.wait_for_shutdown(timeout=5) From 699aee4be8ee4f21ea9a7158ce669a0d2c269dad Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Wed, 6 Nov 2019 14:40:48 -0300 Subject: [PATCH 5/7] Make use of subtests and address pr comments Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/test_cli.py | 302 +++++++++++++++--------------- ros2param/test/test_cli_delete.py | 82 ++++---- ros2param/test/test_cli_set.py | 99 +++++----- 3 files changed, 244 insertions(+), 239 deletions(-) diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py index 6de2ec48b..464fc9ca8 100644 --- a/ros2param/test/test_cli.py +++ b/ros2param/test/test_cli.py @@ -35,53 +35,51 @@ from rmw_implementation import get_available_rmw_implementations -PARAM_NODE_PARAMETER_LIST = """\ - bool_array - bool_param - byte_array - double_array - double_param - int_array - int_param - parameter_with_no_value - str_array - str_param - use_sim_time -""" +PARAM_NODE_PARAMETER_LIST = [ + ' bool_array', + ' bool_param', + ' byte_array', + ' double_array', + ' double_param', + ' int_array', + ' int_param', + ' parameter_with_no_value', + ' str_array', + ' str_param', + ' use_sim_time' +] -PARAM_NODE_BOOL_PARAMETER_LIST = """\ - bool_array - bool_param -""" +PARAM_NODE_BOOL_PARAMETER_LIST = [ + 'bool_array', + 'bool_param' +] -ALL_NODES_PARAMETER_LIST = """\ -/_hidden_param_node: - bool_array - bool_param - byte_array - double_array - double_param - int_array - int_param - parameter_with_no_value - str_array - str_param - use_sim_time -/launch_ros: - use_sim_time -/param_node: - bool_array - bool_param - byte_array - double_array - double_param - int_array - int_param - parameter_with_no_value - str_array - str_param - use_sim_time -""" +ALL_NODES_PARAMETER_LIST = [ + '/_hidden_param_node:', + ' bool_array', + ' bool_param', + ' byte_array', + ' double_array', + ' double_param', + ' int_array', + ' int_param', + ' parameter_with_no_value', + ' str_array', + ' str_param', + ' use_sim_time', + '/param_node:', + ' bool_array', + ' bool_param', + ' byte_array', + ' double_array', + ' double_param', + ' int_array', + ' int_param', + ' parameter_with_no_value', + ' str_array', + ' str_param', + ' use_sim_time' +] DESCRIBE_PARAMETER_TYPE = """\ Parameter name: {} @@ -153,8 +151,8 @@ def setUpClass( rmw_implementation ): @contextlib.contextmanager - def launch_node_command(self, arguments): - node_command_action = ExecuteProcess( + def launch_param_command(self, arguments): + param_command_action = ExecuteProcess( cmd=['ros2', 'param', *arguments], additional_env={ 'RMW_IMPLEMENTATION': rmw_implementation, @@ -164,187 +162,191 @@ def launch_node_command(self, arguments): output='screen' ) with launch_testing.tools.launch_process( - launch_service, node_command_action, proc_info, proc_output, + launch_service, param_command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( # ignore ros2cli daemon node filtered_patterns=['.*ros2cli.*'], filtered_rmw_implementation=rmw_implementation ) - ) as node_command: - yield node_command - cls.launch_node_command = launch_node_command - cls.rmw_implementation = rmw_implementation + ) as param_command: + yield param_command + cls.launch_param_command = launch_param_command + @launch_testing.markers.retry_on_failure(times=3) def test_get_verb(self): parameters = param_node.ParamNode.get_node_parameters() for param_key, param_value in parameters.items(): - with self.launch_node_command( - arguments=['get', '/param_node', param_key]) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - if param_value is None: - param_value = '' - assert launch_testing.tools.expect_output( - expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format(str(param_value)), - text=node_command.output, - strict=True - ) + with self.subTest(param_key=param_key, param_value=param_value): + with self.launch_param_command( + arguments=['get', '/param_node', param_key]) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + if param_value is None: + param_value = '' + assert launch_testing.tools.expect_output( + expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format( + str(param_value)), + text=param_command.output, + strict=True + ) + @launch_testing.markers.retry_on_failure(times=3) def test_get_verb_hide_type(self): parameters = param_node.ParamNode.get_node_parameters() for param_key, param_value in parameters.items(): - with self.launch_node_command( - arguments=['get', '/param_node', param_key, '--hide-type']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - if param_value is None: - param_value = ':None\n' - assert launch_testing.tools.expect_output( - expected_lines=[ - param_node.GET_VERB_PARAM_VALUES[param_key].format( - str(param_value)).split(':')[1].strip()], - text=node_command.output, - strict=True - ) + with self.subTest(param_key=param_key, param_value=param_value): + with self.launch_param_command( + arguments=['get', '/param_node', + param_key, '--hide-type']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + if param_value is None: + param_value = ':None\n' + assert launch_testing.tools.expect_output( + expected_lines=[ + param_node.GET_VERB_PARAM_VALUES[param_key].format( + str(param_value)).split(':')[1].strip()], + text=param_command.output, + strict=True + ) @launch_testing.markers.retry_on_failure(times=3) def test_get_nonexistent_node(self): - with self.launch_node_command( - arguments=['get', '/foo/nonexistent_node', 'test_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['get', '/foo/nonexistent_node', 'test_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_get_hidden_node_with_no_hidden_argument(self): - with self.launch_node_command( - arguments=['get', '/_hidden_param_node', 'test_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['get', '/_hidden_param_node', 'test_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_get_hidden_node_with_hidden_argument(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ 'get', '/_hidden_param_node', 'int_param', - '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + '--include-hidden-nodes']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_text=param_node.GET_VERB_PARAM_VALUES['int_param'].format(str(42)), - text=node_command.output, + expected_text=param_node.GET_VERB_PARAM_VALUES['int_param'].format( + str(param_node.ParamNode.get_node_parameters()['int_param'])), + text=param_command.output, strict=True ) # List verb tests @launch_testing.markers.retry_on_failure(times=3) def test_list_existing_node(self): - with self.launch_node_command( - arguments=['list', '/param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + with self.launch_param_command( + arguments=['list', '/param_node']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_text=PARAM_NODE_PARAMETER_LIST, - text=node_command.output, + expected_lines=PARAM_NODE_PARAMETER_LIST, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_list_nonexistent_node(self): - with self.launch_node_command( - arguments=['list', '/nonexistent_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['list', '/nonexistent_node']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_list_hidden_existing_node_no_hidden_flag(self): - with self.launch_node_command( - arguments=['list', '/_hidden_param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['list', '/_hidden_param_node']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_list_hidden_existing_node_hidden_flag(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ - 'list', '/_hidden_param_node', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + 'list', '/_hidden_param_node', '--include-hidden-nodes']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_text=PARAM_NODE_PARAMETER_LIST, - text=node_command.output, + expected_lines=PARAM_NODE_PARAMETER_LIST, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_list_existing_node_bool_prefix(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ - 'list', '--param-prefixes=bool', '/param_node']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + 'list', '--param-prefixes=bool', '/param_node']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_text=PARAM_NODE_BOOL_PARAMETER_LIST, - text=node_command.output, + expected_lines=PARAM_NODE_BOOL_PARAMETER_LIST, + text=param_command.output, strict=True ) - # TODO(BMarchi): This test fails for every implementation. It hangs - # in wait_for_shutdown method. It seems that it doesn't return - # after getting all the nodes with their parameters. - # @launch_testing.markers.retry_on_failure(times=3) - # def test_list_all_nodes_all_parameters(self): - # with self.launch_node_command( - # arguments=['list', '--include-hidden-nodes']) as node_command: - # assert node_command.wait_for_shutdown(timeout=10) - # assert node_command.exit_code == launch_testing.asserts.EXIT_OK - # assert launch_testing.tools.expect_output( - # expected_text=ALL_NODES_PARAMETER_LIST, - # text=node_command.output, - # strict=True - # ) + @launch_testing.markers.retry_on_failure(times=5) + def test_list_all_nodes_all_parameters(self): + with self.launch_param_command( + arguments=['list', '--include-hidden-nodes']) as param_command: + assert param_command.wait_for_shutdown(timeout=10) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=ALL_NODES_PARAMETER_LIST, + text=param_command.output, + strict=False + ) - @launch_testing.markers.retry_on_failure(times=3) + @launch_testing.markers.retry_on_failure(times=4) def test_describe_existing_node_all_params(self): for param_key in param_node.ParamNode.get_node_parameters(): - with self.launch_node_command( - arguments=['describe', '/param_node', param_key]) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_text=DESCRIBE_PARAMETER_TYPE.format( - param_key, DESCRIBE_PARAMETER_NAME_TYPE[param_key], ''), - text=node_command.output, - strict=True - ) + with self.subTest(param_key=param_key): + with self.launch_param_command( + arguments=['describe', '/param_node', param_key]) as param_command: + assert param_command.wait_for_shutdown(timeout=10) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=DESCRIBE_PARAMETER_TYPE.format( + param_key, DESCRIBE_PARAMETER_NAME_TYPE[param_key], ''), + text=param_command.output, + strict=True + ) @launch_testing.markers.retry_on_failure(times=3) def test_describe_nonexistent_node(self): - with self.launch_node_command( - arguments=['describe', '/foo/nonexistent_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['describe', '/foo/nonexistent_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) diff --git a/ros2param/test/test_cli_delete.py b/ros2param/test/test_cli_delete.py index 843d16ac7..5e9021c47 100644 --- a/ros2param/test/test_cli_delete.py +++ b/ros2param/test/test_cli_delete.py @@ -84,8 +84,8 @@ def setUpClass( rmw_implementation ): @contextlib.contextmanager - def launch_node_command(self, arguments): - node_command_action = ExecuteProcess( + def launch_param_command(self, arguments): + param_command_action = ExecuteProcess( cmd=['ros2', 'param', *arguments], additional_env={ 'RMW_IMPLEMENTATION': rmw_implementation, @@ -95,94 +95,94 @@ def launch_node_command(self, arguments): output='screen' ) with launch_testing.tools.launch_process( - launch_service, node_command_action, proc_info, proc_output, + launch_service, param_command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( # ignore ros2cli daemon node filtered_patterns=['.*ros2cli.*'], filtered_rmw_implementation=rmw_implementation ) - ) as node_command: - yield node_command - cls.launch_node_command = launch_node_command + ) as param_command: + yield param_command + cls.launch_param_command = launch_param_command @launch_testing.markers.retry_on_failure(times=3) def test_delete_int_param_existing_node(self): - with self.launch_node_command( - arguments=['delete', '/param_delete_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + with self.launch_param_command( + arguments=['delete', '/param_delete_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Deleted parameter successfully'], - text=node_command.output, + text=param_command.output, strict=True ) - with self.launch_node_command( - arguments=['get', '/param_delete_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + with self.launch_param_command( + arguments=['get', '/param_delete_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Parameter not set.'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_delete_int_param_hidden_node(self): - with self.launch_node_command( - arguments=['delete', '/_hidden_param_delete_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['delete', '/_hidden_param_delete_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_text='Node not found\n', - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=5) def test_delete_int_param_hidden_node_with_hidden_flag(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ 'delete', '--include-hidden-nodes', - '/_hidden_param_delete_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=8) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + '/_hidden_param_delete_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=8) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Deleted parameter successfully'], - text=node_command.output, + text=param_command.output, strict=True ) - with self.launch_node_command( + with self.launch_param_command( arguments=[ 'get', '--include-hidden-nodes', - '/_hidden_param_delete_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + '/_hidden_param_delete_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Parameter not set.'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_delete_any_param_nonexistent_node(self): - with self.launch_node_command( - arguments=['delete', '/foo/nonexistent_node', 'int_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['delete', '/foo/nonexistent_node', 'int_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_text='Node not found\n', - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_delete_nonexistent_param_existent_node(self): - with self.launch_node_command( - arguments=['delete', '/param_delete_node', 'foo_param']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + with self.launch_param_command( + arguments=['delete', '/param_delete_node', 'foo_param']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=['Deleting parameter failed'], - text=node_command.output, + text=param_command.output, strict=True ) diff --git a/ros2param/test/test_cli_set.py b/ros2param/test/test_cli_set.py index 6183e3199..ef7e220c1 100644 --- a/ros2param/test/test_cli_set.py +++ b/ros2param/test/test_cli_set.py @@ -13,6 +13,7 @@ # limitations under the License. import contextlib +import copy import os import sys import unittest @@ -35,6 +36,9 @@ from rmw_implementation import get_available_rmw_implementations +SET_VERB_PARAM_VALUES = copy.deepcopy(param_node.GET_VERB_PARAM_VALUES) +SET_VERB_PARAM_VALUES['parameter_with_no_value'] = SET_VERB_PARAM_VALUES['str_param'] + @pytest.mark.rostest @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) @@ -86,8 +90,8 @@ def setUpClass( rmw_implementation ): @contextlib.contextmanager - def launch_node_command(self, arguments): - node_command_action = ExecuteProcess( + def launch_param_command(self, arguments): + param_command_action = ExecuteProcess( cmd=['ros2', 'param', *arguments], additional_env={ 'RMW_IMPLEMENTATION': rmw_implementation, @@ -97,26 +101,26 @@ def launch_node_command(self, arguments): output='screen' ) with launch_testing.tools.launch_process( - launch_service, node_command_action, proc_info, proc_output, + launch_service, param_command_action, proc_info, proc_output, output_filter=launch_testing_ros.tools.basic_output_filter( # ignore ros2cli daemon node filtered_patterns=['.*ros2cli.*'], filtered_rmw_implementation=rmw_implementation ) - ) as node_command: - yield node_command - cls.launch_node_command = launch_node_command + ) as param_command: + yield param_command + cls.launch_param_command = launch_param_command @launch_testing.markers.retry_on_failure(times=3) def test_set_nonexistent_param_existing_node(self): - with self.launch_node_command( - arguments=['set', '/param_node', 'nonexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + with self.launch_param_command( + arguments=['set', '/param_node', 'nonexistent_parameter', '1']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], - text=node_command.output, + text=param_command.output, strict=True ) @@ -126,65 +130,64 @@ def test_set_valid_param_existing_node(self): # Byte array is unsupported for the yaml parser in get_parameter_value new_parameters.pop('byte_array') for param_key, param_value in new_parameters.items(): - with self.launch_node_command( - arguments=['set', '/param_node', param_key, str(param_value)]) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - assert launch_testing.tools.expect_output( - expected_lines=['Set parameter successful'], - text=node_command.output, - strict=True - ) - for param_key, param_value in new_parameters.items(): - with self.launch_node_command( - arguments=['get', '/param_node', param_key]) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK - if param_value is None: - param_node.GET_VERB_PARAM_VALUES[param_key] = \ - param_node.GET_VERB_PARAM_VALUES['str_param'] - assert launch_testing.tools.expect_output( - expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format(str(param_value)), - text=node_command.output, - strict=True - ) + with self.subTest(param_key=param_key, param_value=param_value): + with self.launch_param_command( + arguments=['set', '/param_node', + param_key, str(param_value)]) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Set parameter successful'], + text=param_command.output, + strict=True + ) + with self.launch_param_command( + arguments=['get', '/param_node', param_key]) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_text=SET_VERB_PARAM_VALUES[param_key].format( + str(param_value)), + text=param_command.output, + strict=True + ) @launch_testing.markers.retry_on_failure(times=3) def test_set_nonexistent_param_hidden_node_no_hidden_argument(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ - 'set', '/_hidden_param_node', 'nonexistent_parameter', '1']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + 'set', '/_hidden_param_node', 'nonexistent_parameter', '1']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_set_nonexistent_param_hidden_node_hidden_argument(self): - with self.launch_node_command( + with self.launch_param_command( arguments=[ 'set', '/_hidden_param_node', - 'nonexistent_parameter', '1', '--include-hidden-nodes']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == launch_testing.asserts.EXIT_OK + 'nonexistent_parameter', '1', '--include-hidden-nodes']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ "Setting parameter failed: ('Invalid access to undeclared parameter(s)', [])"], - text=node_command.output, + text=param_command.output, strict=True ) @launch_testing.markers.retry_on_failure(times=3) def test_set_nonexistent_node(self): - with self.launch_node_command( - arguments=['set', '/foo/nonexistent_node', 'test_param', '3.14']) as node_command: - assert node_command.wait_for_shutdown(timeout=5) - assert node_command.exit_code == 1 + with self.launch_param_command( + arguments=['set', '/foo/nonexistent_node', 'test_param', '3.14']) as param_command: + assert param_command.wait_for_shutdown(timeout=5) + assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( expected_lines=['Node not found'], - text=node_command.output, + text=param_command.output, strict=True ) From 636a6c0c5ca179282df5838f14875eecb0267c32 Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Wed, 6 Nov 2019 15:11:04 -0300 Subject: [PATCH 6/7] Move constants to their own files Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/param_node.py | 13 ------------- ros2param/test/test_cli.py | 19 ++++++++++++++++--- ros2param/test/test_cli_set.py | 15 ++++++++++++--- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/ros2param/test/param_node.py b/ros2param/test/param_node.py index f08eb525a..59b90c03f 100644 --- a/ros2param/test/param_node.py +++ b/ros2param/test/param_node.py @@ -18,19 +18,6 @@ import rclpy from rclpy.node import Node -GET_VERB_PARAM_VALUES = { - 'bool_param': 'Boolean value is: {}\n', - 'int_param': 'Integer value is: {}\n', - 'double_param': 'Double value is: {}\n', - 'str_param': 'String value is: {}\n', - 'byte_array': 'Byte values are: {}\n', - 'bool_array': 'Boolean values are: {}\n', - 'int_array': "Integer values are: array('q', {})\n", - 'double_array': "Double values are: array('d', {})\n", - 'str_array': 'String values are: {}\n', - 'parameter_with_no_value': 'Parameter not set.{}\n' -} - class ParamNode(Node): diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py index 464fc9ca8..c72aed527 100644 --- a/ros2param/test/test_cli.py +++ b/ros2param/test/test_cli.py @@ -100,6 +100,19 @@ 'parameter_with_no_value': 'not set' } +GET_VERB_PARAM_VALUES = { + 'bool_param': 'Boolean value is: {}\n', + 'int_param': 'Integer value is: {}\n', + 'double_param': 'Double value is: {}\n', + 'str_param': 'String value is: {}\n', + 'byte_array': 'Byte values are: {}\n', + 'bool_array': 'Boolean values are: {}\n', + 'int_array': "Integer values are: array('q', {})\n", + 'double_array': "Double values are: array('d', {})\n", + 'str_array': 'String values are: {}\n', + 'parameter_with_no_value': 'Parameter not set.{}\n' +} + @pytest.mark.rostest @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) @@ -184,7 +197,7 @@ def test_get_verb(self): if param_value is None: param_value = '' assert launch_testing.tools.expect_output( - expected_text=param_node.GET_VERB_PARAM_VALUES[param_key].format( + expected_text=GET_VERB_PARAM_VALUES[param_key].format( str(param_value)), text=param_command.output, strict=True @@ -204,7 +217,7 @@ def test_get_verb_hide_type(self): param_value = ':None\n' assert launch_testing.tools.expect_output( expected_lines=[ - param_node.GET_VERB_PARAM_VALUES[param_key].format( + GET_VERB_PARAM_VALUES[param_key].format( str(param_value)).split(':')[1].strip()], text=param_command.output, strict=True @@ -243,7 +256,7 @@ def test_get_hidden_node_with_hidden_argument(self): assert param_command.wait_for_shutdown(timeout=5) assert param_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_text=param_node.GET_VERB_PARAM_VALUES['int_param'].format( + expected_text=GET_VERB_PARAM_VALUES['int_param'].format( str(param_node.ParamNode.get_node_parameters()['int_param'])), text=param_command.output, strict=True diff --git a/ros2param/test/test_cli_set.py b/ros2param/test/test_cli_set.py index ef7e220c1..0c88923bf 100644 --- a/ros2param/test/test_cli_set.py +++ b/ros2param/test/test_cli_set.py @@ -13,7 +13,6 @@ # limitations under the License. import contextlib -import copy import os import sys import unittest @@ -36,8 +35,18 @@ from rmw_implementation import get_available_rmw_implementations -SET_VERB_PARAM_VALUES = copy.deepcopy(param_node.GET_VERB_PARAM_VALUES) -SET_VERB_PARAM_VALUES['parameter_with_no_value'] = SET_VERB_PARAM_VALUES['str_param'] +SET_VERB_PARAM_VALUES = { + 'bool_param': 'Boolean value is: {}\n', + 'int_param': 'Integer value is: {}\n', + 'double_param': 'Double value is: {}\n', + 'str_param': 'String value is: {}\n', + 'byte_array': 'Byte values are: {}\n', + 'bool_array': 'Boolean values are: {}\n', + 'int_array': "Integer values are: array('q', {})\n", + 'double_array': "Double values are: array('d', {})\n", + 'str_array': 'String values are: {}\n', + 'parameter_with_no_value': 'String value is: {}\n' +} @pytest.mark.rostest From ded60282eb1da9f06e713eebb21b0d86383d8c67 Mon Sep 17 00:00:00 2001 From: Brian Ezequiel Marchi Date: Mon, 11 Nov 2019 14:49:53 -0300 Subject: [PATCH 7/7] Address PR comments Signed-off-by: Brian Ezequiel Marchi --- ros2param/test/test_cli.py | 27 +++++++++++++++++++++------ ros2param/test/test_cli_delete.py | 4 ++-- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/ros2param/test/test_cli.py b/ros2param/test/test_cli.py index c72aed527..b0756aee9 100644 --- a/ros2param/test/test_cli.py +++ b/ros2param/test/test_cli.py @@ -113,6 +113,19 @@ 'parameter_with_no_value': 'Parameter not set.{}\n' } +GET_VERB_PARAM_VALUES_HIDE_FLAG = { + 'bool_param': '{}', + 'int_param': '{}', + 'double_param': '{}', + 'str_param': '{}', + 'byte_array': '{}', + 'bool_array': '{}', + 'int_array': "array('q', {})", + 'double_array': "array('d', {})", + 'str_array': '{}', + 'parameter_with_no_value': 'None' +} + @pytest.mark.rostest @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) @@ -209,16 +222,18 @@ def test_get_verb_hide_type(self): for param_key, param_value in parameters.items(): with self.subTest(param_key=param_key, param_value=param_value): with self.launch_param_command( - arguments=['get', '/param_node', - param_key, '--hide-type']) as param_command: + arguments=[ + 'get', '/param_node', + param_key, '--hide-type' + ] + ) as param_command: assert param_command.wait_for_shutdown(timeout=5) assert param_command.exit_code == launch_testing.asserts.EXIT_OK - if param_value is None: - param_value = ':None\n' assert launch_testing.tools.expect_output( expected_lines=[ - GET_VERB_PARAM_VALUES[param_key].format( - str(param_value)).split(':')[1].strip()], + GET_VERB_PARAM_VALUES_HIDE_FLAG[param_key].format( + str(param_value)) + ], text=param_command.output, strict=True ) diff --git a/ros2param/test/test_cli_delete.py b/ros2param/test/test_cli_delete.py index 5e9021c47..9d73edf94 100644 --- a/ros2param/test/test_cli_delete.py +++ b/ros2param/test/test_cli_delete.py @@ -133,7 +133,7 @@ def test_delete_int_param_hidden_node(self): assert param_command.wait_for_shutdown(timeout=5) assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( - expected_text='Node not found\n', + expected_lines=['Node not found'], text=param_command.output, strict=True ) @@ -170,7 +170,7 @@ def test_delete_any_param_nonexistent_node(self): assert param_command.wait_for_shutdown(timeout=5) assert param_command.exit_code == 1 assert launch_testing.tools.expect_output( - expected_text='Node not found\n', + expected_lines=['Node not found'], text=param_command.output, strict=True )