Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove my postgres connection pools #149

Merged
merged 4 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def tilequeue_process(cfg, peripherals):
io_pool = ThreadPool(n_io_workers)

feature_fetcher = DataFetcher(cfg.postgresql_conn_info, all_layer_data,
io_pool, n_layers, n_total_needed_query)
io_pool, n_layers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


# create all queues used to manage pipeline

Expand Down
96 changes: 36 additions & 60 deletions tilequeue/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,44 @@
from itertools import cycle
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import register_hstore, register_json
import psycopg2
import threading
import ujson as json


class DatabaseCycleConnectionPool(object):

"""
Maintains a psycopg2 ThreadedConnectionPool for each of the
given dbnames. When a client requests a set of connections,
all of those connections will come from the same database.
"""

def __init__(self, min_conns_per_db, max_conns_per_db, dbnames, conn_info):
self._pools = []
self._conns_to_pool = {}

for dbname in dbnames:
pool = ThreadedConnectionPool(
min_conns_per_db,
max_conns_per_db,
dbname=dbname,
**conn_info
)
self._pools.append(pool)

self._pool_cycle = cycle(self._pools)
self._lock = threading.Lock()

def get_conns(self, n_conns):
conns = []

try:
with self._lock:
pool_to_use = next(self._pool_cycle)
for _ in range(n_conns):
conn = pool_to_use.getconn()
self._conns_to_pool[id(conn)] = pool_to_use
conns.append(conn)

conn.set_session(readonly=True, autocommit=True)
register_json(conn, loads=json.loads, globally=True)
register_hstore(conn, globally=True)
assert len(conns) == n_conns, \
"Couldn't collect enough connections"
except:
if conns:
self.put_conns(conns)
conns = []
raise

import ujson


class DBAffinityConnectionsNoLimit(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking back at history I was expecting to see 3x as much content here, mostly based off this SHA:

And then bringing some of the hstore / ujson changes in from:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't use the other database pools, so I removed them.

The ujson stuff is there. The globally=True thing I wanted to leave off in case it caused the connection problem.


# Similar to the db affinity pool, but without keeping track of
# the connections. It's the caller's responsibility to call us
# back with the connection objects so that we can close them.

def __init__(self, dbnames, conn_info):
self.dbnames = cycle(dbnames)
self.conn_info = conn_info
self.conn_mapping = {}
self.lock = threading.Lock()

def _make_conn(self, conn_info):
conn = psycopg2.connect(**conn_info)
conn.set_session(readonly=True, autocommit=True)
register_hstore(conn)
register_json(conn, loads=ujson.loads)
return conn

def get_conns(self, n_conn):
with self.lock:
dbname = self.dbnames.next()
conn_info_with_db = dict(self.conn_info, dbname=dbname)
conns = [self._make_conn(conn_info_with_db)
for i in range(n_conn)]
return conns

def put_conns(self, conns):
with self._lock:
for conn in conns:
pool = self._conns_to_pool.pop(id(conn), None)
assert pool is not None, \
"Couldn't find the pool for connection"
pool.putconn(conn)
for conn in conns:
try:
conn.close()
except:
pass

def closeall(self):
with self._lock:
for pool in self._pools:
pool.closeall()
self._conns_to_pool.clear()
raise Exception('DBAffinityConnectionsNoLimit pool does not track '
'connections')
8 changes: 4 additions & 4 deletions tilequeue/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from psycopg2.extras import RealDictCursor
from tilequeue.postgresql import DatabaseCycleConnectionPool
from tilequeue.postgresql import DBAffinityConnectionsNoLimit
from tilequeue.tile import calc_meters_per_pixel_dim
from tilequeue.tile import coord_to_mercator_bounds
from tilequeue.transform import calculate_padded_bounds
Expand Down Expand Up @@ -150,15 +150,15 @@ def enqueue_queries(sql_conns, thread_pool, layer_data, zoom, unpadded_bounds):

class DataFetcher(object):

def __init__(self, conn_info, layer_data, io_pool, n_conn, max_conn):
def __init__(self, conn_info, layer_data, io_pool, n_conn):
self.conn_info = dict(conn_info)
self.layer_data = layer_data
self.io_pool = io_pool

self.dbnames = self.conn_info.pop('dbnames')
self.dbnames_query_index = 0
self.sql_conn_pool = DatabaseCycleConnectionPool(
n_conn, max_conn, self.dbnames, self.conn_info)
self.sql_conn_pool = DBAffinityConnectionsNoLimit(
self.dbnames, self.conn_info)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section around line 160 used to read:

        self.sql_conn_pool = DBAffinityConnectionsNoLimit(
            self.dbnames, n_conn, self.conn_info)

Where self.n_conn = n_conn was part of the function arguments.

Similarly on 170 we used to say:

        sql_conns, conn_info = self.sql_conn_pool.get_conns()

instead of proposed:

         sql_conns = self.sql_conn_pool.get_conns(self.n_conn)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that was fairly messy, so this is a cleanup of that.

self.n_conn = n_conn

def __call__(self, coord, layer_data=None):
Expand Down