Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul logging background thread transport #3407

Merged
merged 6 commits into from
May 12, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 74 additions & 67 deletions logging/google/cloud/logging/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
Uses a background worker to log to Stackdriver Logging asynchronously.
"""

from __future__ import print_function

import atexit

This comment was marked as spam.

This comment was marked as spam.

import copy
import logging
import threading

from six.moves import range
from six.moves import queue

from google.cloud.logging.handlers.transports.base import Transport
Expand All @@ -33,14 +36,14 @@
_LOGGER = logging.getLogger(__name__)


def _get_many(q, max_items=None):
def _get_many(queue_, max_items=None):
"""Get multiple items from a Queue.

Gets at least one (blocking) and at most :param:`max_items` items
Gets at least one (blocking) and at most ``max_items`` items
(non-blocking) from a given Queue. Does not mark the items as done.

:type q: ~queue.Queue
:param q: The Queue to get items from.
:type queue_: :class:`~queue.Queue`
:param queue_: The Queue to get items from.

:type max_items: int
:param max_items: The maximum number of items to get. If ``None``, then all
Expand All @@ -50,46 +53,54 @@ def _get_many(q, max_items=None):
:returns: A sequence of items retrieved from the queue.
"""
# Always return at least one item.
items = [q.get()]
items = [queue_.get()]
while max_items is None or len(items) < max_items:

This comment was marked as spam.

This comment was marked as spam.

try:
items.append(q.get_nowait())
items.append(queue_.get_nowait())
except queue.Empty:
break
return items


class _Worker(object):
"""A background thread that writes batches of log entries."""
"""A background thread that writes batches of log entries.

def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
"""
The background thread is started automatically.
:type cloud_logger: ~google.cloud.logging.logger.Logger

This comment was marked as spam.

This comment was marked as spam.

:param cloud_logger: The logger to send entries to.

:type cloud_logger: ~google.cloud.logging.logger.Logger
:param cloud_logger: The logger to send entries to.
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.

:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.
:type max_batch_size: int
:param max_batch_size: The maximum number of items to send at a time
in the background thread.
"""

:type max_batch_size: int
:param max_batch_size: The maximum number of items to send at a time
in the background thread.
"""
def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
self._cloud_logger = cloud_logger
self._grace_period = grace_period
self._max_batch_size = max_batch_size
self._queue = queue.Queue(-1)
self._queue = queue.Queue(0)

This comment was marked as spam.

This comment was marked as spam.

self._operational_lock = threading.Lock()
self._thread = None
self.start()

@property
def is_alive(self):
"""Returns True is the background thread is running."""
return self._thread and self._thread.is_alive()
return self._thread is not None and self._thread.is_alive()

def _safely_commit_batch(self, batch):
total_logs = len(batch.entries)

try:
if total_logs > 0:
batch.commit()
_LOGGER.debug('Submitted %d logs', total_logs)
except Exception:
_LOGGER.error(
'Failed to submit %d logs.', total_logs, exc_info=True)

def _thread_main(self):

This comment was marked as spam.

This comment was marked as spam.

"""The entry point for the worker thread.
Expand All @@ -99,35 +110,25 @@ def _thread_main(self):
"""
_LOGGER.debug('Background thread started.')

quit = False
quit_ = False
while True:
batch = self._cloud_logger.batch()
items = _get_many(self._queue, max_items=self._max_batch_size)

for item in items:
if item is _WORKER_TERMINATOR:
quit = True
# Continue, don't break, try to process all items we got
# back before quitting.
continue

batch.log_struct(**item)

total_logs = len(batch.entries)

try:
if total_logs:
batch.commit()
_LOGGER.debug('Submitted %d logs', total_logs)
except Exception:
_LOGGER.error(
'Failed to submit %d logs.', total_logs, exc_info=True)
finally:
# Mark all collected tasks as done.
for n in range(len(items)):
self._queue.task_done()

if quit:
quit_ = True
# Continue processing items, don't break, try to process
# all items we got back before quitting.
else:
batch.log_struct(**item)

self._safely_commit_batch(batch)

for _ in range(len(items)):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

self._queue.task_done()

if quit_:
break

_LOGGER.debug('Background thread exited gracefully.')

This comment was marked as spam.

This comment was marked as spam.

Expand All @@ -145,7 +146,7 @@ def start(self):
self._thread = threading.Thread(
target=self._thread_main,
name=_WORKER_THREAD_NAME)
self._thread.setDaemon(True)
self._thread.daemon = True
self._thread.start()
atexit.register(self._main_thread_terminated)

Expand All @@ -155,7 +156,7 @@ def stop(self, grace_period=None):
This does not terminate the background thread. It simply queues the
stop signal. If the main process exits before the background thread
processes the stop signal, it will be terminated without finishing
work. The :param:`grace_period` parameter will give the background
work. The ``grace_period`` parameter will give the background
thread some time to finish processing before this function returns.

:type grace_period: float
Expand All @@ -174,11 +175,15 @@ def stop(self, grace_period=None):
self._queue.put_nowait(_WORKER_TERMINATOR)

if grace_period is not None:
print('Waiting up to %d seconds.' % grace_period)
print('Waiting up to %d seconds.' % (grace_period,))

self._thread.join(timeout=grace_period)

# Check this before disowning the thread, because after we disown
# the thread is_alive will be False regardless of if the thread
# exited or not.
success = not self.is_alive

This comment was marked as spam.

This comment was marked as spam.


self._thread = None

return success
Expand All @@ -191,12 +196,12 @@ def _main_thread_terminated(self):
if not self._queue.empty():
print(
'Program shutting down, attempting to send %d queued log '
'entries to Stackdriver Logging...' % self._queue.qsize())
'entries to Stackdriver Logging...' % (self._queue.qsize(),))

if self.stop(self._grace_period):
print('Sent all pending logs.')
else:
print('Failed to send %d pending logs.' % self._queue.qsize())
print('Failed to send %d pending logs.' % (self._queue.qsize(),))

def enqueue(self, record, message):
"""Queues a log entry to be written by the background thread.
Expand All @@ -211,31 +216,33 @@ def enqueue(self, record, message):
self._queue.put_nowait({

This comment was marked as spam.

This comment was marked as spam.

'info': {
'message': message,
'python_logger': record.name
'python_logger': record.name,
},
'severity': record.levelname
'severity': record.levelname,
})


class BackgroundThreadTransport(Transport):
"""Asynchronous transport that uses a background thread."""
"""Asynchronous transport that uses a background thread.

def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
batch_size=_DEFAULT_MAX_BATCH_SIZE):
"""
:type client: ~google.cloud.logging.client.Client
:param client: The Logging client.
:type client: ~google.cloud.logging.client.Client

This comment was marked as spam.

This comment was marked as spam.

:param client: The Logging client.

:type name: str
:param name: the name of the logger.
:type name: str
:param name: the name of the logger.

:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.

:type batch_size: int
:param batch_size: The maximum number of items to send at a time in the
background thread.
"""

:type batch_size: int
:param batch_size: The maximum number of items to send at a time in the
background thread.
def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
batch_size=_DEFAULT_MAX_BATCH_SIZE):
"""

This comment was marked as spam.

This comment was marked as spam.

"""
http = copy.deepcopy(client._http)
self.client = client.__class__(
Expand Down
Loading