From f73bc56caa9c7b3c53b264475728392d0e1ce9cf Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 22 Oct 2019 17:05:52 -0300 Subject: [PATCH] Support CLI commands testing (#279) * Improve launch_testing framework. - Generalize output checking tools. - Ease process launch and interaction. - Fix parameterization support. - Fix pytest hooks. - Replace inspect.getfullargspec() with inpect.signature() - Improve launch_testing.tools.expect_output() documentation. - Avoid races in ActiveIoHandler code. Signed-off-by: Michel Hidalgo --- .../launch_testing/asserts/assert_output.py | 52 +----- launch_testing/launch_testing/io_handler.py | 19 +- launch_testing/launch_testing/loader.py | 23 ++- launch_testing/launch_testing/markers.py | 41 +++++ .../launch_testing/proc_info_handler.py | 7 +- launch_testing/launch_testing/pytest/hooks.py | 4 +- launch_testing/launch_testing/test_runner.py | 11 +- .../launch_testing/tools/__init__.py | 6 +- .../launch_testing/tools/launchers.py | 43 ----- launch_testing/launch_testing/tools/output.py | 53 ++++++ .../launch_testing/tools/process.py | 163 ++++++++++++++++++ launch_testing/launch_testing/tools/text.py | 138 +++++++++++++++ .../examples/good_proc_launch_test.py | 2 +- .../examples/terminating_proc_launch_test.py | 60 ++++--- .../test_io_handler_and_assertions.py | 6 + .../test/launch_testing/test_tools.py | 129 +++++++++++++- 16 files changed, 619 insertions(+), 138 deletions(-) create mode 100644 launch_testing/launch_testing/markers.py delete mode 100644 launch_testing/launch_testing/tools/launchers.py create mode 100644 launch_testing/launch_testing/tools/process.py create mode 100644 launch_testing/launch_testing/tools/text.py diff --git a/launch_testing/launch_testing/asserts/assert_output.py b/launch_testing/launch_testing/asserts/assert_output.py index 2c0220f89..40dee8a30 100644 --- a/launch_testing/launch_testing/asserts/assert_output.py +++ b/launch_testing/launch_testing/asserts/assert_output.py @@ -20,56 +20,12 @@ further reference. """ - -import os - from osrf_pycommon.terminal_color import remove_ansi_escape_sequences +from ..tools.text import build_text_match from ..util import resolveProcesses -def normalize_lineseps(lines): - r"""Normalize and then return the given lines to all use '\n'.""" - lines = lines.replace(os.linesep, '\n') - # This happens (even on Linux and macOS) when capturing I/O from an - # emulated tty. - lines = lines.replace('\r\n', '\n') - return lines - - -def get_matching_function(expected_output): - if isinstance(expected_output, (list, tuple)): - if len(expected_output) > 0: - if isinstance(expected_output[0], str): - def _match(expected, actual): - lines = actual.splitlines() - for pattern in expected: - if pattern not in lines: - return False - index = lines.index(pattern) - lines = lines[index + 1:] - return True - return _match - if hasattr(expected_output[0], 'search'): - def _match(expected, actual): - start = 0 - actual = normalize_lineseps(actual) - for pattern in expected: - match = pattern.search(actual, start) - if match is None: - return False - start = match.end() - return True - return _match - elif isinstance(expected_output, str): - return lambda expected, actual: expected in actual - elif hasattr(expected_output, 'search'): - return lambda expected, actual: ( - expected.search(normalize_lineseps(actual)) is not None - ) - raise ValueError('Unknown format for expected output') - - def assertInStdout(proc_output, expected_output, process, @@ -86,7 +42,7 @@ def assertInStdout(proc_output, :type proc_output: An launch_testing.IoHandler :param expected_output: The output to search for - :type expected_output: string or regex pattern or list of the aforementioned types + :type expected_output: string or regex pattern or a list of the aforementioned types :param process: The process whose output will be searched :type process: A string (search by process name) or a launch.actions.ExecuteProcess object @@ -118,7 +74,7 @@ def assertInStdout(proc_output, if output_filter is not None: if not callable(output_filter): raise ValueError('output_filter is not callable') - output_match = get_matching_function(expected_output) + output_match = build_text_match(expected_output) for proc in resolved_procs: # Nominally just one matching proc full_output = ''.join( @@ -128,7 +84,7 @@ def assertInStdout(proc_output, full_output = remove_ansi_escape_sequences(full_output) if output_filter is not None: full_output = output_filter(full_output) - if output_match(expected_output, full_output): + if output_match(full_output) is not None: break else: names = ', '.join(sorted(p.process_details['name'] for p in resolved_procs)) diff --git a/launch_testing/launch_testing/io_handler.py b/launch_testing/launch_testing/io_handler.py index cc000b5b4..423aebb54 100644 --- a/launch_testing/launch_testing/io_handler.py +++ b/launch_testing/launch_testing/io_handler.py @@ -39,12 +39,14 @@ def __init__(self): self._sequence_list = [] # A time-ordered list of IO from all processes self._process_name_dict = {} # A dict of time ordered lists of IO key'd by the process + def track(self, process_name): + if process_name not in self._process_name_dict: + self._process_name_dict[process_name] = [] + def append(self, process_io): self._sequence_list.append(process_io) - if process_io.process_name not in self._process_name_dict: self._process_name_dict[process_io.process_name] = [] - self._process_name_dict[process_io.process_name].append(process_io) def __iter__(self): @@ -56,7 +58,7 @@ def processes(self): :returns [launch.actions.ExecuteProcess]: """ - return [val[0].action for val in self._process_name_dict.values()] + return [val[0].action for val in self._process_name_dict.values() if len(val) > 0] def process_names(self): """ @@ -79,7 +81,7 @@ def __getitem__(self, key): return list(self._process_name_dict[key.process_details['name']]) -class ActiveIoHandler(IoHandler): +class ActiveIoHandler: """ Holds stdout captured from running processes. @@ -93,6 +95,15 @@ def __init__(self): # by composition so we can still give out the unsynchronized version self._io_handler = IoHandler() + @property + def io_event(self): + return self._sync_lock + + def track(self, process_name): + with self._sync_lock: + self._io_handler.track(process_name) + self._sync_lock.notify() + def append(self, process_io): with self._sync_lock: self._io_handler.append(process_io) diff --git a/launch_testing/launch_testing/loader.py b/launch_testing/launch_testing/loader.py index 47ea90fca..e9f887c47 100644 --- a/launch_testing/launch_testing/loader.py +++ b/launch_testing/launch_testing/loader.py @@ -80,6 +80,8 @@ def __init__(self, pre_shutdown_tests, post_shutdown_tests): self.name = name + if not hasattr(test_description_function, '__markers__'): + test_description_function.__markers__ = {} self._test_description_function = test_description_function self.normalized_test_description = _normalize_ld(test_description_function) @@ -100,6 +102,10 @@ def __init__(self, setattr(tc, '_testMethodName', new_name) setattr(tc, new_name, test_method) + @property + def markers(self): + return self._test_description_function.__markers__ + def bind(self, tests, injected_attributes={}, injected_args={}): """ Bind injected_attributes and injected_args to tests. @@ -176,13 +182,16 @@ class _loader(unittest.TestLoader): """TestLoader selectively loads pre-shutdown or post-shutdown tests.""" def loadTestsFromTestCase(self, testCaseClass): - if getattr(testCaseClass, '__post_shutdown_test__', False) == load_post_shutdown: - cases = super(_loader, self).loadTestsFromTestCase(testCaseClass) + # Isolate test classes instances on a per parameterization basis + cases = super(_loader, self).loadTestsFromTestCase( + type(testCaseClass.__name__, (testCaseClass,), { + '__module__': testCaseClass.__module__ + }) + ) return cases - else: - # Empty test suites will be ignored by the test runner - return self.suiteClass() + # Empty test suites will be ignored by the test runner + return self.suiteClass() return _loader() @@ -224,9 +233,9 @@ def _bind_test_args_to_tests(context, test_suite): def _partially_bind_matching_args(unbound_function, arg_candidates): - function_arg_names = inspect.getfullargspec(unbound_function).args + function_args = inspect.signature(unbound_function).parameters # We only want to bind the part of the context matches the test args - matching_args = {k: v for (k, v) in arg_candidates.items() if k in function_arg_names} + matching_args = {k: v for (k, v) in arg_candidates.items() if k in function_args} return functools.partial(unbound_function, **matching_args) diff --git a/launch_testing/launch_testing/markers.py b/launch_testing/launch_testing/markers.py new file mode 100644 index 000000000..a65d49b9f --- /dev/null +++ b/launch_testing/launch_testing/markers.py @@ -0,0 +1,41 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools + + +def keep_alive(test_description): + """Mark a test launch description to be kept alive after fixture processes' termination.""" + if not hasattr(test_description, '__markers__'): + test_description.__markers__ = {} + test_description.__markers__['keep_alive'] = True + return test_description + + +def retry_on_failure(*, times): + """Mark a test case to be retried up to `times` on AssertionError.""" + assert times > 0 + + def _decorator(func): + @functools.wraps(func) + def _wrapper(*args, **kwargs): + n = times + while n > 1: + try: + return func(*args, **kwargs) + except AssertionError: + n -= 1 + return func(*args, **kwargs) + return _wrapper + return _decorator diff --git a/launch_testing/launch_testing/proc_info_handler.py b/launch_testing/launch_testing/proc_info_handler.py index 6abf2c652..30fc2244f 100644 --- a/launch_testing/launch_testing/proc_info_handler.py +++ b/launch_testing/launch_testing/proc_info_handler.py @@ -70,7 +70,7 @@ def __getitem__(self, key): return self._proc_info[key] -class ActiveProcInfoHandler(ProcInfoHandler): +class ActiveProcInfoHandler: """Allows tests to wait on process termination before proceeding.""" def __init__(self): @@ -79,6 +79,10 @@ def __init__(self): # by composition so we can still give out the unsynchronized version self._proc_info_handler = ProcInfoHandler() + @property + def proc_event(self): + return self._sync_lock + def append(self, process_info): with self._sync_lock: self._proc_info_handler.append(process_info) @@ -126,7 +130,6 @@ def proc_is_shutdown(): cmd_args=cmd_args, strict_proc_matching=True )[0] - process_event = self._proc_info_handler[resolved_process] return isinstance(process_event, ProcessExited) except NoMatchingProcessException: diff --git a/launch_testing/launch_testing/pytest/hooks.py b/launch_testing/launch_testing/pytest/hooks.py index 796ba83df..c1876a5c3 100644 --- a/launch_testing/launch_testing/pytest/hooks.py +++ b/launch_testing/launch_testing/pytest/hooks.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from unittest import TestCase + import pytest from ..loader import LoadTestsFromPythonModule @@ -64,7 +66,7 @@ def repr_failure(self, excinfo): ) for test_run, test_result in excinfo.value.results.items() for test_case, _ in (test_result.errors + test_result.failures) - if not test_result.wasSuccessful() + if isinstance(test_case, TestCase) and not test_result.wasSuccessful() }) if excinfo.value.results else '' return super().repr_failure(excinfo) diff --git a/launch_testing/launch_testing/test_runner.py b/launch_testing/launch_testing/test_runner.py index f4b721a0f..a54415214 100644 --- a/launch_testing/launch_testing/test_runner.py +++ b/launch_testing/launch_testing/test_runner.py @@ -128,6 +128,11 @@ def run(self): RegisterEventHandler( OnProcessStart(on_start=lambda info, unused: proc_info.append(info)) ), + RegisterEventHandler( + OnProcessStart( + on_start=lambda info, unused: proc_output.track(info.process_name) + ) + ), RegisterEventHandler( OnProcessExit(on_exit=lambda info, unused: proc_info.append(info)) ), @@ -148,7 +153,11 @@ def run(self): ) self._test_tr.start() # Run the tests on another thread - self._launch_service.run() # This will block until the test thread stops it + + # This will block until the test thread stops it + self._launch_service.run( + shutdown_when_idle=not self._test_run.markers.get('keep_alive', False) + ) if not self._tests_completed.wait(timeout=0): # LaunchService.run returned before the tests completed. This can be because the user diff --git a/launch_testing/launch_testing/tools/__init__.py b/launch_testing/launch_testing/tools/__init__.py index 93d820ab1..057cf5bbf 100644 --- a/launch_testing/launch_testing/tools/__init__.py +++ b/launch_testing/launch_testing/tools/__init__.py @@ -12,13 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .launchers import launch_process from .output import basic_output_filter +from .output import expect_output from .output import expected_output_from_file +from .process import launch_process +from .process import ProcessProxy __all__ = [ 'basic_output_filter', + 'expect_output', 'expected_output_from_file', 'launch_process', + 'ProcessProxy' ] diff --git a/launch_testing/launch_testing/tools/launchers.py b/launch_testing/launch_testing/tools/launchers.py deleted file mode 100644 index ec77e7eaf..000000000 --- a/launch_testing/launch_testing/tools/launchers.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import contextlib - -import launch -import launch.events - - -@contextlib.contextmanager -def launch_process(launch_service, process_action): - """ - Launch a process. - - Start execution of a ``process_action`` using the given - ``launch_service`` upon context entering and shut it down - upon context exiting. - """ - assert isinstance(process_action, launch.actions.ExecuteProcess) - launch_service.emit_event( - event=launch.events.IncludeLaunchDescription( - launch_description=launch.LaunchDescription([process_action]) - ) - ) - try: - yield process_action - finally: - launch_service.emit_event( - event=launch.events.process.ShutdownProcess( - process_matcher=launch.events.matches_action(process_action) - ) - ) diff --git a/launch_testing/launch_testing/tools/output.py b/launch_testing/launch_testing/tools/output.py index 1f4e94671..ff8a23896 100644 --- a/launch_testing/launch_testing/tools/output.py +++ b/launch_testing/launch_testing/tools/output.py @@ -15,6 +15,11 @@ import os import re +from osrf_pycommon.terminal_color import remove_ansi_escape_sequences + +from .text import build_line_match +from .text import build_text_match + def get_default_filtered_prefixes(): return [ @@ -77,3 +82,51 @@ def expected_output_from_file(path): return [re.compile(regex) for regex in f.read().splitlines()] raise RuntimeError('could not find output check file: {}'.format(path)) + + +def expect_output( + text=None, + *, + lines=None, + expected_text=None, + expected_lines=None, + strip_ansi_escape_sequences=True, + strict=False +): + r""" + Match output text or lines with expected text or lines. + + Either (expected) text or (expected) lines can be provided but giving both results + in a ValueError. + If lines are given but a text is expected, these lines are joined using '\n'. + Likewise, if text is given but lines are expected, text is split into lines. + + :param expected_text: output text expectation, as supported + by `launch_testing.tools.text.build_text_match` + :param expected_lines: output lines expectation, as supported + by `launch_testing.tools.text.build_line_match` + :param text: output text to be matched + :param lines: output text lines to be matched + :param strip_ansi_escape_sequences: If True (default), strip + ansi escape sequences from actual output before comparing + """ + if (text is not None) == (lines is not None): + raise ValueError('Either lines or text, but not both, must be specified') + + if (expected_text is not None) == (expected_lines is not None): + raise ValueError('Either expected lines or text, but not both, must be specified') + + if expected_text is not None: + if text is None: + text = '\n'.join(lines) + match = build_text_match(expected_text, strict=strict) + if strip_ansi_escape_sequences: + text = remove_ansi_escape_sequences(text) + return match(text) is not None + + match = build_line_match(expected_lines, strict=strict) + if lines is None: + lines = text.splitlines() + if strip_ansi_escape_sequences: + lines = [remove_ansi_escape_sequences(line) for line in lines] + return match(lines) is not None diff --git a/launch_testing/launch_testing/tools/process.py b/launch_testing/launch_testing/tools/process.py new file mode 100644 index 000000000..5c0048193 --- /dev/null +++ b/launch_testing/launch_testing/tools/process.py @@ -0,0 +1,163 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib + +import launch +import launch.actions +import launch.events +from launch.utilities import ensure_argument_type + +from ..io_handler import ActiveIoHandler +from ..proc_info_handler import ActiveProcInfoHandler + + +class ProcessProxy: + """A proxy to interact with `launch.actions.ExecuteProcess` instances.""" + + def __init__(self, process_action, proc_info, proc_output, *, output_filter=None): + """ + Construct a proxy for the given ``process_action``. + + :param process_action: `launch.actions.ExecuteProcess` instance to proxy. + :param proc_info: `ActiveProcInfoHandler` tracking process state. + :param proc_output: `ActiveIoHandler` tracking process output. + :param output_filter: an optional callable to filter output text. + """ + ensure_argument_type( + process_action, types=launch.actions.ExecuteProcess, argument_name='process_action' + ) + ensure_argument_type(proc_info, types=ActiveProcInfoHandler, argument_name='proc_info') + ensure_argument_type(proc_output, types=ActiveIoHandler, argument_name='proc_output') + if output_filter is not None and not callable(output_filter): + raise TypeError( + "Expected 'output_filter' to be callable but got '{!r}'".format(output_filter) + ) + self._process_action = process_action + self._proc_info = proc_info + self._proc_output = proc_output + self._output_filter = output_filter + + def wait_for_shutdown(self, timeout=None): + """ + Wait for the target process to shutdown. + + :param timeout: time in seconds to wait, or None to block indefinitely. + :return: whether the target process shut down or not. + """ + with self._proc_info.proc_event: + return self._proc_info.proc_event.wait_for( + lambda: self.terminated, timeout=timeout + ) + + def wait_for_output(self, condition=None, timeout=None): + """ + Wait for the target process to produce any output, either over stdout or stderr. + + :param condition: a callable to wait on a specific output condition to be satisfied, + or ``None`` (default) to wake on any output. + :param timeout: time in seconds to wait, or ``None`` (default) to block indefinitely. + :return: whether the condition has been satisfied or not. + """ + if condition is None: + condition = (lambda output: True) + if not callable(condition): + raise TypeError( + "Expected 'condition' to be callable but got '{!r}'".format(condition) + ) + with self._proc_output.io_event: + return self._proc_output.io_event.wait_for( + lambda: self.running and condition(self.output), timeout=timeout + ) + + @property + def target_process_action(self): + return self._process_action + + @property + def stderr(self): + output_events = self._proc_output[self._process_action] + output_text = ''.join(ev.text.decode() for ev in output_events if ev.from_stderr) + if self._output_filter is not None: + output_text = self._output_filter(output_text) + return output_text + + @property + def stdout(self): + output_events = self._proc_output[self._process_action] + output_text = ''.join(ev.text.decode() for ev in output_events if ev.from_stdout) + if self._output_filter is not None: + output_text = self._output_filter(output_text) + return output_text + + @property + def output(self): + output_events = self._proc_output[self._process_action] + output_text = ''.join(ev.text.decode() for ev in output_events) + if self._output_filter is not None: + output_text = self._output_filter(output_text) + return output_text + + @property + def running(self): + if self._process_action not in self._proc_info.processes(): + return False + return isinstance( + self._proc_info[self._process_action], + launch.events.process.ProcessStarted + ) + + @property + def terminated(self): + if self._process_action not in self._proc_info.processes(): + return False + return isinstance( + self._proc_info[self._process_action], + launch.events.process.ProcessExited + ) + + @property + def exit_code(self): + return self._proc_info[self._process_action].returncode + + +@contextlib.contextmanager +def launch_process(launch_service, process_action, proc_info, proc_output, **kwargs): + """ + Launch and interact with a process. + + On context entering, start execution of a ``process_action`` using the given ``launch_service`` + and yield a `ProcessProxy` to that ``process_action``. + On context exiting, shut the process down if it has not been terminated yet. + All additional arguments are forwarded to `ProcessProxy` on construction. + """ + ensure_argument_type( + process_action, types=launch.actions.ExecuteProcess, argument_name='process_action' + ) + + launch_service.emit_event( + event=launch.events.IncludeLaunchDescription( + launch_description=launch.LaunchDescription([process_action]) + ) + ) + process_proxy = ProcessProxy(process_action, proc_info, proc_output, **kwargs) + try: + yield process_proxy + finally: + if not process_proxy.terminated: + launch_service.emit_event( + event=launch.events.process.ShutdownProcess( + process_matcher=launch.events.matches_action(process_action) + ) + ) diff --git a/launch_testing/launch_testing/tools/text.py b/launch_testing/launch_testing/tools/text.py new file mode 100644 index 000000000..61fdd7a95 --- /dev/null +++ b/launch_testing/launch_testing/tools/text.py @@ -0,0 +1,138 @@ +# Copyright 2019 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Iterable + +import os + + +def normalize_lineseps(lines): + r"""Normalize and then return the given lines to all use '\n'.""" + lines = lines.replace(os.linesep, '\n') + # This happens (even on Linux and macOS) when capturing I/O from an + # emulated tty. + lines = lines.replace('\r\n', '\n') + return lines + + +def build_line_match(expected_lines, *, strict=False): + """ + Make a callable to match lines with ``expected_lines``. + + :param expected_lines: line expectation to match against + :type expected_text: str or List[str] or List[Union[str, regex pattern]] + :return: a callable that matches text against the expectation. + :rtype: Callable[bool, [str]] + """ + if isinstance(expected_lines, str): + def _match(actual_lines, start=0): + if strict: + if all(expected_lines == line for line in actual_lines[start:]): + return start, start + len(actual_lines[start:]) + return None + return next(( + (i, i + 1) for i, line in enumerate( + actual_lines[start:], start=start + ) if expected_lines in line + ), None) + return _match + + if hasattr(expected_lines, 'match'): + def _match(actual_lines, start): + if strict: + if all(expected_lines.match(line) for line in actual_lines[start:]): + return start, start + len(actual_lines[start:]) + return None + return next(( + (i, i + 1) for i, line in enumerate( + actual_lines[start:], start=start + ) if expected_lines.match(line) + ), None) + return _match + + if isinstance(expected_lines, Iterable): + head_match, *tail_matches = [ + build_line_match(line, strict=False) for line in expected_lines + ] + + def _match(actual_lines, start=0): + next_start, end = head_match(actual_lines, start) or (-1, -1) + if next_start < start or (strict and next_start != start): + return None + start = next_start + for match in tail_matches: + next_start, next_end = match(actual_lines, end) or (-1, -1) + if next_start < end or (strict and next_start != end): + return None + end = next_end + return start, end + return _match + + raise ValueError('Unknown format for expected lines') + + +def build_text_match(expected_text, *, strict=False): + """ + Make a callable to match text with ``expected_text``. + + :param expected_text: text expectation to match against + :type expected_text: str or regex pattern or List[Union[str, regex pattern]] + :return: a callable that matches text against the expectation. + :rtype: Callable[bool, [str]] + """ + if isinstance(expected_text, str): + def _match(actual_text, start=0): + actual_text = normalize_lineseps(actual_text) + if strict: + if actual_text[start:] != expected_text: + return None + else: + start = actual_text.find(expected_text, start) + if start < 0: + return None + return start, start + len(expected_text) + return _match + + if hasattr(expected_text, 'search') and hasattr(expected_text, 'match'): + def _match(actual_text, start=0): + actual_text = normalize_lineseps(actual_text) + if strict: + match = expected_text.match(actual_text, start) + else: + match = expected_text.search(actual_text, start) + if match is not None: + match = match.start(), match.end() + return match + return _match + + if isinstance(expected_text, Iterable): + head_match, *tail_matches = [ + build_text_match(text, strict=False) for text in expected_text + ] + + def _match(actual_text, start=0): + actual_text = normalize_lineseps(actual_text) + next_start, end = head_match(actual_text, start) or (-1, -1) + if next_start < start or (strict and next_start != start): + return None + start = next_start + for match in tail_matches: + next_start, next_end = match(actual_text, end) or (-1, -1) + if next_start < end or (strict and next_start != end): + return None + end = next_end + return start, end + return _match + + raise ValueError('Unknown format for expected text') diff --git a/launch_testing/test/launch_testing/examples/good_proc_launch_test.py b/launch_testing/test/launch_testing/examples/good_proc_launch_test.py index 215dffeb1..694c75dcd 100644 --- a/launch_testing/test/launch_testing/examples/good_proc_launch_test.py +++ b/launch_testing/test/launch_testing/examples/good_proc_launch_test.py @@ -39,7 +39,7 @@ dut_process = launch.actions.ExecuteProcess( cmd=[sys.executable, TEST_PROC_PATH], - env=proc_env, + env=proc_env, output='screen' ) diff --git a/launch_testing/test/launch_testing/examples/terminating_proc_launch_test.py b/launch_testing/test/launch_testing/examples/terminating_proc_launch_test.py index cbcadebec..221491783 100644 --- a/launch_testing/test/launch_testing/examples/terminating_proc_launch_test.py +++ b/launch_testing/test/launch_testing/examples/terminating_proc_launch_test.py @@ -23,6 +23,7 @@ import launch_testing import launch_testing.asserts +import launch_testing.markers import launch_testing.tools import pytest @@ -39,56 +40,63 @@ def get_test_process_action(*, args=[]): name='terminating_proc', # This is necessary to get unbuffered output from the process under test additional_env={'PYTHONUNBUFFERED': '1'}, + output='screen' ) @pytest.mark.launch_test +@launch_testing.markers.keep_alive def generate_test_description(ready_fn): return launch.LaunchDescription([ - launch_testing.util.KeepAliveProc(), launch.actions.OpaqueFunction(function=lambda context: ready_fn()), ]) class TestTerminatingProc(unittest.TestCase): - def test_no_args(self, launch_service, proc_output, proc_info): + def test_no_args(self, launch_service, proc_info, proc_output): """Test terminating_proc without command line arguments.""" proc_action = get_test_process_action() - with launch_testing.tools.launch_process(launch_service, proc_action) as dut: - proc_info.assertWaitForStartup(process=dut, timeout=2) - proc_output.assertWaitFor('Starting Up', process=dut, timeout=2) - proc_output.assertWaitFor('Emulating Work', process=dut, timeout=2) - proc_output.assertWaitFor('Done', process=dut, timeout=2) - proc_output.assertWaitFor('Shutting Down', process=dut, timeout=2) - proc_info.assertWaitForShutdown(process=dut, timeout=2) - launch_testing.asserts.assertExitCodes(proc_info, process=dut) + with launch_testing.tools.launch_process( + launch_service, proc_action, proc_info, proc_output + ): + proc_info.assertWaitForStartup(process=proc_action, timeout=2) + proc_output.assertWaitFor('Starting Up', process=proc_action, timeout=2) + proc_output.assertWaitFor('Emulating Work', process=proc_action, timeout=2) + proc_output.assertWaitFor('Done', process=proc_action, timeout=2) + proc_output.assertWaitFor('Shutting Down', process=proc_action, timeout=2) + proc_info.assertWaitForShutdown(process=proc_action, timeout=4) + launch_testing.asserts.assertExitCodes(proc_info, process=proc_action) - def test_with_args(self, launch_service, proc_output, proc_info): + def test_with_args(self, launch_service, proc_info, proc_output): """Test terminating_proc with some command line arguments.""" proc_action = get_test_process_action(args=['--foo', 'bar']) - with launch_testing.tools.launch_process(launch_service, proc_action) as dut: - proc_info.assertWaitForStartup(process=dut, timeout=2) - proc_output.assertWaitFor('Starting Up', process=dut, timeout=2) + with launch_testing.tools.launch_process( + launch_service, proc_action, proc_info, proc_output + ): + proc_info.assertWaitForStartup(process=proc_action, timeout=2) + proc_output.assertWaitFor('Starting Up', process=proc_action, timeout=2) proc_output.assertWaitFor( - "Called with arguments ['--foo', 'bar']", process=dut, timeout=2 + "Called with arguments ['--foo', 'bar']", process=proc_action, timeout=2 ) - proc_output.assertWaitFor('Emulating Work', process=dut, timeout=2) - proc_output.assertWaitFor('Done', process=dut, timeout=2) - proc_output.assertWaitFor('Shutting Down', process=dut, timeout=2) - proc_info.assertWaitForShutdown(process=dut, timeout=2) - launch_testing.asserts.assertExitCodes(proc_info, process=dut) + proc_output.assertWaitFor('Emulating Work', process=proc_action, timeout=2) + proc_output.assertWaitFor('Done', process=proc_action, timeout=2) + proc_output.assertWaitFor('Shutting Down', process=proc_action, timeout=2) + proc_info.assertWaitForShutdown(process=proc_action, timeout=4) + launch_testing.asserts.assertExitCodes(proc_info, process=proc_action) def test_unhandled_exception(self, launch_service, proc_output, proc_info): """Test terminating_proc forcing an unhandled exception.""" proc_action = get_test_process_action(args=['--exception']) - with launch_testing.tools.launch_process(launch_service, proc_action) as dut: - proc_info.assertWaitForStartup(process=dut, timeout=2) - proc_output.assertWaitFor('Starting Up', process=dut, timeout=2) + with launch_testing.tools.launch_process( + launch_service, proc_action, proc_info, proc_output + ): + proc_info.assertWaitForStartup(process=proc_action, timeout=2) + proc_output.assertWaitFor('Starting Up', process=proc_action, timeout=2) proc_output.assertWaitFor( - "Called with arguments ['--exception']", process=dut, timeout=2 + "Called with arguments ['--exception']", process=proc_action, timeout=2 ) - proc_info.assertWaitForShutdown(process=dut, timeout=2) + proc_info.assertWaitForShutdown(process=proc_action, timeout=4) launch_testing.asserts.assertExitCodes( - proc_info, process=dut, allowable_exit_codes=[1] + proc_info, process=proc_action, allowable_exit_codes=[1] ) diff --git a/launch_testing/test/launch_testing/test_io_handler_and_assertions.py b/launch_testing/test/launch_testing/test_io_handler_and_assertions.py index c0b33161d..e0bd13e0b 100644 --- a/launch_testing/test/launch_testing/test_io_handler_and_assertions.py +++ b/launch_testing/test/launch_testing/test_io_handler_and_assertions.py @@ -22,6 +22,7 @@ import launch from launch.actions import RegisterEventHandler from launch.event_handlers import OnProcessIO +from launch.event_handlers import OnProcessStart from launch_testing import ActiveIoHandler from launch_testing.asserts import assertInStdout @@ -77,6 +78,11 @@ def setUpClass(cls): cls.proc_2, cls.proc_3, # This plumbs all the output to our IoHandler just like the LaunchTestRunner does + RegisterEventHandler( + OnProcessStart( + on_start=lambda event, _: cls.proc_output.track(event.process_name) + ) + ), RegisterEventHandler( OnProcessIO( on_stdout=cls.proc_output.append, diff --git a/launch_testing/test/launch_testing/test_tools.py b/launch_testing/test/launch_testing/test_tools.py index 7d7783465..0fdfbb4e0 100644 --- a/launch_testing/test/launch_testing/test_tools.py +++ b/launch_testing/test/launch_testing/test_tools.py @@ -12,8 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re + +import launch.actions +import launch.events +import launch.launch_context +import launch_testing.io_handler +import launch_testing.proc_info_handler from launch_testing.tools import basic_output_filter +from launch_testing.tools import expect_output +from launch_testing.tools import ProcessProxy def test_basic_output_filter(): @@ -29,7 +38,7 @@ def test_basic_output_filter(): [listener] I heard: foo [listener] I heard: bar [listener] I heard: foobar - """.replace(' ', '') + """.replace(' ' * 4, '') output_content = '' assert filter_fn(input_content) == output_content @@ -37,16 +46,128 @@ def test_basic_output_filter(): [talker] I said: foo [listener] I heard: bar [listener] I heard: foobar - """.replace(' ', '') + """.replace(' ' * 4, '') output_content = """\ [talker] I said: foo - """.replace(' ', '') + """.replace(' ' * 4, '') assert filter_fn(input_content) == output_content input_content = """\ [talker] I said: foo [talker] I said: bar [talker] I said: foobar - """.replace(' ', '') + """.replace(' ' * 4, '') output_content = input_content assert filter_fn(input_content) == output_content + + +def test_expect_output(): + output_text = """\ + [talker] I said: foo + [listener] I heard: bar + [talker] I said: foo! + [listener] I heard: baz""".replace(' ' * 4, '') + output_lines = output_text.splitlines() + + assert expect_output(expected_text=output_text, text=output_text) + assert expect_output(expected_text=output_text, text=output_text, strict=True) + assert expect_output(expected_lines=output_lines, text=output_text) + assert expect_output(expected_lines=output_lines, text=output_text, strict=True) + assert expect_output(expected_text=output_text, lines=output_lines) + assert expect_output(expected_text=output_text, lines=output_lines, strict=True) + assert expect_output(expected_lines=output_lines, lines=output_lines) + assert expect_output(expected_lines=output_lines, lines=output_lines, strict=True) + + assert expect_output( + expected_text=re.compile(r'^\[talker\].*$', re.M), text=output_text + ) + assert not expect_output( + expected_text=re.compile(r'^\[listener\].*$', re.M), + text=output_text, strict=True + ) + assert expect_output( + expected_lines=[ + re.compile(r'^\[talker\].*$', re.M), + re.compile(r'^\[listener\].*$', re.M) + ] * 2, + text=output_text, + strict=True + ) + + +def test_process_proxy(): + proc_output = launch_testing.io_handler.ActiveIoHandler() + proc_info = launch_testing.proc_info_handler.ActiveProcInfoHandler() + process_action = launch.actions.ExecuteProcess(cmd=['ls', '-las'], name='ls') + proxy = ProcessProxy(process_action, proc_info, proc_output) + + context = launch.launch_context.LaunchContext() + process_action._ExecuteProcess__expand_substitutions(context) + + assert not proxy.running + assert not proxy.terminated + + proc_info.append(launch.events.process.ProcessStarted( + action=process_action, + name=process_action.name, + cmd=process_action.cmd, + cwd=process_action.cwd, + env=process_action.env, + pid=1001 + )) + proc_output.track(process_action.name) + + assert proxy.running + assert not proxy.terminated + assert proxy.output == '' + assert proxy.stdout == '' + assert proxy.stderr == '' + + proc_output.append(launch.events.process.ProcessStdout( + action=process_action, + text='Foobar\n'.encode('utf-8'), + name=process_action.name, + cmd=process_action.cmd, + cwd=process_action.cwd, + env=process_action.env, + pid=1001 + )) + + assert proxy.running + assert not proxy.terminated + assert proxy.output == 'Foobar\n' + assert proxy.stdout == 'Foobar\n' + assert proxy.stderr == '' + + proc_output.append(launch.events.process.ProcessStderr( + action=process_action, + text='Warning!\n'.encode('utf-8'), + name=process_action.name, + cmd=process_action.cmd, + cwd=process_action.cwd, + env=process_action.env, + pid=1001 + )) + + assert proxy.running + assert not proxy.terminated + assert proxy.output == 'Foobar\nWarning!\n' + assert proxy.stdout == 'Foobar\n' + assert proxy.stderr == 'Warning!\n' + + proc_info.append(launch.events.process.ProcessExited( + action=process_action, + returncode=0, + name=process_action.name, + cmd=process_action.cmd, + cwd=process_action.cwd, + env=process_action.env, + pid=1001 + )) + + assert not proxy.running + assert proxy.terminated + assert proxy.exit_code == 0 + assert proxy.output == 'Foobar\nWarning!\n' + assert proxy.stdout == 'Foobar\n' + assert proxy.stderr == 'Warning!\n'