Skip to content

Commit

Permalink
Handle signals within the asyncio loop. (#476)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
hidmic authored and mjeronimo committed May 4, 2021
1 parent 6ce4b35 commit f0e4642
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 249 deletions.
97 changes: 26 additions & 71 deletions launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import collections.abc
import contextlib
import logging
import platform
import signal
import threading
import traceback
Expand All @@ -42,10 +43,7 @@
from .launch_description import LaunchDescription
from .launch_description_entity import LaunchDescriptionEntity
from .some_actions_type import SomeActionsType
from .utilities import install_signal_handlers
from .utilities import on_sigint
from .utilities import on_sigquit
from .utilities import on_sigterm
from .utilities import AsyncSafeSignalManager
from .utilities import visit_all_entities_and_collect_futures


Expand All @@ -62,11 +60,6 @@ def __init__(
"""
Create a LaunchService.
If called outside of the main-thread before the function
:func:`launch.utilities.install_signal_handlers()` has been called,
a ValueError can be raised, as setting signal handlers cannot be done
outside of the main-thread.
:param: argv stored in the context for access by the entities, None results in []
:param: noninteractive if True (not default), this service will assume it has
no terminal associated e.g. it is being executed from a non interactive script
Expand All @@ -80,10 +73,6 @@ def __init__(
# Setup logging
self.__logger = launch.logging.get_logger('launch')

# Install signal handlers if not already installed, will raise if not
# in main-thread, call manually in main-thread to avoid this.
install_signal_handlers()

# Setup context and register a built-in event handler for bootstrapping.
self.__context = LaunchContext(argv=self.__argv, noninteractive=noninteractive)
self.__context.register_event_handler(OnIncludeLaunchDescription())
Expand Down Expand Up @@ -197,12 +186,7 @@ def _prepare_run_loop(self):
# Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT.
sigint_received = False

def _on_sigint(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
def _on_sigint(signum):
nonlocal sigint_received
base_msg = 'user interrupted with ctrl-c (SIGINT)'
if not sigint_received:
Expand All @@ -214,57 +198,25 @@ def _on_sigint(signum, frame, prev_handler):
sigint_received = True
else:
self.__logger.warning('{} again, ignoring...'.format(base_msg))
if callable(prev_handler):
try:
# Run pre-existing signal handler.
prev_handler(signum, frame)
except KeyboardInterrupt:
# Ignore exception.
pass
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGINT, current_handler)

on_sigint(_on_sigint)

def _on_sigterm(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return

def _on_sigterm(signum):
signame = signal.Signals(signum).name
self.__logger.error(
'user interrupted with ctrl-\\ ({}), terminating...'.format(signame))
# TODO(wjwwood): try to terminate running subprocesses before exiting.
self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes')
self.__logger.error('using {} can result in orphaned processes'.format(signame))
self.__logger.error('make sure no processes launched are still running')
this_loop.call_soon(this_task.cancel)
if callable(prev_handler):
# Run pre-existing signal handler.
prev_handler(signum, frame)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGTERM, current_handler)

on_sigterm(_on_sigterm)

def _on_sigquit(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...')
_on_sigterm(signum, frame, prev_handler)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGQUIT, current_handler)

on_sigquit(_on_sigquit)

# Yield asyncio loop and current task.
yield self.__loop_from_run_thread, this_task
finally:
# Unset the signal handlers while not running.
on_sigint(None)
on_sigterm(None)
on_sigquit(None)

with AsyncSafeSignalManager(this_loop) as manager:
# Setup signal handlers
manager.handle(signal.SIGINT, _on_sigint)
manager.handle(signal.SIGTERM, _on_sigterm)
if platform.system() != 'Windows':
manager.handle(signal.SIGQUIT, _on_sigterm)
# Yield asyncio loop and current task.
yield this_loop, this_task
finally:
# No matter what happens, unset the loop.
with self.__loop_from_run_thread_lock:
self.__context._set_asyncio_loop(None)
Expand Down Expand Up @@ -309,9 +261,6 @@ async def run_async(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with other
asynchronous runs.
Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored
around the original signal handler. After the run ends, this behavior is undone.
:param: shutdown_when_idle if True (default), the service will shutdown when idle.
"""
# Make sure this has not been called from any thread but the main thread.
Expand Down Expand Up @@ -397,14 +346,20 @@ def run(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with
asynchronous runs (see `run_async()` documentation).
Note that KeyboardInterrupt is caught and ignored, as signals are handled separately.
After the run ends, this behavior is undone.
:param: shutdown_when_idle if True (default), the service will shutdown when idle
"""
loop = osrf_pycommon.process_utils.get_loop()
run_async_task = loop.create_task(self.run_async(
shutdown_when_idle=shutdown_when_idle
))
loop.run_until_complete(run_async_task)
return run_async_task.result()
while True:
try:
return loop.run_until_complete(run_async_task)
except KeyboardInterrupt:
continue

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
self.__shutting_down = True
Expand Down
10 changes: 2 additions & 8 deletions launch/launch/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
from .ensure_argument_type_impl import ensure_argument_type
from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions
from .perform_substitutions_impl import perform_substitutions
from .signal_management import install_signal_handlers
from .signal_management import on_sigint
from .signal_management import on_sigquit
from .signal_management import on_sigterm
from .signal_management import AsyncSafeSignalManager
from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures

__all__ = [
Expand All @@ -32,10 +29,7 @@
'create_future',
'ensure_argument_type',
'perform_substitutions',
'install_signal_handlers',
'on_sigint',
'on_sigquit',
'on_sigterm',
'AsyncSafeSignalManager',
'normalize_to_list_of_substitutions',
'visit_all_entities_and_collect_futures',
]
Loading

0 comments on commit f0e4642

Please sign in to comment.