Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle signals within the asyncio loop. #476

Merged
merged 9 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 24 additions & 71 deletions launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
from .launch_description import LaunchDescription
from .launch_description_entity import LaunchDescriptionEntity
from .some_actions_type import SomeActionsType
from .utilities import install_signal_handlers
from .utilities import on_sigint
from .utilities import on_sigquit
from .utilities import on_sigterm
from .utilities import AsyncSafeSignalManager
from .utilities import visit_all_entities_and_collect_futures


Expand All @@ -61,11 +58,6 @@ def __init__(
"""
Create a LaunchService.

If called outside of the main-thread before the function
:func:`launch.utilities.install_signal_handlers()` has been called,
a ValueError can be raised, as setting signal handlers cannot be done
outside of the main-thread.

:param: argv stored in the context for access by the entities, None results in []
:param: debug if True (not default), asyncio the logger are seutp for debug
"""
Expand All @@ -77,10 +69,6 @@ def __init__(
# Setup logging
self.__logger = launch.logging.get_logger('launch')

# Install signal handlers if not already installed, will raise if not
# in main-thread, call manually in main-thread to avoid this.
install_signal_handlers()

# Setup context and register a built-in event handler for bootstrapping.
self.__context = LaunchContext(argv=self.__argv)
self.__context.register_event_handler(OnIncludeLaunchDescription())
Expand Down Expand Up @@ -194,12 +182,7 @@ def _prepare_run_loop(self):
# Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT.
sigint_received = False

def _on_sigint(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
def _on_sigint(signum):
nonlocal sigint_received
base_msg = 'user interrupted with ctrl-c (SIGINT)'
if not sigint_received:
Expand All @@ -211,57 +194,24 @@ def _on_sigint(signum, frame, prev_handler):
sigint_received = True
else:
self.__logger.warning('{} again, ignoring...'.format(base_msg))
if callable(prev_handler):
try:
# Run pre-existing signal handler.
prev_handler(signum, frame)
except KeyboardInterrupt:
# Ignore exception.
pass
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGINT, current_handler)

on_sigint(_on_sigint)

def _on_sigterm(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return

def _on_sigterm(signum):
signame = signal.Signals(signum).name
self.__logger.error(
'user interrupted with ctrl-\\ ({}), terminating...'.format(signame))
# TODO(wjwwood): try to terminate running subprocesses before exiting.
self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes')
self.__logger.error('using {} can result in orphaned processes'.format(signame))
self.__logger.error('make sure no processes launched are still running')
this_loop.call_soon(this_task.cancel)
if callable(prev_handler):
# Run pre-existing signal handler.
prev_handler(signum, frame)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGTERM, current_handler)

on_sigterm(_on_sigterm)

def _on_sigquit(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...')
_on_sigterm(signum, frame, prev_handler)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGQUIT, current_handler)

on_sigquit(_on_sigquit)

# Yield asyncio loop and current task.
yield self.__loop_from_run_thread, this_task
finally:
# Unset the signal handlers while not running.
on_sigint(None)
on_sigterm(None)
on_sigquit(None)

with AsyncSafeSignalManager(this_loop) as manager:
# Setup signal handlers
manager.handle(signal.SIGINT, _on_sigint)
manager.handle(signal.SIGTERM, _on_sigterm)
manager.handle(signal.SIGQUIT, _on_sigterm)
# Yield asyncio loop and current task.
yield this_loop, this_task
finally:
# No matter what happens, unset the loop.
with self.__loop_from_run_thread_lock:
self.__context._set_asyncio_loop(None)
Expand Down Expand Up @@ -306,9 +256,6 @@ async def run_async(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with other
asynchronous runs.

Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored
around the original signal handler. After the run ends, this behavior is undone.

:param: shutdown_when_idle if True (default), the service will shutdown when idle.
"""
# Make sure this has not been called from any thread but the main thread.
Expand Down Expand Up @@ -394,14 +341,20 @@ def run(self, *, shutdown_when_idle=True) -> int:
This should only ever be run from the main thread and not concurrently with
asynchronous runs (see `run_async()` documentation).

Note that KeyboardInterrupt is caught and ignored, as signals are handled separately.
After the run ends, this behavior is undone.

:param: shutdown_when_idle if True (default), the service will shutdown when idle
"""
loop = osrf_pycommon.process_utils.get_loop()
run_async_task = loop.create_task(self.run_async(
shutdown_when_idle=shutdown_when_idle
))
loop.run_until_complete(run_async_task)
return run_async_task.result()
while True:
try:
return loop.run_until_complete(run_async_task)
except KeyboardInterrupt:
continue

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
self.__shutting_down = True
Expand Down
10 changes: 2 additions & 8 deletions launch/launch/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
from .ensure_argument_type_impl import ensure_argument_type
from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions
from .perform_substitutions_impl import perform_substitutions
from .signal_management import install_signal_handlers
from .signal_management import on_sigint
from .signal_management import on_sigquit
from .signal_management import on_sigterm
from .signal_management import AsyncSafeSignalManager
from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures

__all__ = [
Expand All @@ -32,10 +29,7 @@
'create_future',
'ensure_argument_type',
'perform_substitutions',
'install_signal_handlers',
'on_sigint',
'on_sigquit',
'on_sigterm',
'AsyncSafeSignalManager',
'normalize_to_list_of_substitutions',
'visit_all_entities_and_collect_futures',
]
204 changes: 88 additions & 116 deletions launch/launch/utilities/signal_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,133 +12,105 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for the signal management functionality."""
"""Module for signal management functionality."""

import platform
import asyncio
import os
import signal
import threading
import socket

import launch.logging

__signal_handlers_installed_lock = threading.Lock()
__signal_handlers_installed = False
__custom_sigint_handler = None
__custom_sigquit_handler = None
__custom_sigterm_handler = None
from typing import Callable
from typing import Optional
from typing import Union


def on_sigint(handler):
class AsyncSafeSignalManager:
"""
Set the signal handler to be called on SIGINT.
A context manager class for asynchronous handling of signals.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
"""
global __custom_sigint_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigint_handler = handler
Similar in purpose to :func:`asyncio.loop.add_signal_handler` but
not limited to Unix platforms.

Signal handlers can be registered at any time with a given manager.
These will become active for the extent of said manager context.
Unlike regular signal handlers, asynchronous signals handlers
can safely interact with their event loop.

def on_sigquit(handler):
"""
Set the signal handler to be called on SIGQUIT.
The same manager can be used multiple consecutive times and even
be nested with other managers, as these are independent from each
other i.e. managers do not override each other's handlers.

Note Windows does not have SIGQUIT, so it can be set with this function,
but the handler will not be called.
If used outside of the main thread, a ValueError is raised.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
The underlying mechanism is built around :func:`signal.set_wakeup_fd`
so as to not interfere with regular handlers installed via
:func:`signal.signal`.
All signals received are forwarded to the previously setup file
descriptor, if any.
"""
global __custom_sigquit_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigquit_handler = handler


def on_sigterm(handler):
"""
Set the signal handler to be called on SIGTERM.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
"""
global __custom_sigterm_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigterm_handler = handler


def install_signal_handlers():
"""
Install custom signal handlers so that hooks can be setup from other threads.

Calling this multiple times does not fail, but the signals are only
installed once.

If called outside of the main-thread, a ValueError is raised, see:
https://docs.python.org/3.6/library/signal.html#signal.signal

Also, if you register your own signal handlers after calling this function,
then you should store and forward to the existing signal handlers, because
otherwise the signal handlers registered by on_sigint(), on_sigquit, etc.
will not be run.
And the signal handlers registered with those functions are used to
gracefully exit the LaunchService when signaled (at least), and without
them it may not behave correctly.

If you register signal handlers before calling this function, then your
signal handler will automatically be called by the signal handlers in this
thread.
"""
global __signal_handlers_installed_lock, __signal_handlers_installed
with __signal_handlers_installed_lock:
if __signal_handlers_installed:
return
__signal_handlers_installed = True

global __custom_sigint_handler, __custom_sigquit_handler, __custom_sigterm_handler

__original_sigint_handler = signal.getsignal(signal.SIGINT)
__original_sigterm_handler = signal.getsignal(signal.SIGTERM)

def __on_sigint(signum, frame):
if callable(__custom_sigint_handler):
__custom_sigint_handler(signum, frame, __original_sigint_handler)
elif callable(__original_sigint_handler):
__original_sigint_handler(signum, frame)

if platform.system() != 'Windows':
# Windows does not support SIGQUIT
__original_sigquit_handler = signal.getsignal(signal.SIGQUIT)

def __on_sigquit(signum, frame):
if callable(__custom_sigquit_handler):
__custom_sigquit_handler(signum, frame, __original_sigquit_handler)
elif callable(__original_sigquit_handler):
__original_sigquit_handler(signum, frame)

def __on_sigterm(signum, frame):
if callable(__custom_sigterm_handler):
__custom_sigterm_handler(signum, frame, __original_sigterm_handler)
elif callable(__original_sigterm_handler):
__original_sigterm_handler(signum, frame)

# signals must be registered in the main thread, but print a nicer message if we're not there
try:
signal.signal(signal.SIGINT, __on_sigint)
signal.signal(signal.SIGTERM, __on_sigterm)
if platform.system() != 'Windows':
# Windows does not support SIGQUIT
signal.signal(signal.SIGQUIT, __on_sigquit)
except ValueError:
logger = launch.logging.get_logger(__name__)
logger.error("failed to set signal handlers in '{}'".format(__name__))
logger.error('this function must be called in the main thread')
raise
def __init__(
self,
loop: asyncio.AbstractEventLoop
):
"""
Instantiate manager.

:param loop: event loop that will handle the signals.
"""
self.__loop = loop
self.__handlers = {}
self.__prev_wsock_fd = -1
self.__wsock, self.__rsock = socket.socketpair()
self.__wsock.setblocking(False)
self.__rsock.setblocking(False)

def __enter__(self):
self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal)
self.__prev_wsock_fd = signal.set_wakeup_fd(self.__wsock.fileno())
return self

def __exit__(self, type_, value, traceback):
assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wsock_fd)
self.__loop.remove_reader(self.__rsock.fileno())

def __handle_signal(self):
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
while True:
try:
data = self.__rsock.recv(4096)
if not data:
break
for signum in data:
if signum not in self.__handlers:
continue
self.__handlers[signum](signum)
if self.__prev_wsock_fd != -1:
os.write(self.__prev_wsock_fd, data)
except InterruptedError:
continue
except BlockingIOError:
break

def handle(
self,
signum: Union[signal.Signals, int],
handler: Optional[Callable[[int], None]],
) -> Optional[Callable[[int], None]]:
"""
Register a callback for asynchronous handling of a given signal.

:param signum: number of the signal to be handled
:param handler: callback taking a signal number
as its sole argument, or None
:return: previous handler if any, otherwise None
"""
signum = signal.Signals(signum)
if handler is not None:
if not callable(handler):
raise ValueError('signal handler must be a callable')
old_handler = self.__handlers.get(signum, None)
self.__handlers[signum] = handler
else:
old_handler = self.__handlers.pop(signum, None)
return old_handler
Loading