diff --git a/tilequeue/command.py b/tilequeue/command.py
index 04c403d7..31ca3de6 100755
--- a/tilequeue/command.py
+++ b/tilequeue/command.py
@@ -18,6 +18,9 @@
 from tilequeue.query import jinja_filter_bbox_overlaps
 from tilequeue.query import jinja_filter_bbox_padded_intersection
 from tilequeue.query import jinja_filter_geometry
+from tilequeue.query import SourcesQueriesGenerator
+from tilequeue.query import TemplateFinder
+from tilequeue.query import TemplateQueryGenerator
 from tilequeue.queue import make_sqs_queue
 from tilequeue.tile import coord_int_zoom_up
 from tilequeue.tile import coord_is_valid
@@ -41,7 +44,7 @@
 from tilequeue.worker import S3Storage
 from tilequeue.worker import SqsQueueReader
 from tilequeue.worker import SqsQueueWriter
-from tilequeue.postgresql import DBAffinityConnectionsNoLimit
+from tilequeue.postgresql import DBConnectionPool
 from urllib2 import urlopen
 from zope.dottedname.resolve import resolve
 import argparse
@@ -477,19 +480,7 @@ def _parse_postprocess_resources(post_process_item, cfg_path):
     return resources
 
 
-def parse_layer_data(query_cfg, buffer_cfg, template_path, reload_templates,
-                     cfg_path):
-    if reload_templates:
-        from tilequeue.query import DevJinjaQueryGenerator
-    else:
-        from tilequeue.query import JinjaQueryGenerator
-    all_layer_names = query_cfg['all']
-    layers_config = query_cfg['layers']
-    post_process_config = query_cfg.get('post_process', [])
-    layer_data = []
-    all_layer_data = []
-    post_process_data = []
-
+def make_jinja_environment(template_path):
     environment = Environment(loader=FileSystemLoader(template_path))
     environment.filters['geometry'] = jinja_filter_geometry
     environment.filters['bbox_filter'] = jinja_filter_bbox_filter
@@ -498,20 +489,45 @@ def parse_layer_data(query_cfg, buffer_cfg, template_path, reload_templates,
         jinja_filter_bbox_padded_intersection)
     environment.filters['bbox'] = jinja_filter_bbox
     environment.filters['bbox_overlaps'] = jinja_filter_bbox_overlaps
+    return environment
+
+
+SourcesConfig = namedtuple('SourcesConfig', 'sources queries_generator')
+
+
+def parse_source_data(queries_cfg):
+    from tilequeue.query import make_source
+    sources_cfg = queries_cfg['sources']
+    sources = []
+    for source_name, source_data in sources_cfg.items():
+        template = source_data['template']
+        start_zoom = int(source_data.get('start_zoom', 0))
+        source = make_source(source_name, template, start_zoom)
+        sources.append(source)
+    return sources
+
+
+def make_queries_generator(sources, template_path, reload_templates):
+    jinja_environment = make_jinja_environment(template_path)
+    cache_templates = not reload_templates
+    template_finder = TemplateFinder(jinja_environment, cache_templates)
+    query_generator = TemplateQueryGenerator(template_finder)
+    queries_generator = SourcesQueriesGenerator(sources, query_generator)
+    return queries_generator
+
+
+def parse_layer_data(query_cfg, buffer_cfg, cfg_path):
+    all_layer_names = query_cfg['all']
+    layers_config = query_cfg['layers']
+    post_process_config = query_cfg.get('post_process', [])
+    layer_data = []
+    all_layer_data = []
+    post_process_data = []
     for layer_name, layer_config in layers_config.items():
-        template_name = layer_config['template']
-        start_zoom = layer_config['start_zoom']
         area_threshold = int(layer_config.get('area-inclusion-threshold', 1))
-        if reload_templates:
-            query_generator = DevJinjaQueryGenerator(
-                environment, template_name, start_zoom)
-        else:
-            template = environment.get_template(template_name)
-            query_generator = JinjaQueryGenerator(template, start_zoom)
         layer_datum = dict(
             name=layer_name,
-            query_generator=query_generator,
             is_clipped=layer_config.get('clip', True),
             clip_factor=layer_config.get('clip_factor', 1.0),
             geometry_types=layer_config['geometry_types'],
@@ -581,8 +597,7 @@ def tilequeue_process(cfg, peripherals):
         query_cfg = yaml.load(query_cfg_fp)
     all_layer_data, layer_data, post_process_data = (
         parse_layer_data(
-            query_cfg, cfg.buffer_cfg, cfg.template_path, cfg.reload_templates,
-            os.path.dirname(cfg.query_cfg)))
+            query_cfg, cfg.buffer_cfg, os.path.dirname(cfg.query_cfg)))
 
     formats = lookup_formats(cfg.output_formats)
 
@@ -632,9 +647,11 @@ def tilequeue_process(cfg, peripherals):
     n_max_io_workers = 50
     n_io_workers = min(n_total_needed, n_max_io_workers)
     io_pool = ThreadPool(n_io_workers)
-
-    feature_fetcher = DataFetcher(cfg.postgresql_conn_info, all_layer_data,
-                                  io_pool, n_layers)
+    sources = parse_source_data(query_cfg)
+    queries_generator = make_queries_generator(
+        sources, cfg.template_path, cfg.reload_templates)
+    feature_fetcher = DataFetcher(
+        cfg.postgresql_conn_info, queries_generator, io_pool)
 
     # create all queues used to manage pipeline
 
@@ -665,7 +682,7 @@ def tilequeue_process(cfg, peripherals):
 
     data_processor = ProcessAndFormatData(
         post_process_data, formats, sql_data_fetch_queue, processor_queue,
-        cfg.buffer_cfg, output_calc_mapping, logger)
+        cfg.buffer_cfg, output_calc_mapping, layer_data, logger)
 
     s3_storage = S3Storage(processor_queue, s3_store_queue, io_pool, store,
                            logger, cfg.metatile_size)
@@ -1008,7 +1025,7 @@ def tilequeue_consume_tile_traffic(cfg, peripherals):
     conn_info = dict(cfg.postgresql_conn_info)
    dbnames = conn_info.pop('dbnames')
 
-    sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
+    sql_conn_pool = DBConnectionPool(dbnames, conn_info, False)
     sql_conn = sql_conn_pool.get_conns(1)[0]
 
     with sql_conn.cursor() as cursor:
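For orientation between the two files, here is a minimal sketch of how the new command.py helpers are meant to compose, assuming a parsed queries config whose `sources` mapping carries a `template` and an optional `start_zoom` per source (the source names, template filenames, pool size, and connection parameters below are all invented):

    from multiprocessing.pool import ThreadPool

    from tilequeue.command import make_queries_generator, parse_source_data
    from tilequeue.query import DataFetcher

    # stand-in for the YAML loaded from cfg.query_cfg
    query_cfg = dict(sources=dict(
        water=dict(template='water.jinja2', start_zoom=0),
        pois=dict(template='pois.jinja2', start_zoom=9),
    ))

    io_pool = ThreadPool(4)
    sources = parse_source_data(query_cfg)
    queries_generator = make_queries_generator(
        sources, template_path='queries/', reload_templates=False)
    feature_fetcher = DataFetcher(
        dict(host='localhost', user='tiles', dbnames=['osm']),
        queries_generator, io_pool)

Note that `parse_layer_data` no longer needs the template path or the reload flag; layers now only contribute processing metadata, while the SQL comes entirely from the per-source templates.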
diff --git a/tilequeue/postgresql.py b/tilequeue/postgresql.py
index 9cb73541..74a7cd08 100644
--- a/tilequeue/postgresql.py
+++ b/tilequeue/postgresql.py
@@ -1,15 +1,34 @@
 from itertools import cycle
+from itertools import islice
 from psycopg2.extras import register_hstore, register_json
 import psycopg2
 import threading
 import ujson
 
 
-class DBAffinityConnectionsNoLimit(object):
+class ConnectionsContextManager(object):
 
-    # Similar to the db affinity pool, but without keeping track of
-    # the connections. It's the caller's responsibility to call us
-    # back with the connection objects so that we can close them.
+    """Handle automatically closing connections via with statement"""
+
+    def __init__(self, conns):
+        self.conns = conns
+
+    def __enter__(self):
+        return self.conns
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        for conn in self.conns:
+            try:
+                conn.close()
+            except:
+                pass
+        suppress_exception = False
+        return suppress_exception
+
+
+class DBConnectionPool(object):
+
+    """Manage database connections with varying database names"""
 
     def __init__(self, dbnames, conn_info, readonly=True):
         self.dbnames = cycle(dbnames)
@@ -27,19 +46,11 @@ def _make_conn(self, conn_info):
 
     def get_conns(self, n_conn):
         with self.lock:
-            dbname = self.dbnames.next()
-            conn_info_with_db = dict(self.conn_info, dbname=dbname)
-            conns = [self._make_conn(conn_info_with_db)
-                     for i in range(n_conn)]
-        return conns
-
-    def put_conns(self, conns):
-        for conn in conns:
-            try:
-                conn.close()
-            except:
-                pass
-
-    def closeall(self):
-        raise Exception('DBAffinityConnectionsNoLimit pool does not track '
-                        'connections')
+            dbnames = list(islice(self.dbnames, n_conn))
+        conns = []
+        for dbname in dbnames:
+            conn_info_with_db = dict(self.conn_info, dbname=dbname)
+            conn = self._make_conn(conn_info_with_db)
+            conns.append(conn)
+        conns_ctx_mgr = ConnectionsContextManager(conns)
+        return conns_ctx_mgr
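The behavioural change in this file is that `get_conns` now returns a `ConnectionsContextManager` rather than a bare list, so callers use a `with` block instead of pairing `get_conns` with `put_conns`. A minimal usage sketch (database name and connection parameters invented):

    from tilequeue.postgresql import DBConnectionPool

    pool = DBConnectionPool(['osm'], dict(host='localhost', user='tiles'))
    with pool.get_conns(2) as conns:
        for conn in conns:
            with conn.cursor() as cursor:
                cursor.execute('SELECT 1')
    # every connection is closed on exit, even if a query raised

One caveat: `tilequeue_consume_tile_traffic` above still does `sql_conn_pool.get_conns(1)[0]`, which indexes into the context manager rather than entering it, and so would need the same `with` treatment to keep working.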
diff --git a/tilequeue/process.py b/tilequeue/process.py
index 54457c42..efc24813 100644
--- a/tilequeue/process.py
+++ b/tilequeue/process.py
@@ -443,3 +443,108 @@ def process_coord(coord, nominal_zoom, feature_layers, post_process_data,
         unpadded_bounds, cut_coords, buffer_cfg, extra_data, scale)
 
     return all_formatted_tiles, extra_data
+
+
+def convert_source_data_to_feature_layers(rows, layer_data, bounds, zoom):
+    # TODO we might want to fold in the other processing into this
+    # step at some point. This will prevent us from having to iterate
+    # through all the features again.
+
+    layers = (
+        'boundaries',
+        'buildings',
+        'earth',
+        'landuse',
+        'places',
+        'pois',
+        'roads',
+        'transit',
+        'water',
+    )
+
+    features_by_layer = {}
+    for layer in layers:
+        features_by_layer[layer] = []
+
+    for row in rows:
+
+        fid = row.pop('__id__')
+
+        geometry = row.pop('__geometry__', None)
+        label_geometry = row.pop('__label__', None)
+        boundaries_geometry = row.pop('__boundaries_geometry__', None)
+        assert geometry or boundaries_geometry
+
+        common_props = row.pop('__properties__', None)
+        if common_props is None:
+            # if __properties__ exists but is null in the query, we
+            # want to normalize that to an empty dict too
+            common_props = {}
+
+        row_props_by_layer = dict(
+            boundaries=row.pop('__boundaries_properties__', None),
+            buildings=row.pop('__buildings_properties__', None),
+            earth=row.pop('__earth_properties__', None),
+            landuse=row.pop('__landuse_properties__', None),
+            places=row.pop('__places_properties__', None),
+            pois=row.pop('__pois_properties__', None),
+            roads=row.pop('__roads_properties__', None),
+            transit=row.pop('__transit_properties__', None),
+            water=row.pop('__water_properties__', None),
+        )
+
+        # TODO at first pass, simulate the structure that we're
+        # expecting downstream in the process_coord function
+        for layer in layers:
+            layer_props = row_props_by_layer[layer]
+            if layer_props is not None:
+                props = common_props.copy()
+                props.update(layer_props)
+
+                min_zoom = props.get('min_zoom', None)
+                assert min_zoom is not None, \
+                    'Missing min_zoom in layer %s' % layer
+
+                # a feature can belong to more than one layer
+                # this check ensures that it only appears in the
+                # layers it should
+                if min_zoom is None:
+                    continue
+                # TODO would be better if 16 wasn't hard coded here
+                if zoom < 16 and min_zoom >= zoom + 1:
+                    continue
+
+                query_props = dict(
+                    __properties__=props,
+                    __id__=fid,
+                )
+
+                if boundaries_geometry and layer == 'boundaries':
+                    geom = boundaries_geometry
+                else:
+                    geom = geometry
+                query_props['__geometry__'] = geom
+                if label_geometry:
+                    query_props['__label__'] = label_geometry
+
+                features_by_layer[layer].append(query_props)
+
+    feature_layers = []
+    for layer_datum in layer_data:
+        layer = layer_datum['name']
+        features = features_by_layer[layer]
+        # TODO padded bounds
+        padded_bounds = dict(
+            polygon=bounds,
+            line=bounds,
+            point=bounds,
+        )
+        feature_layer = dict(
+            name=layer,
+            features=features,
+            layer_datum=layer_datum,
+            padded_bounds=padded_bounds,
+        )
+        feature_layers.append(feature_layer)
+
+    return feature_layers
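To make the expected row shape concrete, a hedged example of the fan-out: a single row tagged with `__water_properties__` ends up only in the `water` feature layer. The id, geometry bytes, and property values are invented, and `layer_data` is trimmed to one layer:

    from tilequeue.process import convert_source_data_to_feature_layers

    rows = [{
        '__id__': 123,
        '__geometry__': b'\x01...',  # stand-in for WKB bytes from PostGIS
        '__properties__': dict(source='openstreetmap.org'),
        '__water_properties__': dict(kind='river', min_zoom=9),
    }]
    layer_data = [dict(name='water')]
    bounds = (0.0, 0.0, 100.0, 100.0)  # made-up mercator bbox

    feature_layers = convert_source_data_to_feature_layers(
        rows, layer_data, bounds, zoom=10)
    # -> one layer dict named 'water' whose single feature merges the
    #    common and per-layer properties; min_zoom 9 passes the
    #    zoom < 16 visibility check at zoom 10

Since the padding TODO is unresolved, `padded_bounds` simply repeats the unpadded bounds for the polygon, line, and point cases.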
diff --git a/tilequeue/query.py b/tilequeue/query.py
index 8f9f9cfe..fd7cad7c 100644
--- a/tilequeue/query.py
+++ b/tilequeue/query.py
@@ -1,37 +1,74 @@
+from collections import namedtuple
 from psycopg2.extras import RealDictCursor
-from tilequeue.postgresql import DBAffinityConnectionsNoLimit
-from tilequeue.tile import calc_meters_per_pixel_dim
+from tilequeue.postgresql import DBConnectionPool
 from tilequeue.transform import calculate_padded_bounds
 import sys
 
 
-def generate_query(start_zoom, template, bounds, zoom):
-    if zoom < start_zoom:
-        return None
-    query = template.render(bounds=bounds, zoom=zoom)
-    return query
+DataSource = namedtuple('DataSource', 'name template, start_zoom')
 
 
-class JinjaQueryGenerator(object):
+def make_source(name, template, start_zoom):
+    return DataSource(name, template, start_zoom)
 
-    def __init__(self, template, start_zoom):
-        self.template = template
-        self.start_zoom = start_zoom
 
-    def __call__(self, bounds, zoom):
-        return generate_query(self.start_zoom, self.template,
-                              bounds, zoom)
 
+class TemplateFinder(object):
 
+    """Look up the jinja template
-class DevJinjaQueryGenerator(object):
+    The cache_templates option is expected to be set in production to
+    avoid having to regenerate the template repeatedly.
+    """
 
-    def __init__(self, environment, template_name, start_zoom):
-        self.environment = environment
-        self.template_name = template_name
-        self.start_zoom = start_zoom
+    def __init__(self, jinja_environment, cache_templates=False):
+        self.environment = jinja_environment
+        self.cache_templates = cache_templates
+        if cache_templates:
+            self.template_cache = {}
 
-    def __call__(self, bounds, zoom):
-        template = self.environment.get_template(self.template_name)
-        return generate_query(self.start_zoom, template, bounds, zoom)
+    def __call__(self, source_name):
+        template = None
+        if self.cache_templates:
+            template = self.template_cache.get(source_name)
+        if not template:
+            template = self.environment.get_template(source_name)
+            if self.cache_templates:
+                self.template_cache[source_name] = template
+        return template
+
+
+class TemplateQueryGenerator(object):
+
+    def __init__(self, template_finder):
+        self.template_finder = template_finder
+
+    def __call__(self, source, bounds, zoom):
+        template = self.template_finder(source)
+
+        # TODO bounds padding
+        padded_bounds = dict(
+            polygon=bounds,
+            line=bounds,
+            point=bounds,
+        )
+
+        query = template.render(bounds=padded_bounds, zoom=zoom)
+        return query
+
+
+class SourcesQueriesGenerator(object):
+
+    def __init__(self, sources, query_generator):
+        self.sources = sources
+        self.query_generator = query_generator
+
+    def __call__(self, zoom, bounds):
+        queries = []
+        for source in self.sources:
+            if source.start_zoom <= zoom:
+                query = self.query_generator(source.template, bounds, zoom)
+                queries.append(query)
+        return queries
 
 
 def jinja_filter_geometry(value):
@@ -82,27 +119,16 @@ def jinja_filter_bbox_overlaps(bounds, geometry_col_name, srid=3857):
     return bbox_filter
 
 
-def build_feature_queries(unpadded_bounds, layer_data, zoom):
-    meters_per_pixel_dim = calc_meters_per_pixel_dim(zoom)
-    queries_to_execute = []
-    for layer_datum in layer_data:
-        query_bounds_pad_fn = layer_datum['query_bounds_pad_fn']
-        padded_bounds = query_bounds_pad_fn(
-            unpadded_bounds, meters_per_pixel_dim)
-        query_generator = layer_datum['query_generator']
-        query = query_generator(padded_bounds, zoom)
-        queries_to_execute.append((layer_datum, query, padded_bounds))
-    return queries_to_execute
-
-
-def execute_query(conn, query, layer_datum, padded_bounds):
+def execute_query(conn, query):
     try:
         cursor = conn.cursor(cursor_factory=RealDictCursor)
        cursor.execute(query)
         rows = list(cursor.fetchall())
-        return rows, layer_datum, padded_bounds
+        return rows
     except:
+        # TODO this kind of thing is only necessary if we re-use connections
+
         # If any exception occurs during query execution, close the
         # connection to ensure it is not in an invalid state. The
         # connection pool knows to create new connections to replace
@@ -114,113 +140,66 @@
         raise
 
 
-def trim_layer_datum(layer_datum):
-    layer_datum_result = dict(
-        [(k, v) for k, v in layer_datum.items()
-         if k not in ('query_generator', 'query_bounds_pad_fn')])
-    return layer_datum_result
-
-
-def enqueue_queries(sql_conns, thread_pool, layer_data, zoom, unpadded_bounds):
-
-    queries_to_execute = build_feature_queries(
-        unpadded_bounds, layer_data, zoom)
+class DataFetchException(Exception):
 
-    empty_results = []
-    async_results = []
-    for (layer_datum, query, padded_bounds), sql_conn in zip(
-            queries_to_execute, sql_conns):
-        layer_datum = trim_layer_datum(layer_datum)
-        if query is None:
-            empty_feature_layer = dict(
-                name=layer_datum['name'],
-                features=[],
-                layer_datum=layer_datum,
-                padded_bounds=padded_bounds,
-            )
-            empty_results.append(empty_feature_layer)
-        else:
-            async_result = thread_pool.apply_async(
-                execute_query, (sql_conn, query, layer_datum, padded_bounds))
-            async_results.append(async_result)
+    """Capture all exceptions when trying to read data"""
 
-    return empty_results, async_results
+    def __init__(self, exceptions):
+        self.exceptions = exceptions
+        msgs = ', '.join([x.message for x in exceptions])
+        super(DataFetchException, self).__init__(msgs)
 
 
 class DataFetcher(object):
 
-    def __init__(self, conn_info, layer_data, io_pool, n_conn):
+    def __init__(self, conn_info, queries_generator, io_pool):
         self.conn_info = dict(conn_info)
-        self.layer_data = layer_data
+        self.queries_generator = queries_generator
         self.io_pool = io_pool
 
         self.dbnames = self.conn_info.pop('dbnames')
         self.dbnames_query_index = 0
-        self.sql_conn_pool = DBAffinityConnectionsNoLimit(
+        self.sql_conn_pool = DBConnectionPool(
             self.dbnames, self.conn_info)
-        self.n_conn = n_conn
 
-    def __call__(self, zoom, unpadded_bounds, layer_data=None):
-        if layer_data is None:
-            layer_data = self.layer_data
+    def __call__(self, zoom, unpadded_bounds):
+        queries = self.queries_generator(zoom, unpadded_bounds)
 
-        sql_conns = self.sql_conn_pool.get_conns(self.n_conn)
-        try:
-            # the padded bounds are used here in order to only have to
-            # issue a single set of queries to the database for all
-            # formats
-            empty_results, async_results = enqueue_queries(
-                sql_conns, self.io_pool, layer_data, zoom, unpadded_bounds)
-
-            feature_layers = []
-            async_exception = None
+        n_conns = len(queries)
+        assert n_conns, 'no queries'
+
+        with self.sql_conn_pool.get_conns(n_conns) as sql_conns:
+            async_results = []
+            for query, conn in zip(queries, sql_conns):
+                async_result = self.io_pool.apply_async(
+                    execute_query, (conn, query))
+                async_results.append(async_result)
+
+            all_source_rows = []
+            async_exceptions = []
             for async_result in async_results:
                 try:
-                    rows, layer_datum, padded_bounds = async_result.get()
+                    source_rows = async_result.get()
+                    # TODO can all the source rows just be smashed together?
+                    # seems like it because the data allows discrimination
+                    all_source_rows.extend(source_rows)
                 except:
                     exc_type, exc_value, exc_traceback = sys.exc_info()
                     async_exception = exc_value
-                    # iterate through all async results to give others
-                    # a chance to close any connections that yielded
-                    # exceptions
+                    async_exceptions.append(async_exception)
                     continue
 
-                # don't continue processing if an error occurred on
-                # any results
-                if async_exception is not None:
-                    continue
+            if async_exceptions:
+                raise DataFetchException(async_exceptions)
+
+            read_rows = []
+            for row in all_source_rows:
+                read_row = {}
+                for k, v in row.items():
+                    if isinstance(v, buffer):
+                        v = bytes(v)
+                    if v is not None:
+                        read_row[k] = v
+                read_rows.append(read_row)
 
-                # read the bytes out of each row, otherwise the pickle
-                # will fail because the geometry is a read buffer
-                # only keep values that are not None
-                read_rows = []
-                for row in rows:
-                    read_row = {}
-                    for k, v in row.items():
-                        if isinstance(v, buffer):
-                            v = bytes(v)
-                        if v is not None:
-                            read_row[k] = v
-                    read_rows.append(read_row)
-
-                feature_layer = dict(
-                    name=layer_datum['name'], features=read_rows,
-                    layer_datum=layer_datum,
-                    padded_bounds=padded_bounds,
-                )
-                feature_layers.append(feature_layer)
-
-            # bail if an error occurred
-            if async_exception is not None:
-                raise async_exception
-
-            feature_layers.extend(empty_results)
-
-            return dict(
-                feature_layers=feature_layers,
-                unpadded_bounds=unpadded_bounds,
-                padded_bounds=padded_bounds,
-            )
-
-        finally:
-            self.sql_conn_pool.put_conns(sql_conns)
+        return read_rows
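Putting the query.py pieces together, a small sketch of query generation in isolation, using an in-memory jinja environment instead of templates on disk (the template body and zoom threshold are invented):

    from jinja2 import DictLoader, Environment

    from tilequeue.query import (
        SourcesQueriesGenerator, TemplateFinder, TemplateQueryGenerator,
        make_source,
    )

    environment = Environment(loader=DictLoader(dict(
        water='SELECT * FROM water_polygons  -- zoom {{ zoom }}',
    )))
    template_finder = TemplateFinder(environment, cache_templates=True)
    query_generator = TemplateQueryGenerator(template_finder)
    sources = [make_source('water', 'water', 9)]
    queries_generator = SourcesQueriesGenerator(sources, query_generator)

    queries = queries_generator(10, (0.0, 0.0, 100.0, 100.0))
    # one rendered query, since start_zoom 9 <= zoom 10; at zoom 8 the
    # source would be skipped and the list would be empty

This also shows where the old per-layer `generate_query` behaviour went: the zoom gate moved into `SourcesQueriesGenerator`, so `TemplateQueryGenerator` never returns `None`.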
diff --git a/tilequeue/worker.py b/tilequeue/worker.py
index a25af8ca..f6dfe4c9 100644
--- a/tilequeue/worker.py
+++ b/tilequeue/worker.py
@@ -1,5 +1,6 @@
 from operator import attrgetter
 from psycopg2.extensions import TransactionRollbackError
+from tilequeue.process import convert_source_data_to_feature_layers
 from tilequeue.process import process_coord
 from tilequeue.store import write_tile_if_changed
 from tilequeue.tile import coord_children_range
@@ -187,7 +188,7 @@ def __call__(self, stop):
 
             start = time.time()
             try:
-                fetch_data = self.fetcher(nominal_zoom, unpadded_bounds)
+                source_rows = self.fetcher(nominal_zoom, unpadded_bounds)
             except:
                 exc_type, exc_value, exc_traceback = sys.exc_info()
                 stacktrace = format_stacktrace_one_line(
@@ -214,8 +215,8 @@ def __call__(self, stop):
             data = dict(
                 metadata=metadata,
                 coord=coord,
-                feature_layers=fetch_data['feature_layers'],
-                unpadded_bounds=fetch_data['unpadded_bounds'],
+                source_rows=source_rows,
+                unpadded_bounds=unpadded_bounds,
                 cut_coords=cut_coords,
                 nominal_zoom=nominal_zoom,
             )
@@ -233,7 +234,8 @@ class ProcessAndFormatData(object):
     scale = 4096
 
     def __init__(self, post_process_data, formats, input_queue,
-                 output_queue, buffer_cfg, output_calc_mapping, logger):
+                 output_queue, buffer_cfg, output_calc_mapping, layer_data,
+                 logger):
         formats.sort(key=attrgetter('sort_key'))
         self.post_process_data = post_process_data
         self.formats = formats
@@ -241,6 +243,7 @@ def __init__(self, post_process_data, formats, input_queue,
         self.output_queue = output_queue
         self.buffer_cfg = buffer_cfg
         self.output_calc_mapping = output_calc_mapping
+        self.layer_data = layer_data
         self.logger = logger
 
     def __call__(self, stop):
@@ -260,14 +263,17 @@ def __call__(self, stop):
                 break
 
             coord = data['coord']
-            feature_layers = data['feature_layers']
             unpadded_bounds = data['unpadded_bounds']
             cut_coords = data['cut_coords']
            nominal_zoom = data['nominal_zoom']
+            source_rows = data['source_rows']
 
             start = time.time()
 
             try:
+                feature_layers = convert_source_data_to_feature_layers(
+                    source_rows, self.layer_data, unpadded_bounds,
+                    nominal_zoom)
                 formatted_tiles, extra_data = process_coord(
                     coord, nominal_zoom, feature_layers,
                     self.post_process_data, self.formats, unpadded_bounds,
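End to end, the worker change means the SQL stage now ships raw source rows between queues and the processing stage rebuilds the feature layers itself; a condensed sketch of that hand-off, continuing the hypothetical wiring from the earlier examples (a live database would be needed to actually run the fetch):

    nominal_zoom = 10
    unpadded_bounds = (0.0, 0.0, 100.0, 100.0)  # made-up mercator bbox

    # SQL worker side: the fetcher now returns a flat list of rows
    source_rows = feature_fetcher(nominal_zoom, unpadded_bounds)

    # processor side: ProcessAndFormatData fans the rows out into
    # per-layer feature lists just before calling process_coord
    feature_layers = convert_source_data_to_feature_layers(
        source_rows, layer_data, unpadded_bounds, nominal_zoom)

One consequence of moving the conversion is that the queue payload shrinks to the raw rows plus bounds, at the cost of repeating the per-layer fan-out on every processing worker.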