Skip to content

Commit

Permalink
Refactor end-to-end CLI tests.
Browse files Browse the repository at this point in the history
Improved test coverage for:

- ros2action
- ros2service
- ros2topic
- ros2msg
- ros2srv
- ros2interface
- ros2node
- ros2pkg

Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
hidmic committed Oct 15, 2019
1 parent 7dc731d commit bf24f0d
Show file tree
Hide file tree
Showing 27 changed files with 2,734 additions and 153 deletions.
1 change: 1 addition & 0 deletions ros2action/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>ros_testing</test_depend>
<test_depend>test_msgs</test_depend>

<export>
Expand Down
71 changes: 71 additions & 0 deletions ros2action/test/fixtures/fibonacci_action_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import time

import sys

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from test_msgs.action import Fibonacci


class FibonacciActionServer(Node):

def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)

def destroy_node(self):
self._action_server.destroy()
super().destroy_node()

def execute_callback(self, goal_handle):
feedback = Fibonacci.Feedback()
feedback.sequence = [0, 1]

for i in range(1, goal_handle.request.order):
feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i-1])
goal_handle.publish_feedback(feedback)

goal_handle.succeed()

result = Fibonacci.Result()
result.sequence = feedback.sequence
return result


def main(args=None):
rclpy.init(args=args)

node = FibonacciActionServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
print('server stopped cleanly')
except BaseException:
print('exception in server:', file=sys.stderr)
raise
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
313 changes: 313 additions & 0 deletions ros2action/test/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import re
import sys
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction

import launch_testing
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

from rmw_implementation import get_available_rmw_implementations

import yaml


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation, ready_fn):
path_to_action_server_executable = os.path.join(
os.path.dirname(__file__), 'fixtures', 'fibonacci_action_server.py'
)
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
ExecuteProcess(
cmd=[sys.executable, path_to_action_server_executable],
additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
),
OpaqueFunction(function=lambda context: ready_fn())
],
additional_env={'RMW_IMPLEMENTATION': rmw_implementation}
)
]
),
])


def get_fibonacci_send_goal_output(*, order=1, with_feedback=False):
assert order > 0
output = [
'Waiting for an action server to become available...',
'Sending goal:',
' order: {}'.format(order),
'',
re.compile('Goal accepted with ID: [a-f0-9]+'),
'',
]
sequence = [0, 1]
for _ in range(order - 1):
sequence.append(sequence[-1] + sequence[-2])
if with_feedback:
output.append('Feedback:')
output.extend((' ' + yaml.dump({
'sequence': sequence
})).splitlines())
output.append('')
output.append('Result:'),
output.extend((' ' + yaml.dump({
'sequence': sequence
})).splitlines())
output.append('')
output.append('Goal finished with status: SUCCEEDED')
return output


class TestROS2ActionCLI(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_action_command(self, arguments):
action_command_action = ExecuteProcess(
cmd=['ros2', 'action', *arguments],
name='ros2action-cli', output='screen',
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
}
)
with launch_testing.tools.launch_process(
launch_service, action_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=rmw_implementation
)
) as action_command:
yield action_command
cls.launch_action_command = launch_action_command

def test_info_on_nonexistent_action(self):
with self.launch_action_command(arguments=['info', '/not_an_action']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /not_an_action',
'Action clients: 0',
'Action servers: 0',
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info(self):
with self.launch_action_command(arguments=['info', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
' /fibonacci_action_server'
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info_with_types(self):
with self.launch_action_command(arguments=['info', '-t', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
' /fibonacci_action_server [test_msgs/action/Fibonacci]'
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_fibonacci_info_count(self):
with self.launch_action_command(arguments=['info', '-c', '/fibonacci']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Action: /fibonacci',
'Action clients: 0',
'Action servers: 1',
],
text=action_command.output,
strict=False
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list(self):
with self.launch_action_command(arguments=['list']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['/fibonacci'],
text=action_command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list_with_types(self):
with self.launch_action_command(arguments=['list', '-t']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['/fibonacci [test_msgs/action/Fibonacci]'],
text=action_command.output, strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_list_count(self):
with self.launch_action_command(arguments=['list', '-c']) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
command_output_lines = action_command.output.splitlines()
assert len(command_output_lines) == 1
assert int(command_output_lines[0]) == 1

@launch_testing.markers.retry_on_failure(times=5)
def test_send_fibonacci_goal(self):
with self.launch_action_command(
arguments=[
'send_goal',
'/fibonacci',
'test_msgs/action/Fibonacci',
'{order: 5}'
],
) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=get_fibonacci_send_goal_output(order=5),
text=action_command.output, strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_send_fibonacci_goal_with_feedback(self):
with self.launch_action_command(
arguments=[
'send_goal',
'-f',
'/fibonacci',
'test_msgs/action/Fibonacci',
'{order: 5}'
],
) as action_command:
assert action_command.wait_for_shutdown(timeout=10)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=get_fibonacci_send_goal_output(
order=5, with_feedback=True
),
text=action_command.output, strict=True
)

def test_show_fibonacci(self):
with self.launch_action_command(
arguments=['show', 'test_msgs/action/Fibonacci'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'int32 order',
'---',
'int32[] sequence',
'---',
'int32[] sequence'
],
text=action_command.output,
strict=False
)

def test_show_not_a_package(self):
with self.launch_action_command(
arguments=['show', 'not_a_package/action/Fibonacci'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['Unknown package name'],
text=action_command.output, strict=True
)

# TODO(hidmic): make 'ros2 action show' fail accordingly
# def test_show_not_an_action_ns(self):
# with self.launch_action_command(
# arguments=['show', 'test_msgs/foo/Fibonacci'],
# ) as action_command:
# assert action_command.wait_for_shutdown(timeout=2)
# assert action_command.exit_code == 1
# assert launch_testing.tools.expect_output(
# expected_lines=['Unknown action type'],
# text=action_command.output, strict=True
# )

def test_show_not_an_action_typename(self):
with self.launch_action_command(
arguments=['show', 'test_msgs/action/NotAnActionTypeName'],
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['Unknown action type'],
text=action_command.output, strict=True
)

def test_show_not_an_action_type(self):
with self.launch_action_command(
arguments=['show', 'not_an_action_type']
) as action_command:
assert action_command.wait_for_shutdown(timeout=2)
assert action_command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=['The passed action type is invalid'],
text=action_command.output, strict=True
)
Loading

0 comments on commit bf24f0d

Please sign in to comment.