diff --git a/tilequeue/log.py b/tilequeue/log.py index 2e7e7a72..7fd99f51 100644 --- a/tilequeue/log.py +++ b/tilequeue/log.py @@ -147,6 +147,7 @@ def processed(self, intersect_metrics, n_enqueued, n_inflight, timing, total=intersect_metrics['total'], hits=intersect_metrics['hits'], misses=intersect_metrics['misses'], + cached=intersect_metrics['cached'], ), enqueued=n_enqueued, inflight=n_inflight, diff --git a/tilequeue/rawr.py b/tilequeue/rawr.py index fb70ee89..c09322ea 100644 --- a/tilequeue/rawr.py +++ b/tilequeue/rawr.py @@ -187,6 +187,11 @@ def __init__(self, s3_client, bucket, key): def tiles_of_interest(self): """conditionally get the toi from s3""" + + # also return back whether the response was cached + # useful for metrics + is_cached = False + get_options = dict( Bucket=self.bucket, Key=self.key, @@ -206,6 +211,7 @@ def tiles_of_interest(self): if status_code == 304: assert self.prev_toi toi = self.prev_toi + is_cached = True elif status_code == 200: body = resp['Body'] try: @@ -222,15 +228,19 @@ def tiles_of_interest(self): else: assert 0, 'Unknown status code from toi get: %s' % status_code - return toi + return toi, is_cached def __call__(self, coords): - toi = self.tiles_of_interest() - coord_ints = convert_to_coord_ints(coords) - intersected_coord_ints, intersect_metrics = \ - explode_and_intersect(coord_ints, toi) - coords = map(coord_unmarshall_int, intersected_coord_ints) - return coords, intersect_metrics + timing = {} + with time_block(timing, 'fetch'): + toi, is_toi_cached = self.tiles_of_interest() + with time_block(timing, 'intersect'): + coord_ints = convert_to_coord_ints(coords) + intersected_coord_ints, intersect_metrics = \ + explode_and_intersect(coord_ints, toi) + coords = map(coord_unmarshall_int, intersected_coord_ints) + intersect_metrics['cached'] = is_toi_cached + return coords, intersect_metrics, timing class EmptyToiIntersector(object): @@ -242,7 +252,7 @@ class EmptyToiIntersector(object): """ def tiles_of_interest(self): - return set([]) + return set([]), False def __call__(self, coords): metrics = dict( @@ -306,16 +316,23 @@ def __call__(self): continue try: - with time_block(timing, 'rawr_gen'): - self.rawr_gen(rawr_tile_coord) + rawr_gen_timing = {} + with time_block(rawr_gen_timing, 'total'): + rawr_gen_specific_timing = self.rawr_gen(rawr_tile_coord) + rawr_gen_timing.update(rawr_gen_specific_timing) + timing['rawr_gen'] = rawr_gen_timing + except Exception as e: self.log_exception(e, 'rawr tile gen', parent) continue try: - with time_block(timing, 'toi_intersect'): - coords_to_enqueue, intersect_metrics = \ + intersect_timing = {} + with time_block(intersect_timing, 'total'): + coords_to_enqueue, intersect_metrics, int_spec_timing = \ self.rawr_toi_intersector(coords) + intersect_timing.update(int_spec_timing) + timing['toi'] = intersect_timing except Exception as e: self.log_exception(e, 'intersect coords', parent) continue diff --git a/tilequeue/stats.py b/tilequeue/stats.py index 676858b7..f92e1d7a 100644 --- a/tilequeue/stats.py +++ b/tilequeue/stats.py @@ -41,6 +41,14 @@ class RawrTilePipelineStatsHandler(object): def __init__(self, stats): self.stats = stats + def emit_time_dict(self, pipe, timing, prefix): + for timing_label, value in timing.items(): + metric_name = '%s.%s' % (prefix, timing_label) + if isinstance(value, dict): + self.emit_time_dict(pipe, value, metric_name) + else: + pipe.timing(metric_name, value) + def __call__(self, intersect_metrics, n_enqueued, n_inflight, timing): with self.stats.pipeline() as pipe: @@ -58,6 +66,5 @@ def __call__(self, intersect_metrics, n_enqueued, n_inflight, timing): pipe.gauge('rawr.process.enqueued', n_enqueued) pipe.gauge('rawr.process.inflight', n_inflight) - for timing_label, value in timing.items(): - metric_name = 'rawr.process.time.%s' % timing_label - pipe.timing(metric_name, value) + prefix = 'rawr.process.time' + self.emit_time_dict(pipe, timing, prefix)