Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update wof process to enqueue to rawr queue #251

Merged
merged 1 commit into from
Oct 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,13 +1442,28 @@ def print_count(label, total_count):


def tilequeue_process_wof_neighbourhoods(cfg, peripherals):
from tilequeue.rawr import make_rawr_enqueuer
from tilequeue.wof import make_wof_model
from tilequeue.wof import make_wof_url_neighbourhood_fetcher
from tilequeue.wof import make_wof_processor

wof_cfg = cfg.wof
assert wof_cfg, 'Missing wof config'

rawr_yaml = cfg.yml.get('rawr')
assert rawr_yaml is not None, 'Missing rawr configuration in yaml'

group_by_zoom = rawr_yaml.get('group-zoom')
assert group_by_zoom is not None, 'Missing group-zoom rawr config'

rawr_queue_name = rawr_yaml.get('queue')
assert rawr_queue_name, 'Missing rawr queue'
rawr_queue = make_rawr_queue(rawr_queue_name)

msg_marshall_yaml = cfg.yml.get('message-marshall')
assert msg_marshall_yaml, 'Missing message-marshall config'
msg_marshaller = make_message_marshaller(msg_marshall_yaml)

logger = make_logger(cfg, 'wof_process_neighbourhoods')
logger.info('WOF process neighbourhoods run started')

Expand All @@ -1464,11 +1479,11 @@ def tilequeue_process_wof_neighbourhoods(cfg, peripherals):
)
model = make_wof_model(wof_cfg['postgresql'])

n_enqueue_threads = 20
current_date = datetime.date.today()
rawr_enqueuer = make_rawr_enqueuer(
rawr_queue, msg_marshaller, group_by_zoom, logger)
processor = make_wof_processor(
fetcher, model, peripherals.toi, peripherals.queue_writer,
n_enqueue_threads, logger, current_date)
fetcher, model, peripherals.toi, rawr_enqueuer, logger, current_date)

logger.info('Processing ...')
processor()
Expand Down Expand Up @@ -1759,12 +1774,13 @@ def tilequeue_rawr_enqueue(cfg, args):
assert msg_marshall_yaml, 'Missing message-marshall config'
msg_marshaller = make_message_marshaller(msg_marshall_yaml)

from tilequeue.rawr import process_expiry
logger = make_logger(cfg, 'rawr_expiry')
from tilequeue.rawr import make_rawr_enqueuer
rawr_enqueuer = make_rawr_enqueuer(
rawr_queue, msg_marshaller, group_by_zoom, logger)
with open(args.expiry_path) as fh:
coords = create_coords_generator_from_tiles_file(fh)
process_expiry(
rawr_queue, msg_marshaller, group_by_zoom, coords, logger)
rawr_enqueuer(coords)


def tilequeue_rawr_process(cfg, peripherals):
Expand Down
66 changes: 39 additions & 27 deletions tilequeue/rawr.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,34 +83,42 @@ def done(self, msg_handle):
)


def process_expiry(rawr_queue, msg_marshaller, group_by_zoom, coords,
logger=None):
class RawrEnqueuer(object):
"""enqueue coords from expiry grouped by parent zoom"""
grouped_by_zoom = defaultdict(list)
for coord in coords:
assert group_by_zoom <= coord.zoom
parent = coord.zoomTo(group_by_zoom).container()
parent_coord_int = coord_marshall_int(parent)
grouped_by_zoom[parent_coord_int].append(coord)

n_coords = 0
payloads = []
for _, coords in grouped_by_zoom.iteritems():
payload = msg_marshaller.marshall(coords)
payloads.append(payload)
n_coords += len(coords)
n_payloads = len(payloads)

rawr_queue_batch_size = 10
n_msgs_sent = 0
for payloads_chunk in grouper(payloads, rawr_queue_batch_size):
rawr_queue.send(payloads_chunk)
n_msgs_sent += 1

if logger:
logger.info(
'Expiry processed: coords(%d) payloads(%d) enqueue-calls(%d))' %
(n_coords, n_payloads, n_msgs_sent))

def __init__(self, rawr_queue, msg_marshaller, group_by_zoom, logger):
self.rawr_queue = rawr_queue
self.msg_marshaller = msg_marshaller
self.group_by_zoom = group_by_zoom
self.logger = logger

def __call__(self, coords):
grouped_by_zoom = defaultdict(list)
for coord in coords:
assert self.group_by_zoom <= coord.zoom
parent = coord.zoomTo(self.group_by_zoom).container()
parent_coord_int = coord_marshall_int(parent)
grouped_by_zoom[parent_coord_int].append(coord)

n_coords = 0
payloads = []
for _, coords in grouped_by_zoom.iteritems():
payload = self.msg_marshaller.marshall(coords)
payloads.append(payload)
n_coords += len(coords)
n_payloads = len(payloads)

rawr_queue_batch_size = 10
n_msgs_sent = 0
for payloads_chunk in grouper(payloads, rawr_queue_batch_size):
self.rawr_queue.send(payloads_chunk)
n_msgs_sent += 1

if self.logger:
self.logger.info(
'Expiry processed: '
'coords(%d) payloads(%d) enqueue-calls(%d))' %
(n_coords, n_payloads, n_msgs_sent))


def common_parent(coords, parent_zoom):
Expand Down Expand Up @@ -307,6 +315,10 @@ def make_rawr_s3_path(tile, prefix, suffix):
return path_with_hash


def make_rawr_enqueuer(rawr_queue, msg_marshaller, group_by_zoom, logger):
return RawrEnqueuer(rawr_queue, msg_marshaller, group_by_zoom, logger)


class RawrS3Sink(object):

"""Rawr sink to write to s3"""
Expand Down
10 changes: 5 additions & 5 deletions tilequeue/wof.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,12 +898,12 @@ def log_failure(logger, failure):
class WofProcessor(object):

def __init__(self, fetcher, model, redis_cache_index, intersector,
queue_writer, logger, current_date):
rawr_enqueuer, logger, current_date):
self.fetcher = fetcher
self.model = model
self.redis_cache_index = redis_cache_index
self.intersector = intersector
self.queue_writer = queue_writer
self.rawr_enqueuer = rawr_enqueuer
self.logger = logger
self.zoom_expiry = 16
self.zoom_until = 11
Expand Down Expand Up @@ -1183,7 +1183,7 @@ def _sync_neighbourhoods():
if coords:
self.logger.info('Asking enqueuer to enqueue %d coords ...' %
len(coords))
self.queue_writer.write_batch(coords)
self.rawr_enqueuer(coords)
self.logger.info('Asking enqueuer to enqueue %d coords ... done' %
len(coords))
else:
Expand Down Expand Up @@ -1253,12 +1253,12 @@ def make_wof_model(postgresql_conn_info):


def make_wof_processor(
fetcher, model, redis_cache_index, queue_writer, n_threads, logger,
fetcher, model, redis_cache_index, rawr_enqueuer, logger,
current_date):
from tilequeue.command import explode_and_intersect
wof_processor = WofProcessor(
fetcher, model, redis_cache_index, explode_and_intersect,
queue_writer, logger, current_date)
rawr_enqueuer, logger, current_date)
return wof_processor


Expand Down