Skip to content

Commit

Permalink
Merge pull request #2412 from oliver-sanders/duplicate-log-entry-fix
Browse files Browse the repository at this point in the history
Fix redirection of stdout and stderr to suite logs
  • Loading branch information
hjoliver committed Aug 29, 2017
2 parents 95fded2 + 4f3906f commit 6f4e80f
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 40 deletions.
23 changes: 6 additions & 17 deletions lib/cylc/daemonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,6 @@
_TIMEOUT = 300.0 # 5 minutes


def redirect(logd):
"""Redirect standard file descriptors
Note that simply reassigning the sys streams is not sufficient
if we import modules that write to stdin and stdout from C
code - evidently the subprocess module is in this category!
"""
sout = file(os.path.join(logd, SuiteLog.OUT), 'a+', 0) # 0 => unbuffered
serr = file(os.path.join(logd, SuiteLog.ERR), 'a+', 0)
dvnl = file(os.devnull, 'r')
os.dup2(sout.fileno(), sys.stdout.fileno())
os.dup2(serr.fileno(), sys.stderr.fileno())
os.dup2(dvnl.fileno(), sys.stdin.fileno())


def daemonize(server):
"""Turn a cylc scheduler into a Unix daemon.
Expand Down Expand Up @@ -143,5 +128,9 @@ def daemonize(server):
# reset umask, octal
os.umask(022)

# redirect output to the suite log files
redirect(logd)
# Redirect /dev/null to stdin.
# Note that simply reassigning the sys streams is not sufficient
# if we import modules that write to stdin and stdout from C
# code - evidently the subprocess module is in this category!
dvnl = file(os.devnull, 'r')
os.dup2(dvnl.fileno(), sys.stdin.fileno())
9 changes: 6 additions & 3 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,17 @@ def start(self):
self.suite_db_mgr.restart_upgrade()

try:
if not self.options.no_detach and not cylc.flags.debug:
detach = not (self.options.no_detach or cylc.flags.debug)

if detach:
daemonize(self)

# Setup the suite log.
slog = SuiteLog.get_inst(self.suite)
if cylc.flags.debug:
slog.pimp(DEBUG)
slog.pimp(detach, DEBUG)
else:
slog.pimp()
slog.pimp(detach)

self.proc_pool = SuiteProcPool()
self.configure_comms_daemon()
Expand Down
82 changes: 62 additions & 20 deletions lib/cylc/suite_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ def get_logs(directory, basename, absolute_path=True):
return [os.path.basename(log) for log in new_logs + old_logs]


class StreamRedirectRoller(object):
"""Redirect a stream to a rolling file.
Note that simply reassigning the sys streams is not sufficient
if we import modules that write to stdin and stdout from C
code - evidently the subprocess module is in this category!
Args:
stream (file): The stream to redirect.
link_path (str): The path to the symlink pointing at the log file.
"""

def __init__(self, stream, link_path):
self.stream = stream
self.path = link_path

def doRollover(self):
file_handle = file(self.path, 'a+', 0) # 0 => unbuffered
os.dup2(file_handle.fileno(), self.stream.fileno())


class RollingFileHandler(logging.handlers.BaseRotatingHandler):
"""A file handler for log files rotated by symlinking with support for
synchronised rotating of multiple logs."""
Expand Down Expand Up @@ -157,6 +179,7 @@ class RollingFileHandlerGroup(object):

def __init__(self):
self.handlers = {}
self.stream_handlers = []
self.duplicate_handlers = []

def add(self, log):
Expand All @@ -170,6 +193,10 @@ def add(self, log):
else:
self.duplicate_handlers.append(handler)

def add_stream(self, stream):
"""Add a file stream to this group"""
self.stream_handlers.append(stream)

def broadcast_roll(self, origin, *args):
"""Roll all other logs in this group, origin should be the
RollingFileHandler instance that is calling this method."""
Expand All @@ -178,6 +205,8 @@ def broadcast_roll(self, origin, *args):
handler.doRollover(*args)
for handler in self.duplicate_handlers:
handler.notifyRollover()
for handler in self.stream_handlers:
handler.doRollover()

def roll_all(self):
"""Roll all logs in this group. Call with origin = False to roll all
Expand Down Expand Up @@ -250,6 +279,9 @@ def __init__(self, suite, test_params=None):
self.loggers[self.OUT] = None
self.loggers[self.ERR] = None

# File streams
self.streams = []

# Filename stamp functions.
if self.is_test:
self.stamp = lambda: get_current_time_string(True, True, True
Expand Down Expand Up @@ -294,17 +326,17 @@ def get_log_path(self, log):
if log in self.loggers:
return self.log_paths[log]

def pimp(self, log_logger_level=logging.INFO):
def pimp(self, detach=False, log_logger_level=logging.INFO):
"""Initiate the suite logs."""
if not self.loggers[self.LOG]:
# Don't initiate logs if they exist already.
self._create_logs(log_logger_level=log_logger_level)
self._create_logs(detach, log_logger_level=log_logger_level)
self._register_syncronised_logs()
self._group.roll_all()
elif self.roll_at_startup:
self._group.roll_all()

def _create_logs(self, log_logger_level=logging.INFO):
def _create_logs(self, detach, log_logger_level=logging.INFO):
"""Sets up the log files and their file handlers."""
# Logging formatters.
# plain_formatter = logging.Formatter('%(message)s')
Expand Down Expand Up @@ -345,12 +377,6 @@ def _create_logs(self, log_logger_level=logging.INFO):
log_err_fh.setFormatter(iso8601_formatter)
log.addHandler(log_err_fh)

# Output errors to stderr.
log_stderr_fh = logging.StreamHandler(sys.stderr)
log_stderr_fh.setLevel(logging.WARNING)
log_stderr_fh.setFormatter(iso8601_formatter)
log.addHandler(log_stderr_fh)

# --- Create the 'out' logger. ---
out = logging.getLogger(self.OUT)
self.loggers[self.OUT] = out
Expand All @@ -366,12 +392,6 @@ def _create_logs(self, log_logger_level=logging.INFO):
out_fh.setFormatter(iso8601_formatter)
out.addHandler(out_fh)

# Output to stdout.
out_stdout_fh = logging.StreamHandler(sys.stdout)
out_stdout_fh.setLevel(logging.INFO)
out_stdout_fh.setFormatter(iso8601_formatter)
out.addHandler(out_stdout_fh)

# --- Create the 'err' logger. ---
err = logging.getLogger(self.ERR)
self.loggers[self.ERR] = err
Expand All @@ -387,11 +407,31 @@ def _create_logs(self, log_logger_level=logging.INFO):
err_fh.setFormatter(iso8601_formatter)
err.addHandler(err_fh)

# Output to stderr.
err_stderr_fh = logging.StreamHandler(sys.stderr)
err_stderr_fh.setLevel(logging.WARNING)
err_stderr_fh.setFormatter(iso8601_formatter)
err.addHandler(err_stderr_fh)
if detach:
# If we are in detached mode redirect stdout/stderr to the logs.
self.streams = [
StreamRedirectRoller(sys.stdout, self.log_paths[self.OUT]),
StreamRedirectRoller(sys.stderr, self.log_paths[self.ERR])
]
else:
# If we are not in detached mode redirect the logs to
# stdout/stderr:

# LOG: warnings or higher -> stderr
log_stderr_fh = logging.StreamHandler(sys.stderr)
log_stderr_fh.setLevel(logging.WARNING)
log_stderr_fh.setFormatter(iso8601_formatter)
log.addHandler(log_stderr_fh)
# OUT: info or higher -> stdout
out_stdout_fh = logging.StreamHandler(sys.stdout)
out_stdout_fh.setLevel(logging.INFO)
out_stdout_fh.setFormatter(iso8601_formatter)
out.addHandler(out_stdout_fh)
# ERR: warnings or higher -> stderr
err_stderr_fh = logging.StreamHandler(sys.stderr)
err_stderr_fh.setLevel(logging.WARNING)
err_stderr_fh.setFormatter(iso8601_formatter)
err.addHandler(err_stderr_fh)

def _register_syncronised_logs(self):
"""Establishes synchronisation between the logs."""
Expand All @@ -401,6 +441,8 @@ def _register_syncronised_logs(self):
for handler in log.handlers:
if isinstance(handler, RollingFileHandler):
handler.register_syncronised_group(self._group)
for stream in self.streams:
self._group.add_stream(stream)


class ISO8601DateTimeFormatter(logging.Formatter):
Expand Down
46 changes: 46 additions & 0 deletions tests/logging/02-duplicates.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
. $(dirname $0)/test_header
#-------------------------------------------------------------------------------
set_test_number 3
#-------------------------------------------------------------------------------
install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
suite_run_ok "${TEST_NAME_BASE}-run" cylc run "${SUITE_NAME}"
while cylc ping "${SUITE_NAME}" 2>/dev/null; do
sleep 1
done
sleep 8
suite_run_ok "${TEST_NAME_BASE}-restart" cylc restart "${SUITE_NAME}"
while cylc ping "${SUITE_NAME}" 2>/dev/null; do
sleep 1
done
if [[ -e "${SUITE_RUN_DIR}/work/2/pub/test-succeeded" ]]; then
ok "${TEST_NAME_BASE}-check"
else
fail "${TEST_NAME_BASE}-check"
echo 'OUT - Duplicated Entries:' >&2
cat "${SUITE_RUN_DIR}/work/2/pub/out-duplication" >&2
echo 'ERR - Duplicated Entries:' >&2
cat "${SUITE_RUN_DIR}/work/2/pub/err-duplication" >&2
echo 'LOG - Duplicated Entries:' >&2
cat "${SUITE_RUN_DIR}/work/2/pub/log-duplication" >&2
fi

#-------------------------------------------------------------------------------
purge_suite "${SUITE_NAME}"
exit
49 changes: 49 additions & 0 deletions tests/logging/02-duplicates/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[scheduling]
cycling mode = integer
initial cycle point = 1
[[dependencies]]
[[[R1/1]]]
graph = """
foo:fail => bar
foo & bar => restart
"""
[[[R1/2]]]
graph = """
restart[-P1] => foo
foo:fail => bar
foo & bar => pub
"""
[runtime]
[[foo]]
script = false
[[bar]]
script = """
cylc reset "${CYLC_SUITE_NAME}" "foo.${CYLC_TASK_CYCLE_POINT}" -s succeeded
"""
[[restart]]
script = """
cylc stop "${CYLC_SUITE_NAME}"
"""
[[pub]]
script = """
# Extract timestamp lines from logs
for file in $(find "${CYLC_SUITE_RUN_DIR}/log/suite/" -name '*.*'); do
if $(grep -q '.*-.*-.*' "${file}"); then
grep '.*-.*-.*' "${file}" | sort -u > $(basename $file)
else
touch $(basename $file)
fi
done
# Write out duplicate entries to *-duplication files.
sort $(find . -name 'out*') | uniq -d > out-duplication
sort $(find . -name 'err*') | uniq -d > err-duplication
sort $(find . -name 'log*') | uniq -d > log-duplication
# Fail if any of these files contain any content.
if [[ -s out-duplication || -s err-duplication || -s log-duplication ]]; then
touch 'test-failed'
else
touch 'test-succeeded'
fi
"""

0 comments on commit 6f4e80f

Please sign in to comment.