diff --git a/logging/google/cloud/logging/handlers/handlers.py b/logging/google/cloud/logging/handlers/handlers.py index ae66c4516ee9..62ec6c6d561a 100644 --- a/logging/google/cloud/logging/handlers/handlers.py +++ b/logging/google/cloud/logging/handlers/handlers.py @@ -20,7 +20,11 @@ DEFAULT_LOGGER_NAME = 'python' -EXCLUDED_LOGGER_DEFAULTS = ('google.cloud', 'oauth2client') +EXCLUDED_LOGGER_DEFAULTS = ( + 'google.cloud', + 'google.auth', + 'google_auth_httplib2', +) class CloudLoggingHandler(logging.StreamHandler): diff --git a/logging/google/cloud/logging/handlers/transports/background_thread.py b/logging/google/cloud/logging/handlers/transports/background_thread.py index 9c8ea85c937a..4b651243be45 100644 --- a/logging/google/cloud/logging/handlers/transports/background_thread.py +++ b/logging/google/cloud/logging/handlers/transports/background_thread.py @@ -17,139 +17,231 @@ Uses a background worker to log to Stackdriver Logging asynchronously. """ +from __future__ import print_function + import atexit import copy +import logging import threading +from six.moves import range +from six.moves import queue + from google.cloud.logging.handlers.transports.base import Transport -_WORKER_THREAD_NAME = 'google.cloud.logging.handlers.transport.Worker' +_DEFAULT_GRACE_PERIOD = 5.0 # Seconds +_DEFAULT_MAX_BATCH_SIZE = 10 +_WORKER_THREAD_NAME = 'google.cloud.logging.Worker' +_WORKER_TERMINATOR = object() +_LOGGER = logging.getLogger(__name__) -class _Worker(object): - """A threaded worker that writes batches of log entries +def _get_many(queue_, max_items=None): + """Get multiple items from a Queue. - Writes entries to the logger API. + Gets at least one (blocking) and at most ``max_items`` items + (non-blocking) from a given Queue. Does not mark the items as done. - This class reuses a single :class:`Batch` method to write successive - entries. + :type queue_: :class:`~queue.Queue` + :param queue_: The Queue to get items from. - Currently, the only public methods are constructing it (which also starts - it) and enqueuing :class:`Logger` (record, message) pairs. + :type max_items: int + :param max_items: The maximum number of items to get. If ``None``, then all + available items in the queue are returned. + + :rtype: Sequence + :returns: A sequence of items retrieved from the queue. """ + # Always return at least one item. + items = [queue_.get()] + while max_items is None or len(items) < max_items: + try: + items.append(queue_.get_nowait()) + except queue.Empty: + break + return items - def __init__(self, logger): - self.started = False - self.stopping = False - self.stopped = False - # _entries_condition is used to signal from the main thread whether - # there are any waiting queued logger entries to be written - self._entries_condition = threading.Condition() +class _Worker(object): + """A background thread that writes batches of log entries. - # _stop_condition is used to signal from the worker thread to the - # main thread that it's finished its last entries - self._stop_condition = threading.Condition() + :type cloud_logger: :class:`~google.cloud.logging.logger.Logger` + :param cloud_logger: The logger to send entries to. - # This object continually reuses the same :class:`Batch` object to - # write multiple entries at the same time. - self.logger = logger - self.batch = self.logger.batch() + :type grace_period: float + :param grace_period: The amount of time to wait for pending logs to + be submitted when the process is shutting down. + :type max_batch_size: int + :param max_batch_size: The maximum number of items to send at a time + in the background thread. + """ + + def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD, + max_batch_size=_DEFAULT_MAX_BATCH_SIZE): + self._cloud_logger = cloud_logger + self._grace_period = grace_period + self._max_batch_size = max_batch_size + self._queue = queue.Queue(0) + self._operational_lock = threading.Lock() self._thread = None - # Number in seconds of how long to wait for worker to send remaining - self._stop_timeout = 5 + @property + def is_alive(self): + """Returns True is the background thread is running.""" + return self._thread is not None and self._thread.is_alive() - self._start() + def _safely_commit_batch(self, batch): + total_logs = len(batch.entries) - def _run(self): + try: + if total_logs > 0: + batch.commit() + _LOGGER.debug('Submitted %d logs', total_logs) + except Exception: + _LOGGER.error( + 'Failed to submit %d logs.', total_logs, exc_info=True) + + def _thread_main(self): """The entry point for the worker thread. - Loops until ``stopping`` is set to :data:`True`, and commits batch - entries written during :meth:`enqueue`. + Pulls pending log entries off the queue and writes them in batches to + the Cloud Logger. """ - try: - self._entries_condition.acquire() - self.started = True - while not self.stopping: - if len(self.batch.entries) == 0: - # branch coverage of this code extremely flaky - self._entries_condition.wait() # pragma: NO COVER - - if len(self.batch.entries) > 0: - self.batch.commit() - finally: - self._entries_condition.release() - - # main thread may be waiting for worker thread to finish writing its - # final entries. here we signal that it's done. - self._stop_condition.acquire() - self._stop_condition.notify() - self._stop_condition.release() - - def _start(self): - """Called by this class's constructor - - This method is responsible for starting the thread and registering - the exit handlers. + _LOGGER.debug('Background thread started.') + + quit_ = False + while True: + batch = self._cloud_logger.batch() + items = _get_many(self._queue, max_items=self._max_batch_size) + + for item in items: + if item is _WORKER_TERMINATOR: + quit_ = True + # Continue processing items, don't break, try to process + # all items we got back before quitting. + else: + batch.log_struct(**item) + + self._safely_commit_batch(batch) + + for _ in range(len(items)): + self._queue.task_done() + + if quit_: + break + + _LOGGER.debug('Background thread exited gracefully.') + + def start(self): + """Starts the background thread. + + Additionally, this registers a handler for process exit to attempt + to send any pending log entries before shutdown. """ - try: - self._entries_condition.acquire() + with self._operational_lock: + if self.is_alive: + return + self._thread = threading.Thread( - target=self._run, name=_WORKER_THREAD_NAME) - self._thread.setDaemon(True) + target=self._thread_main, + name=_WORKER_THREAD_NAME) + self._thread.daemon = True self._thread.start() - finally: - self._entries_condition.release() - atexit.register(self._stop) + atexit.register(self._main_thread_terminated) + + def stop(self, grace_period=None): + """Signals the background thread to stop. - def _stop(self): - """Signals the worker thread to shut down + This does not terminate the background thread. It simply queues the + stop signal. If the main process exits before the background thread + processes the stop signal, it will be terminated without finishing + work. The ``grace_period`` parameter will give the background + thread some time to finish processing before this function returns. - Also waits for ``stop_timeout`` seconds for the worker to finish. + :type grace_period: float + :param grace_period: If specified, this method will block up to this + many seconds to allow the background thread to finish work before + returning. - This method is called by the ``atexit`` handler registered by - :meth:`start`. + :rtype: bool + :returns: True if the thread terminated. False if the thread is still + running. """ - if not self.started or self.stopping: - return + if not self.is_alive: + return True + + with self._operational_lock: + self._queue.put_nowait(_WORKER_TERMINATOR) + + if grace_period is not None: + print('Waiting up to %d seconds.' % (grace_period,)) - # lock the stop condition first so that the worker - # thread can't notify it's finished before we wait - self._stop_condition.acquire() + self._thread.join(timeout=grace_period) - # now notify the worker thread to shutdown - self._entries_condition.acquire() - self.stopping = True - self._entries_condition.notify() - self._entries_condition.release() + # Check this before disowning the thread, because after we disown + # the thread is_alive will be False regardless of if the thread + # exited or not. + success = not self.is_alive - # now wait for it to signal it's finished - self._stop_condition.wait(self._stop_timeout) - self._stop_condition.release() - self.stopped = True + self._thread = None + + return success + + def _main_thread_terminated(self): + """Callback that attempts to send pending logs before termination.""" + if not self.is_alive: + return + + if not self._queue.empty(): + print( + 'Program shutting down, attempting to send %d queued log ' + 'entries to Stackdriver Logging...' % (self._queue.qsize(),)) + + if self.stop(self._grace_period): + print('Sent all pending logs.') + else: + print('Failed to send %d pending logs.' % (self._queue.qsize(),)) def enqueue(self, record, message): - """Queues up a log entry to be written by the background thread.""" - try: - self._entries_condition.acquire() - if self.stopping: - return - info = {'message': message, 'python_logger': record.name} - self.batch.log_struct(info, severity=record.levelname) - self._entries_condition.notify() - finally: - self._entries_condition.release() + """Queues a log entry to be written by the background thread. + + :type record: :class:`logging.LogRecord` + :param record: Python log record that the handler was called with. + + :type message: str + :param message: The message from the ``LogRecord`` after being + formatted by the associated log formatters. + """ + self._queue.put_nowait({ + 'info': { + 'message': message, + 'python_logger': record.name, + }, + 'severity': record.levelname, + }) class BackgroundThreadTransport(Transport): - """Aysnchronous transport that uses a background thread. + """Asynchronous transport that uses a background thread. + + :type client: :class:`~google.cloud.logging.client.Client` + :param client: The Logging client. + + :type name: str + :param name: the name of the logger. + + :type grace_period: float + :param grace_period: The amount of time to wait for pending logs to + be submitted when the process is shutting down. - Writes logging entries as a batch process. + :type batch_size: int + :param batch_size: The maximum number of items to send at a time in the + background thread. """ - def __init__(self, client, name): + def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD, + batch_size=_DEFAULT_MAX_BATCH_SIZE): http = copy.deepcopy(client._http) self.client = client.__class__( client.project, client._credentials, http) diff --git a/logging/nox.py b/logging/nox.py index 7f6447c56924..5d4751a955a5 100644 --- a/logging/nox.py +++ b/logging/nox.py @@ -39,7 +39,7 @@ def unit_tests(session, python_version): 'py.test', '--quiet', '--cov=google.cloud.logging', '--cov=tests.unit', '--cov-append', '--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97', - 'tests/unit', + 'tests/unit', *session.posargs ) @@ -63,7 +63,7 @@ def system_tests(session, python_version): session.install('.') # Run py.test against the system tests. - session.run('py.test', '-vvv', 'tests/system.py') + session.run('py.test', '-vvv', 'tests/system.py', *session.posargs) @nox.session diff --git a/logging/tests/unit/handlers/transports/test_background_thread.py b/logging/tests/unit/handlers/transports/test_background_thread.py index 3952a0b6422c..177c30e10863 100644 --- a/logging/tests/unit/handlers/transports/test_background_thread.py +++ b/logging/tests/unit/handlers/transports/test_background_thread.py @@ -13,12 +13,12 @@ # limitations under the License. import logging -import time import unittest +import mock -class TestBackgroundThreadHandler(unittest.TestCase): +class TestBackgroundThreadHandler(unittest.TestCase): PROJECT = 'PROJECT' @staticmethod @@ -29,36 +29,41 @@ def _get_target_class(): return BackgroundThreadTransport def _make_one(self, *args, **kw): - return self._get_target_class()(*args, **kw) - - def test_ctor(self): + worker_patch = mock.patch( + 'google.cloud.logging.handlers.transports.' + 'background_thread._Worker', + autospec=True) + with worker_patch as worker_mock: + return self._get_target_class()(*args, **kw), worker_mock + + def test_constructor(self): client = _Client(self.PROJECT) - NAME = 'python_logger' - transport = self._make_one(client, NAME) - self.assertEqual(transport.worker.logger.name, NAME) + name = 'python_logger' + + transport, worker = self._make_one(client, name) + + logger, = worker.call_args[0] # call_args[0] is *args. + self.assertEqual(logger.name, name) def test_send(self): client = _Client(self.PROJECT) - NAME = 'python_logger' - transport = self._make_one(client, NAME) - transport.worker.batch = client.logger(NAME).batch() + name = 'python_logger' + + transport, _ = self._make_one(client, name) python_logger_name = 'mylogger' message = 'hello world' - record = logging.LogRecord(python_logger_name, logging.INFO, - None, None, message, None, None) + record = logging.LogRecord( + python_logger_name, logging.INFO, + None, None, message, None, None) + transport.send(record, message) - EXPECTED_STRUCT = { - 'message': message, - 'python_logger': python_logger_name - } - EXPECTED_SENT = (EXPECTED_STRUCT, 'INFO') - self.assertEqual(transport.worker.batch.log_struct_called_with, - EXPECTED_SENT) + transport.worker.enqueue.assert_called_once_with(record, message) -class TestWorker(unittest.TestCase): +class Test_Worker(unittest.TestCase): + NAME = 'python_logger' @staticmethod def _get_target_class(): @@ -69,79 +74,188 @@ def _get_target_class(): def _make_one(self, *args, **kw): return self._get_target_class()(*args, **kw) - def test_ctor(self): - NAME = 'python_logger' - logger = _Logger(NAME) - worker = self._make_one(logger) - self.assertEqual(worker.batch, logger._batch) + def _start_with_thread_patch(self, worker): + with mock.patch('threading.Thread', new=_Thread) as thread_mock: + with mock.patch('atexit.register') as atexit_mock: + worker.start() + return thread_mock, atexit_mock - def test_run(self): - NAME = 'python_logger' - logger = _Logger(NAME) - worker = self._make_one(logger) + def test_constructor(self): + logger = _Logger(self.NAME) + grace_period = 50 + max_batch_size = 50 - python_logger_name = 'mylogger' - message = 'hello world' - record = logging.LogRecord(python_logger_name, - logging.INFO, None, None, - message, None, None) + worker = self._make_one( + logger, grace_period=grace_period, max_batch_size=max_batch_size) - worker._start() + self.assertEqual(worker._cloud_logger, logger) + self.assertEqual(worker._grace_period, grace_period) + self.assertEqual(worker._max_batch_size, max_batch_size) + self.assertFalse(worker.is_alive) + self.assertIsNone(worker._thread) - # first sleep is for branch coverage - ensure condition - # where queue is empty occurs - time.sleep(1) - # second polling is to avoid starting/stopping worker - # before anything ran - while not worker.started: - time.sleep(1) # pragma: NO COVER + def test_start(self): + from google.cloud.logging.handlers.transports import background_thread - worker.enqueue(record, message) - # Set timeout to none so worker thread finishes - worker._stop_timeout = None - worker._stop() - self.assertTrue(worker.batch.commit_called) + worker = self._make_one(_Logger(self.NAME)) - def test_run_after_stopped(self): - # No-op - name = 'python_logger' - logger = _Logger(name) - worker = self._make_one(logger) + _, atexit_mock = self._start_with_thread_patch(worker) - python_logger_name = 'mylogger' - message = 'hello world' - record = logging.LogRecord(python_logger_name, - logging.INFO, None, None, - message, None, None) - - worker._start() - while not worker.started: - time.sleep(1) # pragma: NO COVER - worker._stop_timeout = None - worker._stop() - worker.enqueue(record, message) - self.assertFalse(worker.batch.commit_called) - worker._stop() + self.assertTrue(worker.is_alive) + self.assertIsNotNone(worker._thread) + self.assertTrue(worker._thread.daemon) + self.assertEqual(worker._thread._target, worker._thread_main) + self.assertEqual( + worker._thread._name, background_thread._WORKER_THREAD_NAME) + atexit_mock.assert_called_once_with(worker._main_thread_terminated) - def test_run_enqueue_early(self): - # No-op - NAME = 'python_logger' - logger = _Logger(NAME) - worker = self._make_one(logger) + # Calling start again should not start a new thread. + current_thread = worker._thread + self._start_with_thread_patch(worker) + self.assertIs(current_thread, worker._thread) - python_logger_name = 'mylogger' - message = 'hello world' - record = logging.LogRecord(python_logger_name, - logging.INFO, None, None, - message, None, None) + def test_stop(self): + from google.cloud.logging.handlers.transports import background_thread + + grace_period = 5.0 + worker = self._make_one(_Logger(self.NAME)) + + self._start_with_thread_patch(worker) + thread = worker._thread + + worker.stop(grace_period) + + self.assertEqual(worker._queue.qsize(), 1) + self.assertEqual( + worker._queue.get(), background_thread._WORKER_TERMINATOR) + self.assertFalse(worker.is_alive) + self.assertIsNone(worker._thread) + self.assertEqual(thread._timeout, grace_period) + + # Stopping twice should not be an error + worker.stop() + + def test_stop_no_grace(self): + worker = self._make_one(_Logger(self.NAME)) + + self._start_with_thread_patch(worker) + thread = worker._thread + + worker.stop() + + self.assertEqual(thread._timeout, None) + + def test__main_thread_terminated(self): + worker = self._make_one(_Logger(self.NAME)) + + self._start_with_thread_patch(worker) + worker._main_thread_terminated() + + self.assertFalse(worker.is_alive) + + # Calling twice should not be an error + worker._main_thread_terminated() + + def test__main_thread_terminated_non_empty_queue(self): + worker = self._make_one(_Logger(self.NAME)) + self._start_with_thread_patch(worker) + worker.enqueue(mock.Mock(), '') + worker._main_thread_terminated() + + self.assertFalse(worker.is_alive) + + def test__main_thread_terminated_did_not_join(self): + worker = self._make_one(_Logger(self.NAME)) + + self._start_with_thread_patch(worker) + worker._thread._terminate_on_join = False + worker.enqueue(mock.Mock(), '') + worker._main_thread_terminated() + + self.assertFalse(worker.is_alive) + + @staticmethod + def _enqueue_record(worker, message): + record = logging.LogRecord( + 'python_logger', logging.INFO, + None, None, message, None, None) worker.enqueue(record, message) - worker._start() - while not worker.started: - time.sleep(1) # pragma: NO COVER - worker._stop_timeout = None - worker._stop() - self.assertTrue(worker.stopped) + + def test__thread_main(self): + from google.cloud.logging.handlers.transports import background_thread + + worker = self._make_one(_Logger(self.NAME)) + + # Enqueue two records and the termination signal. + self._enqueue_record(worker, '1') + self._enqueue_record(worker, '2') + worker._queue.put_nowait(background_thread._WORKER_TERMINATOR) + + worker._thread_main() + + self.assertTrue(worker._cloud_logger._batch.commit_called) + self.assertEqual(worker._cloud_logger._batch.commit_count, 2) + self.assertEqual(worker._queue.qsize(), 0) + + def test__thread_main_error(self): + from google.cloud.logging.handlers.transports import background_thread + + worker = self._make_one(_Logger(self.NAME)) + worker._cloud_logger._batch_cls = _RaisingBatch + + # Enqueue one record and the termination signal. + self._enqueue_record(worker, '1') + worker._queue.put_nowait(background_thread._WORKER_TERMINATOR) + + worker._thread_main() + + self.assertTrue(worker._cloud_logger._batch.commit_called) + self.assertEqual(worker._queue.qsize(), 0) + + def test__thread_main_batches(self): + from google.cloud.logging.handlers.transports import background_thread + + worker = self._make_one(_Logger(self.NAME), max_batch_size=2) + + # Enqueue three records and the termination signal. This should be + # enough to perform two separate batches and a third loop with just + # the exit. + self._enqueue_record(worker, '1') + self._enqueue_record(worker, '2') + self._enqueue_record(worker, '3') + self._enqueue_record(worker, '4') + worker._queue.put_nowait(background_thread._WORKER_TERMINATOR) + + worker._thread_main() + + # The last batch should not have been executed because it had no items. + self.assertFalse(worker._cloud_logger._batch.commit_called) + self.assertEqual(worker._queue.qsize(), 0) + + +class _Thread(object): + + def __init__(self, target, name): + self._target = target + self._name = name + self._timeout = None + self._terminate_on_join = True + self.daemon = False + + def is_alive(self): + return self._is_alive + + def start(self): + self._is_alive = True + + def stop(self): + self._is_alive = False + + def join(self, timeout=None): + self._timeout = timeout + if self._terminate_on_join: + self.stop() class _Batch(object): @@ -149,23 +263,33 @@ class _Batch(object): def __init__(self): self.entries = [] self.commit_called = False + self.commit_count = None - def log_struct(self, record, severity=logging.INFO): - self.log_struct_called_with = (record, severity) - self.entries.append(record) + def log_struct(self, info, severity=logging.INFO): + self.log_struct_called_with = (info, severity) + self.entries.append(info) def commit(self): self.commit_called = True + self.commit_count = len(self.entries) del self.entries[:] +class _RaisingBatch(_Batch): + def commit(self): + self.commit_called = True + raise ValueError('This batch raises on commit.') + + class _Logger(object): def __init__(self, name): self.name = name + self._batch_cls = _Batch + self._batch = None def batch(self): - self._batch = _Batch() + self._batch = self._batch_cls() return self._batch