Merge pull request #204 from GeoThings/consume-tile-traffic
Prune TOI adapted to postgres and standalone tileserver
nvkelso authored Jun 5, 2017
2 parents ee70891 + ff54142 commit 5f71f10
Showing 8 changed files with 126 additions and 39 deletions.
11 changes: 9 additions & 2 deletions config.yaml.sample
@@ -184,9 +184,12 @@ toi-store:

# Configuration for the tiles of interest prune/garden command.
toi-prune:
# Connection and query configuration for a RedShift database containing
# location of tileserver logs
tile-traffic-log-path: ../tileserver/nohup.out

# Connection and query configuration for a database containing
# request information for tiles.
redshift:
tile-history:
database-uri: postgresql://user:password@localhost:5439/database
# The number of days of history to query for.
days: 30
@@ -206,6 +209,10 @@ toi-prune:
path: osm
layer: all
format: zip
# layer and format of tiles to be deleted
store:
layer: all
format: zip
always-include:
# Sets of tiles to always include in the tiles of interest.
# For more information about options here, see the code:
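Read together, the new keys give a toi-prune block along these lines (a minimal sketch assembled from this sample; the indentation, log path, connection URI and day count are illustrative, max-zoom is optional and defaults to 16 in command.py, and the legacy toi-prune:s3 block is still honoured as a fallback):

    toi-prune:
        # access log written by the standalone tileserver
        tile-traffic-log-path: ../tileserver/nohup.out
        # postgres-compatible database holding tile request history
        tile-history:
            database-uri: postgresql://user:password@localhost:5439/database
            days: 30
            max-zoom: 16
        # layer and format of tiles to be deleted from the store
        store:
            layer: all
            format: zip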
8 changes: 7 additions & 1 deletion logging.conf.sample
@@ -1,5 +1,5 @@
[loggers]
keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query
keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query,consume_tile_traffic

[handlers]
keys=consoleHandler
@@ -71,6 +71,12 @@ handlers=consoleHandler
qualName=query
propagate=0

[logger_consume_tile_traffic]
level=INFO
handlers=consoleHandler
qualName=consume_tile_traffic
propagate=0

[handler_consoleHandler]
class=StreamHandler
formatter=simpleFormatter
96 changes: 63 additions & 33 deletions tilequeue/command.py
@@ -35,12 +35,14 @@
from tilequeue.toi import save_set_to_fp
from tilequeue.top_tiles import parse_top_tiles
from tilequeue.utils import grouper
from tilequeue.utils import parse_log_file
from tilequeue.worker import DataFetch
from tilequeue.worker import ProcessAndFormatData
from tilequeue.worker import QueuePrint
from tilequeue.worker import S3Storage
from tilequeue.worker import SqsQueueReader
from tilequeue.worker import SqsQueueWriter
from tilequeue.postgresql import DBAffinityConnectionsNoLimit
from urllib2 import urlopen
from zope.dottedname.resolve import resolve
import argparse
@@ -958,6 +960,41 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
logger.info('%d tiles of interest processed' % n_toi)


def tilequeue_consume_tile_traffic(cfg, peripherals):
logger = make_logger(cfg, 'consume_tile_traffic')
logger.info('Consuming tile traffic logs ...')

tile_log_records = None
with open(cfg.tile_traffic_log_path, 'r') as log_file:
tile_log_records = parse_log_file(log_file)

if not tile_log_records:
logger.info("Couldn't parse log file")
sys.exit(1)

conn_info = dict(cfg.postgresql_conn_info)
dbnames = conn_info.pop('dbnames')
sql_conn_pool = DBAffinityConnectionsNoLimit(dbnames, conn_info, False)
sql_conn = sql_conn_pool.get_conns(1)[0]
with sql_conn.cursor() as cursor:

# insert the log records after the latest_date
cursor.execute('SELECT max(date) from tile_traffic_v4')
max_timestamp = cursor.fetchone()[0]

n_coords_inserted = 0
for host, timestamp, coord_int in tile_log_records:
if not max_timestamp or timestamp > max_timestamp:
coord = coord_unmarshall_int(coord_int)
cursor.execute("INSERT into tile_traffic_v4 (date, z, x, y, tilesize, service, host) VALUES ('%s', %d, %d, %d, %d, '%s', '%s')"
% (timestamp, coord.zoom, coord.column, coord.row, 512, 'vector-tiles', host))
n_coords_inserted += 1

logger.info('Inserted %d records' % n_coords_inserted)

sql_conn_pool.put_conns([sql_conn])


def emit_toi_stats(toi_set, peripherals):
"""
Calculates new TOI stats and emits them via statsd.
@@ -977,7 +1014,6 @@ def emit_toi_stats(toi_set, peripherals):
count
)


def tilequeue_prune_tiles_of_interest(cfg, peripherals):
logger = make_logger(cfg, 'prune_tiles_of_interest')
logger.info('Pruning tiles of interest ...')
@@ -991,28 +1027,36 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):

prune_cfg = cfg.yml.get('toi-prune', {})

redshift_cfg = prune_cfg.get('redshift', {})
redshift_uri = redshift_cfg.get('database-uri')
assert redshift_uri, ("A redshift connection URI must "
tile_history_cfg = prune_cfg.get('tile-history', {})
db_conn_info = tile_history_cfg.get('database-uri')
assert db_conn_info, ("A postgres-compatible connection URI must "
"be present in the config yaml")

redshift_days_to_query = redshift_cfg.get('days')
redshift_days_to_query = tile_history_cfg.get('days')
assert redshift_days_to_query, ("Number of days to query "
"redshift is not specified")

redshift_zoom_cutoff = int(redshift_cfg.get('max-zoom', '16'))
redshift_zoom_cutoff = int(tile_history_cfg.get('max-zoom', '16'))

s3_parts = prune_cfg.get('s3')
assert s3_parts, ("The name of an S3 bucket containing tiles "
"to delete must be specified")
# flag indicating that s3 entry in toi-prune is used for s3 store
legacy_fallback = 's3' in prune_cfg
store_parts = prune_cfg.get('s3') or prune_cfg.get('store')
assert store_parts, ("The configuration of a store containing tiles "
"to delete must be specified under toi-prune:store or toi-prune:s3")
# explicitly override the store configuration with values provided in toi-prune:s3
if legacy_fallback:
cfg.store_type = 's3'
cfg.s3_bucket = store_parts['bucket']
cfg.s3_date_prefix = store_parts['date-prefix']
cfg.s3_path = store_parts['path']

redshift_results = defaultdict(int)
with psycopg2.connect(redshift_uri) as conn:
with psycopg2.connect(db_conn_info) as conn:
with conn.cursor() as cur:
cur.execute("""
select x, y, z, tilesize, count(*)
from tile_traffic_v4
where (date >= dateadd(day, -{days}, getdate()))
where (date >= (current_timestamp - interval '{days} days'))
and (z between 0 and {max_zoom})
and (x between 0 and pow(2,z)-1)
and (y between 0 and pow(2,z)-1)
@@ -1021,7 +1065,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
order by z, x, y, tilesize
""".format(
days=redshift_days_to_query,
max_zoom=redshift_zoom_cutoff,
max_zoom=redshift_zoom_cutoff
))
for (x, y, z, tile_size, count) in cur:
coord = create_coord(x, y, z)
@@ -1128,25 +1172,7 @@ def tilequeue_prune_tiles_of_interest(cfg, peripherals):
len(toi_to_remove))
peripherals.stats.gauge('gardener.removed', len(toi_to_remove))

def delete_from_s3(s3_parts, coord_ints):
# Remove from S3
s3 = boto.connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
buk = s3.get_bucket(s3_parts['bucket'], validate=False)
keys = [
s3_tile_key(
s3_parts['date-prefix'],
s3_parts['path'],
s3_parts['layer'],
coord_unmarshall_int(coord_int),
s3_parts['format']
)
for coord_int in coord_ints
]
del_result = buk.delete_keys(keys)
removed = len(del_result.deleted)

logger.info('Removed %s tiles from S3', removed)

store = make_store(cfg.store_type, cfg.s3_bucket, cfg)
if not toi_to_remove:
logger.info('Skipping TOI remove step because there are '
'no tiles to remove')
@@ -1155,7 +1181,9 @@ def delete_from_s3(s3_parts, coord_ints):
len(toi_to_remove))

for coord_ints in grouper(toi_to_remove, 1000):
delete_from_s3(s3_parts, coord_ints)
removed = store.delete_tiles(map(coord_unmarshall_int, coord_ints),
lookup_format_by_extension(store_parts['format']), store_parts['layer'])
logger.info('Removed %s tiles from S3', removed)

logger.info('Removing %s tiles from TOI and S3 ... done',
len(toi_to_remove))
@@ -1553,6 +1581,8 @@ def tilequeue_main(argv_args=None):
tilequeue_process_wof_neighbourhoods)),
('wof-load-initial-neighbourhoods', create_command_parser(
tilequeue_initial_load_wof_neighbourhoods)),
('consume-tile-traffic', create_command_parser(
tilequeue_consume_tile_traffic))
)
for parser_name, parser_func in parser_config:
subparser = subparsers.add_parser(parser_name)
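With this entry registered alongside the other subcommands, the new command should be runnable the same way they are, presumably something like the following (the exact flags come from create_command_parser, so treat the invocation as an assumption):

    tilequeue consume-tile-traffic --config config.yaml

which reads the file at toi-prune:tile-traffic-log-path, parses it with parse_log_file, and inserts any records newer than the latest date already present in tile_traffic_v4.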
2 changes: 2 additions & 0 deletions tilequeue/config.py
@@ -126,6 +126,8 @@ def __init__(self, yml):
self.proc_queue_buffer_size = self._cfg('queue_buffer_size proc')
self.s3_queue_buffer_size = self._cfg('queue_buffer_size s3')

self.tile_traffic_log_path = self._cfg('toi-prune tile-traffic-log-path')

def _cfg(self, yamlkeys_str):
yamlkeys = yamlkeys_str.split()
yamlval = self.yml
2 changes: 2 additions & 0 deletions tilequeue/format/__init__.py
@@ -108,6 +108,7 @@ def format_vtm(fp, feature_layers, zoom, bounds_merc, bounds_lnglat):
vtm=vtm_format,
mvt=mvt_format,
mvtb=mvtb_format,
zip=zip_format
)

name_to_format = {
@@ -116,6 +117,7 @@ def format_vtm(fp, feature_layers, zoom, bounds_merc, bounds_lnglat):
'TopoJSON': topojson_format,
'MVT': mvt_format,
'MVT Buffered': mvtb_format,
'ZIP Metatile': zip_format
}


5 changes: 3 additions & 2 deletions tilequeue/postgresql.py
@@ -11,15 +11,16 @@ class DBAffinityConnectionsNoLimit(object):
# the connections. It's the caller's responsibility to call us
# back with the connection objects so that we can close them.

def __init__(self, dbnames, conn_info):
def __init__(self, dbnames, conn_info, readonly=True):
self.dbnames = cycle(dbnames)
self.conn_info = conn_info
self.conn_mapping = {}
self.lock = threading.Lock()
self.readonly = readonly

def _make_conn(self, conn_info):
conn = psycopg2.connect(**conn_info)
conn.set_session(readonly=True, autocommit=True)
conn.set_session(readonly=self.readonly, autocommit=True)
register_hstore(conn)
register_json(conn, loads=ujson.loads)
return conn
18 changes: 18 additions & 0 deletions tilequeue/store.py
@@ -67,6 +67,14 @@ def read_tile(self, coord, format, layer):
tile_data = key.get_contents_as_string()
return tile_data

def delete_tiles(self, coords, format, layer):
key_names = [
s3_tile_key(self.date_prefix, self.path, layer, coord, format.extension)
for coord in coords
]
del_result = self.bucket.delete_keys(key_names)
return len(del_result.deleted)


def make_dir_path(base_path, coord, layer):
path = os.path.join(
@@ -200,6 +208,16 @@ def read_tile(self, coord, format, layer):
except IOError:
return None

def delete_tiles(self, coords, format, layer):
delete_count = 0
for coord in coords:
file_path = make_file_path(self.base_path, coord, layer, format.extension)
if os.path.isfile(file_path):
os.remove(file_path)
delete_count += 1

return delete_count


def make_tile_file_store(base_path=None):
if base_path is None:
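A minimal sketch of exercising the new delete_tiles hook outside the prune command, using the file-backed store (the base path and coordinate are hypothetical, and the import locations are assumed from how command.py uses these names):

    from tilequeue.format import lookup_format_by_extension
    from tilequeue.store import make_tile_file_store
    from tilequeue.tile import create_coord

    # hypothetical local tile directory and tile coordinate (x, y, z)
    store = make_tile_file_store('/tmp/tiles')
    coords = [create_coord(4823, 6160, 14)]

    # returns the number of tiles actually removed, which the prune command logs
    removed = store.delete_tiles(coords, lookup_format_by_extension('zip'), 'all')
    print('removed %d tiles' % removed)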
23 changes: 22 additions & 1 deletion tilequeue/utils.py
@@ -1,7 +1,10 @@
import sys
import traceback
import re
from itertools import islice

from datetime import datetime
from tilequeue.tile import coord_marshall_int
from tilequeue.tile import create_coord

def format_stacktrace_one_line(exc_info=None):
# exc_info is expected to be an exception tuple from sys.exc_info()
@@ -23,3 +26,21 @@ def grouper(iterable, n):
if not chunk:
return
yield chunk

def parse_log_file(log_file):
ip_pattern = '(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    # don't match against an explicit date pattern, in case it changes
date_pattern = '\[([\d\w\s\/:]+)\]'
tile_id_pattern = '\/([\w]+)\/([\d]+)\/([\d]+)\/([\d]+)\.([\d\w]*)'

log_pattern = '%s - - %s "([\w]+) %s.*' % (ip_pattern, date_pattern, tile_id_pattern)

tile_log_records = []
for log_string in log_file:
match = re.search(log_pattern, log_string)
if match and len(match.groups()) == 8:
tile_log_records.append((match.group(1),
datetime.strptime(match.group(2), '%d/%B/%Y %H:%M:%S'),
coord_marshall_int(create_coord(match.group(6), match.group(7), match.group(5)))))

return tile_log_records
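For context, the pattern above expects access-log lines roughly of this shape (an assumed example; the real format depends on how the standalone tileserver logs requests):

    127.0.0.1 - - [05/June/2017 11:02:03] "GET /all/14/4823/6160.json HTTP/1.1" 200 -

from which it captures the host, a timestamp parsed with '%d/%B/%Y %H:%M:%S', and the layer/z/x/y tile path, marshalling z=14, x=4823, y=6160 into a single coordinate integer.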
