Skip to content

Commit

Permalink
Infer the max zoom to enqueue from metatile size
Browse files Browse the repository at this point in the history
  • Loading branch information
rmarianski committed Apr 5, 2017
1 parent c6bf885 commit cb38515
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 48 deletions.
4 changes: 2 additions & 2 deletions config.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ tiles:
# process on update
parent-zoom-until: 11

# this is the max zoom used when enqueueing the tiles of interest
enqueue_max_zoom: 15
# zoom where tile content stops changing
max-zoom-with-changes: 16

process:
# number of simultaneous "querysets" to issue to the database. The
Expand Down
62 changes: 45 additions & 17 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,13 @@ def test_merge_override(self):

class TestCliConfiguration(unittest.TestCase):

def _call_fut(self, args, config_dict):
def _call_fut(self, config_dict):
from tilequeue.config import make_config_from_argparse
from yaml import dump
from cStringIO import StringIO
raw_yaml = dump(config_dict)
return make_config_from_argparse(
args,
opencfg=self._fp(raw_yaml))

def _fp(self, raw_yaml):
# stub out a file object that has __enter__, __exit__ methods
from StringIO import StringIO
from contextlib import closing
return lambda filename: closing(StringIO(raw_yaml))

def _args(self, data):
# create an object with data as the attributes
return type('mock-args', (object,), data)
raw_yaml_file_obj = StringIO(raw_yaml)
return make_config_from_argparse(raw_yaml_file_obj)

def _assert_cfg(self, cfg, to_check):
# cfg is the config object to validate
Expand All @@ -67,8 +57,7 @@ def _assert_cfg(self, cfg, to_check):
self.assertEqual(v, cfg_val)

def test_no_config(self):
cfg = self._call_fut(
self._args(dict(config=None)), {})
cfg = self._call_fut(dict(config=None))
# just assert some of the defaults are set
self._assert_cfg(cfg,
dict(s3_path='osm',
Expand All @@ -78,10 +67,49 @@ def test_no_config(self):

def test_config_osm_path_modified(self):
cfg = self._call_fut(
self._args(dict(config='config')),
dict(store=dict(path='custompath')))
self._assert_cfg(cfg,
dict(s3_path='custompath',
output_formats=['json'],
seed_all_zoom_start=None,
seed_all_zoom_until=None))


class TestMetatileConfiguration(unittest.TestCase):

def _call_fut(self, config_dict):
from tilequeue.config import make_config_from_argparse
from yaml import dump
from cStringIO import StringIO
raw_yaml = dump(config_dict)
raw_yaml_file_obj = StringIO(raw_yaml)
return make_config_from_argparse(raw_yaml_file_obj)

def test_metatile_size_default(self):
config_dict = {}
cfg = self._call_fut(config_dict)
self.assertIsNone(cfg.metatile_size)
self.assertEquals(cfg.metatile_zoom, 0)

def test_metatile_size_1(self):
config_dict = dict(metatile=dict(size=1))
cfg = self._call_fut(config_dict)
self.assertEquals(cfg.metatile_size, 1)
self.assertEquals(cfg.metatile_zoom, 0)

def test_metatile_size_2(self):
config_dict = dict(metatile=dict(size=2))
cfg = self._call_fut(config_dict)
self.assertEquals(cfg.metatile_size, 2)
self.assertEquals(cfg.metatile_zoom, 1)

def test_metatile_size_4(self):
config_dict = dict(metatile=dict(size=4))
cfg = self._call_fut(config_dict)
self.assertEquals(cfg.metatile_size, 4)
self.assertEquals(cfg.metatile_zoom, 2)

def test_max_zoom(self):
config_dict = dict(metatile=dict(size=2))
cfg = self._call_fut(config_dict)
self.assertEquals(cfg.max_zoom, 15)
7 changes: 4 additions & 3 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,11 @@ def tilequeue_process(cfg, peripherals):
thread_sqs_queue_reader_stop = threading.Event()
sqs_queue_reader = SqsQueueReader(
sqs_queue, sqs_input_queue, logger, thread_sqs_queue_reader_stop,
cfg.metatile_size)
cfg.max_zoom)

data_fetch = DataFetch(
feature_fetcher, sqs_input_queue, sql_data_fetch_queue, io_pool,
peripherals.redis_cache_index, logger, cfg.metatile_size)
peripherals.redis_cache_index, logger, cfg.metatile_zoom, cfg.max_zoom)

data_processor = ProcessAndFormatData(
post_process_data, formats, sql_data_fetch_queue, processor_queue,
Expand Down Expand Up @@ -1455,7 +1455,8 @@ def tilequeue_main(argv_args=None):
args = parser.parse_args(argv_args)
assert os.path.exists(args.config), \
'Config file {} does not exist!'.format(args.config)
cfg = make_config_from_argparse(args.config)
with open(args.config) as fh:
cfg = make_config_from_argparse(fh)
redis_client = make_redis_client(cfg)
Peripherals = namedtuple('Peripherals', 'redis_cache_index queue')
queue = make_queue(
Expand Down
27 changes: 18 additions & 9 deletions tilequeue/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from tilequeue.tile import bounds_buffer
from yaml import load
import math


class Configuration(object):
Expand All @@ -24,8 +25,6 @@ def __init__(self, yml):
self.s3_path = self._cfg('store path')
self.s3_date_prefix = self._cfg('store date-prefix')

self.enqueue_max_zoom = self.yml['tiles']['enqueue_max_zoom']

seed_cfg = self.yml['tiles']['seed']
self.seed_all_zoom_start = seed_cfg['all']['zoom-start']
self.seed_all_zoom_until = seed_cfg['all']['zoom-until']
Expand Down Expand Up @@ -98,6 +97,17 @@ def __init__(self, yml):
self.wof = self.yml.get('wof')

self.metatile_size = self._cfg('metatile size')
if self.metatile_size is None:
self.metatile_zoom = 0
else:
self.metatile_zoom = int(math.log(self.metatile_size, 2))
assert (1 << self.metatile_zoom) == self.metatile_size, \
"Metatile size must be a power of two."

self.max_zoom_with_changes = self._cfg('tiles max-zoom-with-changes')
assert self.max_zoom_with_changes > self.metatile_zoom
self.max_zoom = self.max_zoom_with_changes - self.metatile_zoom

self.store_orig = self._cfg('metatile store_metatile_and_originals')

self.sql_queue_buffer_size = self._cfg('queue_buffer_size sql')
Expand Down Expand Up @@ -162,7 +172,7 @@ def default_yml_config():
'expired-location': None,
'parent-zoom-until': None,
},
'enqueue_max_zoom': 15,
'max-zoom-with-changes': 16,
},
'process': {
'n-simultaneous-query-sets': 0,
Expand Down Expand Up @@ -214,12 +224,11 @@ def merge_cfg(dest, source):
return dest


def make_config_from_argparse(config_path, opencfg=open):
# opencfg for testing
cfg = default_yml_config()
with opencfg(config_path) as config_fp:
yml_data = load(config_fp.read())
cfg = merge_cfg(cfg, yml_data)
def make_config_from_argparse(config_file_handle, default_yml=None):
if default_yml is None:
default_yml = default_yml_config()
yml_data = load(config_file_handle)
cfg = merge_cfg(default_yml, yml_data)
return Configuration(cfg)


Expand Down
29 changes: 12 additions & 17 deletions tilequeue/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import signal
import sys
import time
import math


# long enough to not fight with other threads, but not long enough
Expand Down Expand Up @@ -93,20 +92,17 @@ def __call__(self, coord, data):

class SqsQueueReader(object):

def __init__(self, sqs_queue, output_queue, logger, stop,
metatile_size, sqs_msgs_to_read_size=10):
def __init__(
self, sqs_queue, output_queue, logger, stop, max_zoom,
sqs_msgs_to_read_size=10):
self.sqs_queue = sqs_queue
self.output = OutputQueue(output_queue, stop)
self.sqs_msgs_to_read_size = sqs_msgs_to_read_size
self.logger = logger
self.stop = stop
self.metatile_zoom = int(math.log(metatile_size, 2))
assert (1 << self.metatile_zoom) == metatile_size, \
"Metatile size must be a power of two."
self.max_zoom = max_zoom

def __call__(self):
max_zoom = 16 - self.metatile_zoom

while not self.stop.is_set():
try:
msgs = self.sqs_queue.read(
Expand All @@ -121,11 +117,11 @@ def __call__(self):
if self.stop.is_set():
break

if msg.coord.zoom > max_zoom:
if msg.coord.zoom > self.max_zoom:
self.logger.log(
logging.WARNING,
'Job coordinates above max zoom are not supported, '
'skipping %r > %d' % (msg.coord, max_zoom))
'skipping %r > %d' % (msg.coord, self.max_zoom))

# delete jobs that we can't handle from the queue,
# otherwise we'll get stuck in a cycle of timed-out jobs
Expand Down Expand Up @@ -161,17 +157,17 @@ def __call__(self):

class DataFetch(object):

def __init__(self, fetcher, input_queue, output_queue, io_pool,
redis_cache_index, logger, metatile_size):
def __init__(
self, fetcher, input_queue, output_queue, io_pool,
redis_cache_index, logger, metatile_zoom, max_zoom):
self.fetcher = fetcher
self.input_queue = input_queue
self.output_queue = output_queue
self.io_pool = io_pool
self.redis_cache_index = redis_cache_index
self.logger = logger
self.metatile_zoom = int(math.log(metatile_size, 2))
assert (1 << self.metatile_zoom) == metatile_size, \
"Metatile size must be a power of two."
self.metatile_zoom = metatile_zoom
self.max_zoom = max_zoom

def __call__(self, stop):
saw_sentinel = False
Expand Down Expand Up @@ -208,7 +204,6 @@ def __call__(self, stop):

metadata = data['metadata']
metadata['timing']['fetch_seconds'] = time.time() - start
max_zoom = 16 - self.metatile_zoom

# every tile job that we get from the queue is a "parent" tile
# and its four children to cut from it. at zoom 15, this may
Expand All @@ -218,7 +213,7 @@ def __call__(self, stop):
if nominal_zoom > coord.zoom:
cut_coords.extend(coord_children_range(coord, nominal_zoom))

if coord.zoom == max_zoom:
if coord.zoom == self.max_zoom:
async_jobs = []
children_until = 20
# ask redis if there are any tiles underneath in the
Expand Down

0 comments on commit cb38515

Please sign in to comment.