From 5f22aa197c5dfb4c0927e75b84f2d5a682e6dc9c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 26 Sep 2022 18:40:44 +0100 Subject: [PATCH 1/3] Complement image: propagate SIGTERM to all workers This should mean that logs from worker processes are flushed before shutdown. When a test completes, Complement stops the docker container, which means that synapse will receive a SIGTERM. Currently, the `complement_fork_starter` exits immediately (without notifying the worker processes), which means that the workers never get a chance to flush their logs before the whole container is vaped. We can fix this by propagating the SIGTERM to the children. --- changelog.d/13914.misc | 1 + synapse/app/complement_fork_starter.py | 31 ++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 changelog.d/13914.misc diff --git a/changelog.d/13914.misc b/changelog.d/13914.misc new file mode 100644 index 000000000000..c29bc25d388b --- /dev/null +++ b/changelog.d/13914.misc @@ -0,0 +1 @@ +Complement image: propagate SIGTERM to all workers. diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py index 89eb07df2733..ccbe5145b440 100644 --- a/synapse/app/complement_fork_starter.py +++ b/synapse/app/complement_fork_starter.py @@ -51,11 +51,18 @@ import importlib import itertools import multiprocessing +import os +import signal import sys -from typing import Any, Callable, List +from types import FrameType +from typing import Any, Callable, List, Optional from twisted.internet.main import installReactor +# a list of the original signal handlers, before we installed our custom ones. +# We restore these in our child processes. +_original_signal_handlers: dict[int, Callable] = {} + class ProxiedReactor: """ @@ -105,6 +112,11 @@ def _worker_entrypoint( sys.argv = args + # reset the custom signal handlers that we installed, so that the children start + # from a clean slate. + for sig, handler in _original_signal_handlers.items(): + signal.signal(sig, handler) + from twisted.internet.epollreactor import EPollReactor proxy_reactor._install_real_reactor(EPollReactor()) @@ -167,13 +179,28 @@ def main() -> None: update_proc.join() print("===== PREPARED DATABASE =====", file=sys.stderr) + processes: List[multiprocessing.Process] = [] + + # Install signal handlers to propagate signals to all our children, so that they + # shut down cleanly. This also inhibits our own exit, but that's good: we want to + # wait until the children have exited. + def handle_signal(signum: int, frame: Optional[FrameType]) -> None: + print( + "complement_fork_starter: Caught signal %i. Stopping children." % signum, + file=sys.stderr, + ) + for p in processes: + os.kill(p.pid, signum) + + for sig in (signal.SIGINT, signal.SIGTERM): + _original_signal_handlers[sig] = signal.signal(sig, handle_signal) + # At this point, we've imported all the main entrypoints for all the workers. # Now we basically just fork() out to create the workers we need. # Because we're using fork(), all the workers get a clone of this launcher's # memory space and don't need to repeat the work of loading the code! # Instead of using fork() directly, we use the multiprocessing library, # which uses fork() on Unix platforms. - processes = [] for (func, worker_args) in zip(worker_functions, args_by_worker): process = multiprocessing.Process( target=_worker_entrypoint, args=(func, proxy_reactor, worker_args) From 3cabcc32c1c67e46bd4407d7b699415007a97b21 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 26 Sep 2022 18:55:59 +0100 Subject: [PATCH 2/3] fix lint --- synapse/app/complement_fork_starter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py index ccbe5145b440..0ea4ed3ebda1 100644 --- a/synapse/app/complement_fork_starter.py +++ b/synapse/app/complement_fork_starter.py @@ -61,7 +61,7 @@ # a list of the original signal handlers, before we installed our custom ones. # We restore these in our child processes. -_original_signal_handlers: dict[int, Callable] = {} +_original_signal_handlers: dict[int, Any] = {} class ProxiedReactor: @@ -190,7 +190,8 @@ def handle_signal(signum: int, frame: Optional[FrameType]) -> None: file=sys.stderr, ) for p in processes: - os.kill(p.pid, signum) + if p.pid: + os.kill(p.pid, signum) for sig in (signal.SIGINT, signal.SIGTERM): _original_signal_handlers[sig] = signal.signal(sig, handle_signal) From b5ea2f800937f89c280c40e1b3ef4fc7716b8709 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 26 Sep 2022 19:20:05 +0100 Subject: [PATCH 3/3] Update synapse/app/complement_fork_starter.py --- synapse/app/complement_fork_starter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py index 0ea4ed3ebda1..b22f315453ba 100644 --- a/synapse/app/complement_fork_starter.py +++ b/synapse/app/complement_fork_starter.py @@ -186,7 +186,7 @@ def main() -> None: # wait until the children have exited. def handle_signal(signum: int, frame: Optional[FrameType]) -> None: print( - "complement_fork_starter: Caught signal %i. Stopping children." % signum, + f"complement_fork_starter: Caught signal {signum}. Stopping children.", file=sys.stderr, ) for p in processes: