From db53e06569ea7612ac1feea7efa9bc1e2ff68c1c Mon Sep 17 00:00:00 2001
From: Ian Dees
Date: Tue, 31 Jan 2017 16:47:56 -0500
Subject: [PATCH 1/3] Back out connection pool changes

---
 tilequeue/command.py    |  2 +-
 tilequeue/postgresql.py | 96 ++++++++++++++++-------------------------
 tilequeue/query.py      |  8 ++--
 3 files changed, 42 insertions(+), 64 deletions(-)

diff --git a/tilequeue/command.py b/tilequeue/command.py
index b60ae7b3..7f864998 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -533,7 +533,7 @@ def tilequeue_process(cfg, peripherals):
     io_pool = ThreadPool(n_io_workers)
 
     feature_fetcher = DataFetcher(cfg.postgresql_conn_info, all_layer_data,
-                                  io_pool, n_layers, n_total_needed_query)
+                                  io_pool, n_layers)
 
     # create all queues used to manage pipeline
 
diff --git a/tilequeue/postgresql.py b/tilequeue/postgresql.py
index cc6fb219..de65360b 100644
--- a/tilequeue/postgresql.py
+++ b/tilequeue/postgresql.py
@@ -1,69 +1,47 @@
 from itertools import cycle
-from psycopg2.pool import ThreadedConnectionPool
 from psycopg2.extras import register_hstore, register_json
+import psycopg2
 import threading
-import ujson as json
 
 
-class DatabaseCycleConnectionPool(object):
-
-    """
-    Maintains a psycopg2 ThreadedConnectionPool for each of the
-    given dbnames. When a client requests a set of connections,
-    all of those connections will come from the same database.
-    """
-
-    def __init__(self, min_conns_per_db, max_conns_per_db, dbnames, conn_info):
-        self._pools = []
-        self._conns_to_pool = {}
-
-        for dbname in dbnames:
-            pool = ThreadedConnectionPool(
-                min_conns_per_db,
-                max_conns_per_db,
-                dbname=dbname,
-                **conn_info
-            )
-            self._pools.append(pool)
-
-        self._pool_cycle = cycle(self._pools)
-        self._lock = threading.Lock()
-
-    def get_conns(self, n_conns):
-        conns = []
-
-        try:
-            with self._lock:
-                pool_to_use = next(self._pool_cycle)
-                for _ in range(n_conns):
-                    conn = pool_to_use.getconn()
-
-                    conn.set_session(readonly=True, autocommit=True)
-                    register_json(conn, loads=json.loads, globally=True)
-                    register_hstore(conn, globally=True)
-
-                    self._conns_to_pool[id(conn)] = pool_to_use
-                    conns.append(conn)
-            assert len(conns) == n_conns, \
-                "Couldn't collect enough connections"
-        except:
-            if conns:
-                self.put_conns(conns)
-                conns = []
-            raise
-
+class DBAffinityConnectionsNoLimit(object):
+
+    # Similar to the db affinity pool, but without keeping track of
+    # the connections. It's the caller's responsibility to call us
+    # back with the connection objects so that we can close them.
+
+    def __init__(self, dbnames, conn_info):
+        self.dbnames = dbnames
+        self.conn_info = conn_info
+        self.conn_mapping = {}
+        self.lock = threading.Lock()
+        self.dbname_index = 0
+
+    def _make_conn(self, conn_info):
+        conn = psycopg2.connect(**conn_info)
+        conn.set_session(readonly=True, autocommit=True)
+        register_hstore(conn)
+        register_json(conn)
+        return conn
+
+    def get_conns(self, n_conn):
+        with self.lock:
+            dbname = self.dbnames[self.dbname_index]
+            self.dbname_index += 1
+            if self.dbname_index >= len(self.dbnames):
+                self.dbname_index = 0
+            conn_info_with_db = dict(self.conn_info, dbname=dbname)
+            conns = [self._make_conn(conn_info_with_db)
+                     for i in range(n_conn)]
         return conns
 
     def put_conns(self, conns):
-        with self._lock:
-            for conn in conns:
-                pool = self._conns_to_pool.pop(id(conn), None)
-                assert pool is not None, \
-                    "Couldn't find the pool for connection"
-                pool.putconn(conn)
+        for conn in conns:
+            try:
+                conn.close()
+            except:
+                pass
 
     def closeall(self):
-        with self._lock:
-            for pool in self._pools:
-                pool.closeall()
-            self._conns_to_pool.clear()
+        raise Exception('DBAffinityConnectionsNoLimit pool does not track '
+                        'connections')
diff --git a/tilequeue/query.py b/tilequeue/query.py
index f28c1f28..2b655886 100644
--- a/tilequeue/query.py
+++ b/tilequeue/query.py
@@ -1,5 +1,5 @@
 from psycopg2.extras import RealDictCursor
-from tilequeue.postgresql import DatabaseCycleConnectionPool
+from tilequeue.postgresql import DBAffinityConnectionsNoLimit
 from tilequeue.tile import calc_meters_per_pixel_dim
 from tilequeue.tile import coord_to_mercator_bounds
 from tilequeue.transform import calculate_padded_bounds
@@ -150,15 +150,15 @@ def enqueue_queries(sql_conns, thread_pool, layer_data, zoom, unpadded_bounds):
 
 class DataFetcher(object):
 
-    def __init__(self, conn_info, layer_data, io_pool, n_conn, max_conn):
+    def __init__(self, conn_info, layer_data, io_pool, n_conn):
         self.conn_info = dict(conn_info)
         self.layer_data = layer_data
         self.io_pool = io_pool
 
         self.dbnames = self.conn_info.pop('dbnames')
         self.dbnames_query_index = 0
-        self.sql_conn_pool = DatabaseCycleConnectionPool(
-            n_conn, max_conn, self.dbnames, self.conn_info)
+        self.sql_conn_pool = DBAffinityConnectionsNoLimit(
+            self.dbnames, self.conn_info)
         self.n_conn = n_conn
 
     def __call__(self, coord, layer_data=None):
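For context on how the class this patch reintroduces is meant to be driven, here is a minimal usage sketch; the database names and connection parameters are hypothetical, not taken from the patch. Every connection handed out by a single get_conns() call targets the same database (the "affinity"), and put_conns() closes connections outright instead of recycling them.

    # Minimal sketch, assuming two hypothetical replica databases
    # ("gis1", "gis2") reachable with the parameters below.
    from tilequeue.postgresql import DBAffinityConnectionsNoLimit

    conn_info = dict(host='localhost', user='tileserv')  # hypothetical
    pool = DBAffinityConnectionsNoLimit(['gis1', 'gis2'], conn_info)

    conns = pool.get_conns(4)  # four fresh connections, all to one database
    try:
        for conn in conns:
            cursor = conn.cursor()
            cursor.execute('SELECT 1')
            cursor.fetchone()
    finally:
        pool.put_conns(conns)  # simply closes them; nothing is reused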
From 5d6fdda11b3a37c225c0f98d59247f7243c3a9f2 Mon Sep 17 00:00:00 2001
From: Ian Dees
Date: Tue, 31 Jan 2017 16:48:27 -0500
Subject: [PATCH 2/3] Use ujson for the psql json column type

---
 tilequeue/postgresql.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tilequeue/postgresql.py b/tilequeue/postgresql.py
index de65360b..1a277a72 100644
--- a/tilequeue/postgresql.py
+++ b/tilequeue/postgresql.py
@@ -2,6 +2,7 @@
 from psycopg2.extras import register_hstore, register_json
 import psycopg2
 import threading
+import ujson
 
 
 class DBAffinityConnectionsNoLimit(object):
@@ -21,7 +22,7 @@ def _make_conn(self, conn_info):
         conn = psycopg2.connect(**conn_info)
         conn.set_session(readonly=True, autocommit=True)
         register_hstore(conn)
-        register_json(conn)
+        register_json(conn, loads=ujson.loads)
         return conn
 
     def get_conns(self, n_conn):
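The patch above swaps the parser psycopg2 uses when deserializing json columns: register_json accepts a loads callable, so ujson.loads replaces the slower stdlib decoder. A standalone sketch of the same technique, with hypothetical connection parameters:

    import psycopg2
    import ujson
    from psycopg2.extras import register_json

    conn = psycopg2.connect(host='localhost', dbname='gis1')  # hypothetical
    register_json(conn, loads=ujson.loads)  # json columns now parse via ujson

    cursor = conn.cursor()
    cursor.execute("""SELECT '{"kind": "road"}'::json""")
    row = cursor.fetchone()
    print(row[0]['kind'])  # the column arrives as a dict, parsed by ujson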
From 5bb6cc3ffb92736515df94b62d7d1981eadd7c44 Mon Sep 17 00:00:00 2001
From: Ian Dees
Date: Tue, 31 Jan 2017 17:10:23 -0500
Subject: [PATCH 3/3] Use cycle instead of counting an index ourselves

---
 tilequeue/postgresql.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/tilequeue/postgresql.py b/tilequeue/postgresql.py
index 1a277a72..9ccdaf4b 100644
--- a/tilequeue/postgresql.py
+++ b/tilequeue/postgresql.py
@@ -12,11 +12,10 @@ class DBAffinityConnectionsNoLimit(object):
     # back with the connection objects so that we can close them.
 
     def __init__(self, dbnames, conn_info):
-        self.dbnames = dbnames
+        self.dbnames = cycle(dbnames)
         self.conn_info = conn_info
         self.conn_mapping = {}
         self.lock = threading.Lock()
-        self.dbname_index = 0
 
     def _make_conn(self, conn_info):
         conn = psycopg2.connect(**conn_info)
@@ -27,10 +26,7 @@ def _make_conn(self, conn_info):
 
     def get_conns(self, n_conn):
         with self.lock:
-            dbname = self.dbnames[self.dbname_index]
-            self.dbname_index += 1
-            if self.dbname_index >= len(self.dbnames):
-                self.dbname_index = 0
+            dbname = self.dbnames.next()
             conn_info_with_db = dict(self.conn_info, dbname=dbname)
             conns = [self._make_conn(conn_info_with_db)
                      for i in range(n_conn)]
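The final patch replaces hand-rolled wrap-around index arithmetic with itertools.cycle, which yields the dbnames in round-robin order indefinitely. A small illustration with hypothetical names; note that .next() is the Python 2 spelling used by this codebase, while the builtin next() works on both Python 2 and 3:

    from itertools import cycle

    dbnames = cycle(['gis1', 'gis2', 'gis3'])  # hypothetical names
    for _ in range(5):
        print(next(dbnames))  # gis1, gis2, gis3, gis1, gis2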