Skip to content

Commit

Permalink
Add exception handling for BrokenPipeError
Browse files Browse the repository at this point in the history
If we write to the pipe, used to communicate with the apt parent
process, after apt has closed it, Python raises a BrokenPipeError.

This commit adds exception handling to the write function and a
global boolean BROKENPIPE used as exit condition for the reader
threads.

The commit leaves a few TODO notes that suggest better (more
readable) exception handling.
  • Loading branch information
lukpueh committed Dec 20, 2019
1 parent 6f83723 commit 91af874
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions intoto.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,18 @@
# Upon reception we set INTERRUPTED to true, which may be used to gracefully
# terminate.
INTERRUPTED = False
# TODO: Maybe we can replace the signal handler with a KeyboardInterrupt
# try/except block in the main loop, for better readability.
def signal_handler(*junk):
# Set global INTERRUPTED flag telling worker threads to terminate
logger.debug("Received SIGINT, setting global INTERRUPTED true")
global INTERRUPTED
INTERRUPTED = True

# Global BROKENPIPE flag should be set to true, if a `write` or `flush` on a
# stream raises a BrokenPipeError, to gracefully terminate reader threads.
BROKENPIPE = False

# APT Method Interface Message definition
# The first line of each message is called the message header. The first 3
# digits (called the Status Code) have the usual meaning found in the http
Expand Down Expand Up @@ -387,8 +393,11 @@ def read_one(stream):
"""
message_str = ""
# Read from passed stream until apt sends us a SIGINT or EOF (see below)
while not INTERRUPTED: # pragma: no branch
# Read from stream until we get a SIGINT/BROKENPIPE, or reach EOF (see below)
# TODO: Do we need exception handling for the case where we select/read from
# a stream that was closed? If so, we should do it in the main loop for
# better readability.
while not (INTERRUPTED or BROKENPIPE): # pragma: no branch
# Only read if there is data on the stream (non-blocking)
if not select.select([stream], [], [], 0)[0]:
continue
Expand Down Expand Up @@ -418,8 +427,21 @@ def write_one(message_str, stream):
"""Write the passed message to the passed stream.
"""
stream.write(message_str)
stream.flush()
try:
stream.write(message_str)
stream.flush()

except BrokenPipeError:
# TODO: Move exception handling to main loop for better readability
global BROKENPIPE
BROKENPIPE = True
logger.debug("BrokenPipeError while writing '{}' to '{}'.".format(
message_str, stream))
# Python flushes standard streams on exit; redirect remaining output
# to devnull to avoid another BrokenPipeError at shutdown
# See https://docs.python.org/3/library/signal.html#note-on-sigpipe
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, sys.stdout.fileno())


def notify_apt(code, message_text, uri):
Expand Down Expand Up @@ -713,7 +735,7 @@ def loop():
"""
# Start http transport in a subprocess
# Messages from the parent process received on sys.stdin are relayed to the
# subprocesses stdin and vice versa, messages written to the subprocess's
# subprocess' stdin and vice versa, messages written to the subprocess'
# stdout are relayed to the parent via sys.stdout.
http_proc = subprocess.Popen([APT_METHOD_HTTP], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, universal_newlines=True)
Expand All @@ -730,8 +752,9 @@ def loop():
apt_thread = threading.Thread(target=read_to_queue, args=(sys.stdin,
apt_queue))

# Start reader threads. They will run until they see an EOF on their stream
# or the global INTERRUPTED flag is set to true (on SIGINT from apt).
# Start reader threads.
# They will run until they see an EOF on their stream, or the global
# INTERRUPTED or BROKENPIPE flags are set to true.
http_thread.start()
apt_thread.start()

Expand Down Expand Up @@ -766,16 +789,20 @@ def loop():
logger.debug("Relay message")
write_one(message, out)

# Exit when both threads have terminated (on EOF or INTERRUPTED)
# Exit when both threads have terminated (EOF, INTERRUPTED or BROKENPIPE)
# NOTE: We do not check if there are still messages on the streams or
# in the queue, assuming that there aren't or we can ignore them if both
# threads have terminated.
if (not apt_thread.is_alive() and not http_thread.is_alive()):
logger.debug("The worker threads are dead. Long live the worker threads!"
"Terminating.")

# If apt has sent us a SIGINT we relay it to the subprocess
if INTERRUPTED: # pragma: no branch
# If INTERRUPTED or BROKENPIPE are true it (likely?) means that apt
# sent a SIGINT or closed the pipe we were writing to. This means we
# should exit and tell the http child process to exit too.
# TODO: Could it be that the http child closed a pipe or sent a SITERM?
# TODO: Should we behave differently for the two signals?
if INTERRUPTED or BROKENPIPE: # pragma: no branch
logger.debug("Relay SIGINT to http subprocess")
http_proc.send_signal(signal.SIGINT)

Expand Down

0 comments on commit 91af874

Please sign in to comment.