diff --git a/playhouse/pool.py b/playhouse/pool.py index 2ee3b486f..db4248c8d 100644 --- a/playhouse/pool.py +++ b/playhouse/pool.py @@ -31,9 +31,11 @@ class Meta: That's it! """ +import functools import heapq import logging import random +import threading import time from collections import namedtuple from itertools import chain @@ -67,6 +69,14 @@ class MaxConnectionsExceeded(ValueError): pass 'checked_out')) +def locked(fn): + @functools.wraps(fn) + def inner(self, *args, **kwargs): + with self._pool_lock: + return fn(self, *args, **kwargs) + return inner + + class PooledDatabase(object): def __init__(self, database, max_connections=20, stale_timeout=None, timeout=None, **kwargs): @@ -76,6 +86,8 @@ def __init__(self, database, max_connections=20, stale_timeout=None, if self._wait_timeout == 0: self._wait_timeout = float('inf') + self._pool_lock = threading.RLock() + # Available / idle connections stored in a heap, sorted oldest first. self._connections = [] @@ -119,6 +131,7 @@ def connect(self, reuse_if_open=False): raise MaxConnectionsExceeded('Max connections exceeded, timed out ' 'attempting to connect.') + @locked def _connect(self): while True: try: @@ -154,7 +167,7 @@ def _connect(self): len(self._in_use) >= self._max_connections): raise MaxConnectionsExceeded('Exceeded maximum connections.') conn = super(PooledDatabase, self)._connect() - ts = time.time() - random.random() / 1000 + ts = time.time() key = self.conn_key(conn) logger.debug('Created new connection %s.', key) @@ -173,6 +186,7 @@ def _can_reuse(self, conn): # Called on check-in to make sure the connection can be re-used. return True + @locked def _close(self, conn, close_conn=False): key = self.conn_key(conn) if close_conn: @@ -188,6 +202,7 @@ def _close(self, conn, close_conn=False): else: logger.debug('Closed %s.', key) + @locked def manual_close(self): """ Close the underlying connection without returning it to the pool. @@ -206,40 +221,40 @@ def manual_close(self): self.close() self._close(conn, close_conn=True) + @locked def close_idle(self): # Close any open connections that are not currently in-use. - with self._lock: - for _, conn in self._connections: - self._close(conn, close_conn=True) - self._connections = [] + for _, conn in self._connections: + self._close(conn, close_conn=True) + self._connections = [] + @locked def close_stale(self, age=600): # Close any connections that are in-use but were checked out quite some # time ago and can be considered stale. - with self._lock: - in_use = {} - cutoff = time.time() - age - n = 0 - for key, pool_conn in self._in_use.items(): - if pool_conn.checked_out < cutoff: - self._close(pool_conn.connection, close_conn=True) - n += 1 - else: - in_use[key] = pool_conn - self._in_use = in_use + in_use = {} + cutoff = time.time() - age + n = 0 + for key, pool_conn in self._in_use.items(): + if pool_conn.checked_out < cutoff: + self._close(pool_conn.connection, close_conn=True) + n += 1 + else: + in_use[key] = pool_conn + self._in_use = in_use return n + @locked def close_all(self): # Close all connections -- available and in-use. Warning: may break any # active connections used by other threads. self.close() - with self._lock: - for _, conn in self._connections: - self._close(conn, close_conn=True) - for pool_conn in self._in_use.values(): - self._close(pool_conn.connection, close_conn=True) - self._connections = [] - self._in_use = {} + for _, conn in self._connections: + self._close(conn, close_conn=True) + for pool_conn in self._in_use.values(): + self._close(pool_conn.connection, close_conn=True) + self._connections = [] + self._in_use = {} class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):