Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completing implementation of HappyBase connection pool. #1523

Merged
merged 1 commit into from
Feb 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions gcloud/bigtable/happybase/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Google Cloud Bigtable HappyBase pool module."""


import contextlib
import threading

import six
Expand All @@ -27,6 +28,14 @@
"""Minimum allowable size of a connection pool."""


class NoConnectionsAvailable(RuntimeError):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Exception raised when no connections are available.

This happens if a timeout was specified when obtaining a connection,
and no connection became available within the specified timeout.
"""


class ConnectionPool(object):
"""Thread-safe connection pool.

Expand Down Expand Up @@ -71,3 +80,72 @@ def __init__(self, size, **kwargs):
for _ in six.moves.range(size):
connection = Connection(**connection_kwargs)
self._queue.put(connection)

def _acquire_connection(self, timeout=None):
"""Acquire a connection from the pool.

:type timeout: int
:param timeout: (Optional) Time (in seconds) to wait for a connection
to open.

:rtype: :class:`.Connection`
:returns: An active connection from the queue stored on the pool.
:raises: :class:`NoConnectionsAvailable` if ``Queue.get`` fails
before the ``timeout`` (only if a timeout is specified).
"""
try:
return self._queue.get(block=True, timeout=timeout)
except six.moves.queue.Empty:
raise NoConnectionsAvailable('No connection available from pool '
'within specified timeout')

@contextlib.contextmanager
def connection(self, timeout=None):
"""Obtain a connection from the pool.

Must be used as a context manager, for example::

with pool.connection() as connection:
pass # do something with the connection

If ``timeout`` is omitted, this method waits forever for a connection
to become available.

:type timeout: int
:param timeout: (Optional) Time (in seconds) to wait for a connection
to open.

:rtype: :class:`.Connection`
:returns: An active connection from the pool.
:raises: :class:`NoConnectionsAvailable` if no connection can be
retrieved from the pool before the ``timeout`` (only if
a timeout is specified).
"""
connection = getattr(self._thread_connections, 'current', None)

retrieved_new_cnxn = False
if connection is None:
# In this case we need to actually grab a connection from the
# pool. After retrieval, the connection is stored on a thread
# local so that nested connection requests from the same
# thread can re-use the same connection instance.
#
# NOTE: This code acquires a lock before assigning to the
# thread local; see
# ('https://emptysqua.re/blog/'
# 'another-thing-about-pythons-threadlocals/')
retrieved_new_cnxn = True
connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection

# This is a no-op for connections that have already been opened
# since they just call Client.start().
connection.open()
yield connection

# Remove thread local reference after the outermost 'with' block
# ends. Afterwards the thread no longer owns the connection.
if retrieved_new_cnxn:
del self._thread_connections.current
self._queue.put(connection)
102 changes: 102 additions & 0 deletions gcloud/bigtable/happybase/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,83 @@ def test_constructor_non_positive_size(self):
with self.assertRaises(ValueError):
self._makeOne(size)

def _makeOneWithMockQueue(self, queue_return):
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import pool as MUT

# We are going to use a fake queue, so we don't want any connections
# or clusters to be created in the constructor.
size = -1
cluster = object()
with _Monkey(MUT, _MIN_POOL_SIZE=size):
pool = self._makeOne(size, cluster=cluster)

pool._queue = _Queue(queue_return)
return pool

def test__acquire_connection(self):
queue_return = object()
pool = self._makeOneWithMockQueue(queue_return)

timeout = 432
connection = pool._acquire_connection(timeout=timeout)
self.assertTrue(connection is queue_return)
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls, [])

def test__acquire_connection_failure(self):
from gcloud.bigtable.happybase.pool import NoConnectionsAvailable

pool = self._makeOneWithMockQueue(None)
timeout = 1027
with self.assertRaises(NoConnectionsAvailable):
pool._acquire_connection(timeout=timeout)
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls, [])

def test_connection_is_context_manager(self):
import contextlib
import six

queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
cnxn_context = pool.connection()
if six.PY3: # pragma: NO COVER
self.assertTrue(isinstance(cnxn_context,
contextlib._GeneratorContextManager))
else:
self.assertTrue(isinstance(cnxn_context,
contextlib.GeneratorContextManager))

def test_connection_no_current_cnxn(self):
queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
timeout = 55

self.assertFalse(hasattr(pool._thread_connections, 'current'))
with pool.connection(timeout=timeout) as connection:
self.assertEqual(pool._thread_connections.current, queue_return)
self.assertTrue(connection is queue_return)
self.assertFalse(hasattr(pool._thread_connections, 'current'))

self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls,
[(queue_return, None, None)])

def test_connection_with_current_cnxn(self):
current_cnxn = _Connection()
queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
pool._thread_connections.current = current_cnxn
timeout = 8001

with pool.connection(timeout=timeout) as connection:
self.assertTrue(connection is current_cnxn)

self.assertEqual(pool._queue._get_calls, [])
self.assertEqual(pool._queue._put_calls, [])
self.assertEqual(pool._thread_connections.current, current_cnxn)


class _Client(object):

Expand All @@ -147,6 +224,12 @@ def stop(self):
self.stop_calls += 1


class _Connection(object):

def open(self):
pass


class _Cluster(object):

def __init__(self, copies=()):
Expand All @@ -161,3 +244,22 @@ def copy(self):
return result
else:
return self


class _Queue(object):

def __init__(self, result=None):
self.result = result
self._get_calls = []
self._put_calls = []

def get(self, block=None, timeout=None):
self._get_calls.append((block, timeout))
if self.result is None:
import six
raise six.moves.queue.Empty
else:
return self.result

def put(self, item, block=None, timeout=None):
self._put_calls.append((item, block, timeout))