Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle signals within the asyncio loop. #476

Merged
merged 9 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 26 additions & 71 deletions launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import collections.abc
import contextlib
import logging
import platform
import signal
import threading
import traceback
Expand All @@ -42,10 +43,7 @@
from .launch_description import LaunchDescription
from .launch_description_entity import LaunchDescriptionEntity
from .some_actions_type import SomeActionsType
from .utilities import install_signal_handlers
from .utilities import on_sigint
from .utilities import on_sigquit
from .utilities import on_sigterm
from .utilities import AsyncSafeSignalManager
from .utilities import visit_all_entities_and_collect_futures


Expand All @@ -61,11 +59,6 @@ def __init__(
"""
Create a LaunchService.

If called outside of the main-thread before the function
:func:`launch.utilities.install_signal_handlers()` has been called,
a ValueError can be raised, as setting signal handlers cannot be done
outside of the main-thread.

:param: argv stored in the context for access by the entities, None results in []
:param: debug if True (not default), asyncio the logger are seutp for debug
"""
Expand All @@ -77,10 +70,6 @@ def __init__(
# Setup logging
self.__logger = launch.logging.get_logger('launch')

# Install signal handlers if not already installed, will raise if not
# in main-thread, call manually in main-thread to avoid this.
install_signal_handlers()

# Setup context and register a built-in event handler for bootstrapping.
self.__context = LaunchContext(argv=self.__argv)
self.__context.register_event_handler(OnIncludeLaunchDescription())
Expand Down Expand Up @@ -194,12 +183,7 @@ def _prepare_run_loop(self):
# Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT.
sigint_received = False

def _on_sigint(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
def _on_sigint(signum):
nonlocal sigint_received
base_msg = 'user interrupted with ctrl-c (SIGINT)'
if not sigint_received:
Expand All @@ -211,57 +195,25 @@ def _on_sigint(signum, frame, prev_handler):
sigint_received = True
else:
self.__logger.warning('{} again, ignoring...'.format(base_msg))
if callable(prev_handler):
try:
# Run pre-existing signal handler.
prev_handler(signum, frame)
except KeyboardInterrupt:
# Ignore exception.
pass
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGINT, current_handler)

on_sigint(_on_sigint)

def _on_sigterm(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return

def _on_sigterm(signum):
signame = signal.Signals(signum).name
self.__logger.error(
'user interrupted with ctrl-\\ ({}), terminating...'.format(signame))
# TODO(wjwwood): try to terminate running subprocesses before exiting.
self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes')
self.__logger.error('using {} can result in orphaned processes'.format(signame))
self.__logger.error('make sure no processes launched are still running')
this_loop.call_soon(this_task.cancel)
if callable(prev_handler):
# Run pre-existing signal handler.
prev_handler(signum, frame)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGTERM, current_handler)

on_sigterm(_on_sigterm)

def _on_sigquit(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...')
_on_sigterm(signum, frame, prev_handler)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGQUIT, current_handler)

on_sigquit(_on_sigquit)

# Yield asyncio loop and current task.
yield self.__loop_from_run_thread, this_task
finally:
# Unset the signal handlers while not running.
on_sigint(None)
on_sigterm(None)
on_sigquit(None)

with AsyncSafeSignalManager(this_loop) as manager:
# Setup signal handlers
manager.handle(signal.SIGINT, _on_sigint)
manager.handle(signal.SIGTERM, _on_sigterm)
if platform.system() != 'Windows':
manager.handle(signal.SIGQUIT, _on_sigterm)
# Yield asyncio loop and current task.
yield this_loop, this_task
finally:
# No matter what happens, unset the loop.
with self.__loop_from_run_thread_lock:
self.__context._set_asyncio_loop(None)
Expand Down Expand Up @@ -306,9 +258,6 @@ async def run_async(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with other
asynchronous runs.

Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored
around the original signal handler. After the run ends, this behavior is undone.

:param: shutdown_when_idle if True (default), the service will shutdown when idle.
"""
# Make sure this has not been called from any thread but the main thread.
Expand Down Expand Up @@ -394,14 +343,20 @@ def run(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with
asynchronous runs (see `run_async()` documentation).

Note that KeyboardInterrupt is caught and ignored, as signals are handled separately.
After the run ends, this behavior is undone.

:param: shutdown_when_idle if True (default), the service will shutdown when idle
"""
loop = osrf_pycommon.process_utils.get_loop()
run_async_task = loop.create_task(self.run_async(
shutdown_when_idle=shutdown_when_idle
))
loop.run_until_complete(run_async_task)
return run_async_task.result()
while True:
try:
return loop.run_until_complete(run_async_task)
except KeyboardInterrupt:
continue

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
self.__shutting_down = True
Expand Down
10 changes: 2 additions & 8 deletions launch/launch/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
from .ensure_argument_type_impl import ensure_argument_type
from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions
from .perform_substitutions_impl import perform_substitutions
from .signal_management import install_signal_handlers
from .signal_management import on_sigint
from .signal_management import on_sigquit
from .signal_management import on_sigterm
from .signal_management import AsyncSafeSignalManager
from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures

__all__ = [
Expand All @@ -32,10 +29,7 @@
'create_future',
'ensure_argument_type',
'perform_substitutions',
'install_signal_handlers',
'on_sigint',
'on_sigquit',
'on_sigterm',
'AsyncSafeSignalManager',
'normalize_to_list_of_substitutions',
'visit_all_entities_and_collect_futures',
]
Loading