Command to automatically prune tiles of interest #176

Merged (14 commits) on Mar 24, 2017
8 changes: 7 additions & 1 deletion logging.conf.sample
@@ -1,5 +1,5 @@
[loggers]
-keys=root,process,seed,intersect,drain,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query
+keys=root,process,seed,intersect,drain,prune_tiles_of_interest,enqueue_tiles_of_interest,dump_tiles_of_interest,load_tiles_of_interest,wof_process_neighbourhoods,query

[handlers]
keys=consoleHandler
@@ -11,6 +11,12 @@ keys=simpleFormatter
level=WARNING
handlers=consoleHandler

[logger_prune_tiles_of_interest]
level=INFO
handlers=consoleHandler
qualname=prune_tiles_of_interest
propagate=0

[logger_enqueue_tiles_of_interest]
level=INFO
handlers=consoleHandler
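This new logger section is what lets the command's make_logger(cfg, 'prune_tiles_of_interest') call emit INFO-level output on the console. A minimal sketch of how the stdlib consumes it, assuming the sample config is used as-is:

import logging.config

# fileConfig wires up the [logger_prune_tiles_of_interest] section; ConfigParser
# lowercases option names, so qualname resolves the logger's dotted name.
logging.config.fileConfig('logging.conf.sample')
logger = logging.getLogger('prune_tiles_of_interest')
logger.info('Pruning tiles of interest')  # reaches consoleHandler at INFO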
3 changes: 3 additions & 0 deletions tilequeue/cache/redis_cache_index.py
@@ -17,6 +17,9 @@ def intersect(self, coords, tiles_of_interest=None):
if serialized_coord in tiles_of_interest:
yield coord

def remove_tiles_of_interest(self, coord_ints):
return self.redis_client.srem(self.cache_set_key, coord_ints)

def fetch_tiles_of_interest(self):
raw_tiles_of_interest = self.redis_client.smembers(self.cache_set_key)
tiles_of_interest = set()
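A quick sketch of the batch-removal semantics relied on here, assuming a redis-py style client: SREM accepts several members at once and returns the number actually removed, which is the count the prune command later logs.

import redis

client = redis.StrictRedis()
client.sadd('tiles-of-interest', 1, 2, 3)
# redis-py takes members as positional varargs, so a batch is unpacked
removed = client.srem('tiles-of-interest', *[2, 3, 4])
print(removed)  # 2 -- member 4 was never in the set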
114 changes: 113 additions & 1 deletion tilequeue/command.py
Expand Up @@ -18,10 +18,13 @@
from tilequeue.query import jinja_filter_geometry
from tilequeue.query import jinja_filter_bbox_overlaps
from tilequeue.queue import make_sqs_queue
from tilequeue.store import s3_tile_key
from tilequeue.tile import coord_int_zoom_up
from tilequeue.tile import coord_marshall_int
from tilequeue.tile import coord_unmarshall_int
-from tilequeue.tile import parse_expired_coord_string, deserialize_coord
+from tilequeue.tile import create_coord
+from tilequeue.tile import deserialize_coord
+from tilequeue.tile import parse_expired_coord_string
from tilequeue.tile import seed_tiles
from tilequeue.tile import tile_generator_for_multiple_bounds
from tilequeue.tile import tile_generator_for_single_bounds
@@ -949,6 +952,113 @@ def tilequeue_enqueue_tiles_of_interest(cfg, peripherals):
logger.info('%d tiles of interest processed' % n_toi)


def tilequeue_prune_tiles_of_interest(cfg, peripherals):
logger = make_logger(cfg, 'prune_tiles_of_interest')
logger.info('Pruning tiles of interest')

logger.info('Fetching tiles recently requested ...')
import psycopg2
import boto

prune_cfg = cfg.yml.get('toi-prune', {})

redshift_uri = prune_cfg.get('redshift-uri')
assert redshift_uri, ("A redshift connection URI must "
"be present in the config yaml")

redshift_days_to_query = prune_cfg.get('days')
assert redshift_days_to_query, ("Number of days to query "
"redshift is not specified")

s3_parts = prune_cfg.get('s3')
assert s3_parts, ("The name of an S3 bucket containing tiles "
"to delete must be specified")

new_toi = set()
with psycopg2.connect(redshift_uri) as conn:
with conn.cursor() as cur:
cur.execute("""
select x, y, z
from tile_traffic_v4
where (date >= dateadd(day, -{days}, current_date))
and (z between 10 and 16)
and (x between 0 and pow(2,z)-1)
and (y between 0 and pow(2,z)-1)
group by z, x, y
order by z, x, y;""".format(days=redshift_days_to_query))
n_trr = cur.rowcount
for (x, y, z) in cur:
coord = create_coord(x, y, z)
coord_int = coord_marshall_int(coord)
new_toi.add(coord_int)

logger.info('Fetching tiles recently requested ... done. %s found', n_trr)

for name, info in prune_cfg.get('always-include-bboxes', {}).items():
logger.info('Adding in tiles from %s ...', name)

bounds = map(float, info['bbox'].split(','))
bounds_tileset = set()
for coord in tile_generator_for_single_bounds(
bounds, info['min_zoom'], info['max_zoom']):
coord_int = coord_marshall_int(coord)
bounds_tileset.add(coord_int)
n_inc = len(bounds_tileset)
new_toi = new_toi.union(bounds_tileset)

logger.info('Adding in tiles from %s ... done. %s found', name, n_inc)

logger.info('New tiles of interest set includes %s tiles', len(new_toi))

logger.info('Fetching existing tiles of interest ...')
tiles_of_interest = peripherals.redis_cache_index.fetch_tiles_of_interest()
n_toi = len(tiles_of_interest)
logger.info('Fetching existing tiles of interest ... done. %s found',
n_toi)

logger.info('Computing tiles to remove ...')
toi_to_remove = tiles_of_interest - new_toi
logger.info('Computing tiles to remove ... done. %s found',
len(toi_to_remove))

# Null out the reference to old TOI to save some memory
tiles_of_interest = None

logger.info('Removing %s tiles from TOI and S3 ...',
len(toi_to_remove))

def delete_tile_of_interest(s3_parts, coord_ints):
# Remove from the redis toi set
removed = peripherals.redis_cache_index.remove_tiles_of_interest(
coord_ints)

logger.info('Removed %s tiles from redis', removed)

# Remove from S3
s3 = boto.connect_s3(cfg.aws_access_key_id, cfg.aws_secret_access_key)
buk = s3.get_bucket(s3_parts['bucket'], validate=False)
keys = [
s3_tile_key(
s3_parts['date-prefix'],
s3_parts['path'],
s3_parts['layer'],
coord_unmarshall_int(coord_int),
s3_parts['format']
)
for coord_int in coord_ints
]
del_result = buk.delete_keys(keys)
removed = len(del_result.deleted)

logger.info('Removed %s tiles from S3', removed)

for coord_ints in grouper(toi_to_remove, 1000):
delete_tile_of_interest(s3_parts, coord_ints)

logger.info('Removing %s tiles from TOI and S3 ... done',
len(toi_to_remove))


def tilequeue_tile_sizes(cfg, peripherals):
# find averages, counts, and medians for metro extract tiles
assert cfg.metro_extract_url
@@ -1313,6 +1423,8 @@ def tilequeue_main(argv_args=None):
create_command_parser(tilequeue_load_tiles_of_interest)),
('enqueue-tiles-of-interest',
create_command_parser(tilequeue_enqueue_tiles_of_interest)),
('prune-tiles-of-interest',
create_command_parser(tilequeue_prune_tiles_of_interest)),
('tile-size', create_command_parser(tilequeue_tile_sizes)),
('wof-process-neighbourhoods', create_command_parser(
tilequeue_process_wof_neighbourhoods)),
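Each entry in the TOI set is a coordinate packed into a single integer. A small sketch of the round trip the prune command depends on, assuming (as the code implies) that coord_marshall_int and coord_unmarshall_int are inverses:

from tilequeue.tile import coord_marshall_int, coord_unmarshall_int
from tilequeue.tile import create_coord

coord = create_coord(x=654, y=1583, z=12)
coord_int = coord_marshall_int(coord)   # one int, cheap to keep in redis
back = coord_unmarshall_int(coord_int)
assert (back.column, back.row, back.zoom) == (654, 1583, 12)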
4 changes: 4 additions & 0 deletions tilequeue/tile.py
@@ -33,6 +33,10 @@ def deserialize_coord(coord_string):
return coord


def create_coord(x, y, z):
return Coordinate(row=y, column=x, zoom=z)


def parse_expired_coord_string(coord_string):
# we use the same format in the queue as the expired tile list from
# osm2pgsql
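One thing worth noting about the new helper: ModestMaps coordinates are (row, column), so x maps to column and y maps to row. A minimal illustration, assuming the ModestMaps Coordinate that tilequeue uses elsewhere:

from ModestMaps.Core import Coordinate

c = Coordinate(row=5, column=3, zoom=4)  # i.e. create_coord(x=3, y=5, z=4)
assert c.column == 3 and c.row == 5 and c.zoom == 4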
6 changes: 4 additions & 2 deletions tilequeue/utils.py
@@ -1,5 +1,6 @@
import sys
import traceback
from itertools import izip_longest


def format_stacktrace_one_line(exc_info=None):
@@ -14,6 +15,7 @@ def format_stacktrace_one_line(exc_info=None):
return stacktrace


-def grouper(seq, size):
+def grouper(iterable, n, fillvalue=None):
     """Collect data into fixed-length chunks or blocks"""
-    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))
+    args = [iter(iterable)] * n
+    return izip_longest(fillvalue=fillvalue, *args)
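
Note that the rewritten grouper pads the final chunk with fillvalue (None by default), so consumers such as the prune loop's 1000-tile batches may see trailing None entries in the last batch. A quick demonstration:

from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks"""
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

print(list(grouper('ABCDE', 3)))  # [('A', 'B', 'C'), ('D', 'E', None)]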