Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes queries per-table #227

Merged
merged 4 commits into from
Jul 31, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from tilequeue.query import jinja_filter_bbox_overlaps
from tilequeue.query import jinja_filter_bbox_padded_intersection
from tilequeue.query import jinja_filter_geometry
from tilequeue.query import SourcesQueriesGenerator
from tilequeue.query import TemplateFinder
from tilequeue.query import TemplateQueryGenerator
from tilequeue.queue import make_sqs_queue
from tilequeue.tile import coord_int_zoom_up
from tilequeue.tile import coord_is_valid
Expand All @@ -41,7 +44,7 @@
from tilequeue.worker import S3Storage
from tilequeue.worker import SqsQueueReader
from tilequeue.worker import SqsQueueWriter
from tilequeue.postgresql import DBAffinityConnectionsNoLimit
from tilequeue.postgresql import DBConnectionPool
from urllib2 import urlopen
from zope.dottedname.resolve import resolve
import argparse
Expand Down Expand Up @@ -477,19 +480,7 @@ def _parse_postprocess_resources(post_process_item, cfg_path):
return resources


def parse_layer_data(query_cfg, buffer_cfg, template_path, reload_templates,
cfg_path):
if reload_templates:
from tilequeue.query import DevJinjaQueryGenerator
else:
from tilequeue.query import JinjaQueryGenerator
all_layer_names = query_cfg['all']
layers_config = query_cfg['layers']
post_process_config = query_cfg.get('post_process', [])
layer_data = []
all_layer_data = []
post_process_data = []

def make_jinja_environment(template_path):
environment = Environment(loader=FileSystemLoader(template_path))
environment.filters['geometry'] = jinja_filter_geometry
environment.filters['bbox_filter'] = jinja_filter_bbox_filter
Expand All @@ -498,20 +489,45 @@ def parse_layer_data(query_cfg, buffer_cfg, template_path, reload_templates,
jinja_filter_bbox_padded_intersection)
environment.filters['bbox'] = jinja_filter_bbox
environment.filters['bbox_overlaps'] = jinja_filter_bbox_overlaps
return environment


# Pairs the parsed source list with the generator that builds their queries.
SourcesConfig = namedtuple('SourcesConfig', 'sources queries_generator')


def parse_source_data(queries_cfg):
    """Build the list of source objects from the queries configuration.

    Each entry under queries_cfg['sources'] must provide a 'template'
    and may provide a 'start_zoom' (defaulting to 0).
    """
    from tilequeue.query import make_source
    return [
        make_source(
            source_name,
            source_data['template'],
            int(source_data.get('start_zoom', 0)),
        )
        for source_name, source_data in queries_cfg['sources'].items()
    ]


def make_queries_generator(sources, template_path, reload_templates):
    """Wire up the per-source queries generator.

    Builds a jinja environment over template_path, a template finder
    (templates are cached unless reload_templates is set), and returns a
    SourcesQueriesGenerator over the given sources.
    """
    environment = make_jinja_environment(template_path)
    finder = TemplateFinder(environment, not reload_templates)
    return SourcesQueriesGenerator(sources, TemplateQueryGenerator(finder))


def parse_layer_data(query_cfg, buffer_cfg, cfg_path):
all_layer_names = query_cfg['all']
layers_config = query_cfg['layers']
post_process_config = query_cfg.get('post_process', [])
layer_data = []
all_layer_data = []
post_process_data = []

for layer_name, layer_config in layers_config.items():
template_name = layer_config['template']
start_zoom = layer_config['start_zoom']
area_threshold = int(layer_config.get('area-inclusion-threshold', 1))
if reload_templates:
query_generator = DevJinjaQueryGenerator(
environment, template_name, start_zoom)
else:
template = environment.get_template(template_name)
query_generator = JinjaQueryGenerator(template, start_zoom)
layer_datum = dict(
name=layer_name,
query_generator=query_generator,
is_clipped=layer_config.get('clip', True),
clip_factor=layer_config.get('clip_factor', 1.0),
geometry_types=layer_config['geometry_types'],
Expand Down Expand Up @@ -1008,7 +1024,7 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):

conn_info = dict(cfg.postgresql_conn_info)
dbnames = conn_info.pop('dbnames')
sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
sql_conn_pool = DBConnectionPool(dbnames, conn_info, False)
sql_conn = sql_conn_pool.get_conns(1)[0]
with sql_conn.cursor() as cursor:

Expand Down
51 changes: 31 additions & 20 deletions tilequeue/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
from itertools import cycle
from itertools import islice
from psycopg2.extras import register_hstore, register_json
import psycopg2
import threading
import ujson


class DBAffinityConnectionsNoLimit(object):
class ConnectionsContextManager(object):

    """Handle automatically closing connections via with statement"""

    def __init__(self, conns):
        # The connections to close when the with block exits.
        self.conns = conns

    def __enter__(self):
        # Expose the raw connection list to the with-block body.
        return self.conns

    def __exit__(self, exc_type, exc_val, exc_tb):
        for conn in self.conns:
            try:
                conn.close()
            except Exception:
                # Closing is best-effort: one connection failing to
                # close should not stop the rest from being closed.
                # (Was a bare except, which would also swallow
                # KeyboardInterrupt/SystemExit.)
                pass
        # Never suppress an exception raised inside the with block.
        return False


class DBConnectionPool(object):

"""Manage database connections with varying database names"""

def __init__(self, dbnames, conn_info, readonly=True):
self.dbnames = cycle(dbnames)
Expand All @@ -27,19 +46,11 @@ def _make_conn(self, conn_info):

def get_conns(self, n_conn):
with self.lock:
dbname = self.dbnames.next()
conn_info_with_db = dict(self.conn_info, dbname=dbname)
conns = [self._make_conn(conn_info_with_db)
for i in range(n_conn)]
return conns

def put_conns(self, conns):
for conn in conns:
try:
conn.close()
except:
pass

def closeall(self):
raise Exception('DBAffinityConnectionsNoLimit pool does not track '
'connections')
dbnames = list(islice(self.dbnames, n_conn))
conns = []
for dbname in dbnames:
conn_info_with_db = dict(self.conn_info, dbname=dbname)
conn = self._make_conn(conn_info_with_db)
conns.append(conn)
conns_ctx_mgr = ConnectionsContextManager(conns)
return conns_ctx_mgr
101 changes: 101 additions & 0 deletions tilequeue/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,104 @@ def process_coord(coord, nominal_zoom, feature_layers, post_process_data,
unpadded_bounds, cut_coords, buffer_cfg, extra_data, scale)

return all_formatted_tiles, extra_data


def convert_source_data_to_feature_layers(rows, layer_data, bounds, zoom,
                                          max_zoom=16):
    """Group raw per-table query rows into process_coord feature layers.

    Each row is a dict using dunder keys to carry its feature id
    (__id__), geometries (__geometry__, __label__,
    __boundaries_geometry__), shared properties (__properties__) and
    per-layer property dicts (__<layer>_properties__).  Rows are
    mutated in place (keys are popped) while being converted.

    :param rows: iterable of row dicts from the per-table queries.
    :param layer_data: list of layer datum dicts, each with a 'name'
        that must be one of the known layers.
    :param bounds: tile bounds; currently used unpadded for all
        geometry types.
    :param zoom: zoom level used to filter features by their min_zoom.
    :param max_zoom: zoom at or above which the min_zoom filter is
        skipped (previously hard-coded to 16).
    :returns: list of feature layer dicts in the shape expected
        downstream by process_coord.
    """
    # TODO we might want to fold in the other processing into this
    # step at some point. This will prevent us from having to iterate
    # through all the features again.

    layers = (
        'boundaries',
        'buildings',
        'earth',
        'landuse',
        'places',
        'pois',
        'roads',
        'transit',
        'water',
    )

    features_by_layer = {}
    for layer in layers:
        features_by_layer[layer] = []

    for row in rows:

        fid = row.pop('__id__')

        geometry = row.pop('__geometry__', None)
        label_geometry = row.pop('__label__', None)
        boundaries_geometry = row.pop('__boundaries_geometry__', None)
        assert geometry or boundaries_geometry

        # the key can be present but explicitly None, so a plain
        # dict.pop default alone is not enough here.
        common_props = row.pop('__properties__', None) or {}

        row_props_by_layer = dict(
            boundaries=row.pop('__boundaries_properties__', None),
            buildings=row.pop('__buildings_properties__', None),
            earth=row.pop('__earth_properties__', None),
            landuse=row.pop('__landuse_properties__', None),
            places=row.pop('__places_properties__', None),
            pois=row.pop('__pois_properties__', None),
            roads=row.pop('__roads_properties__', None),
            transit=row.pop('__transit_properties__', None),
            water=row.pop('__water_properties__', None),
        )

        # TODO at first pass, simulate the structure that we're
        # expecting downstream in the process_coord function
        for layer in layers:
            layer_props = row_props_by_layer[layer]
            if layer_props is None:
                # a feature can belong to more than one layer; it only
                # appears in the layers whose properties are present.
                continue

            props = common_props.copy()
            props.update(layer_props)

            min_zoom = props.get('min_zoom', None)
            assert min_zoom is not None, \
                'Missing min_zoom in layer %s' % layer
            # NOTE: the original also had an unreachable
            # `if min_zoom is None: continue` after this assert; it has
            # been removed as dead code.

            # below max_zoom, drop features that only become visible at
            # a later zoom; at max_zoom and above everything is kept —
            # presumably so the tile can be overzoomed (confirm).
            if zoom < max_zoom and min_zoom >= zoom + 1:
                continue

            query_props = dict(
                __properties__=props,
                __id__=fid,
            )

            # boundaries features carry their own geometry, distinct
            # from the row's shared geometry.
            if boundaries_geometry and layer == 'boundaries':
                geom = boundaries_geometry
            else:
                geom = geometry
            query_props['__geometry__'] = geom
            if label_geometry:
                query_props['__label__'] = label_geometry

            features_by_layer[layer].append(query_props)

    feature_layers = []
    for layer_datum in layer_data:
        layer = layer_datum['name']
        features = features_by_layer[layer]
        # TODO padded bounds — all geometry types currently share the
        # unpadded tile bounds (padding would support buffered formats
        # such as buffered MVT).
        padded_bounds = dict(
            polygon=bounds,
            line=bounds,
            point=bounds,
        )
        feature_layer = dict(
            name=layer,
            features=features,
            layer_datum=layer_datum,
            padded_bounds=padded_bounds,
        )
        feature_layers.append(feature_layer)

    return feature_layers
Loading