Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit more detailed timing and stats #274

Merged
merged 3 commits into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tilequeue/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def processed(self, intersect_metrics, n_enqueued, n_inflight, timing,
total=intersect_metrics['total'],
hits=intersect_metrics['hits'],
misses=intersect_metrics['misses'],
cached=intersect_metrics['cached'],
),
enqueued=n_enqueued,
inflight=n_inflight,
Expand Down
41 changes: 29 additions & 12 deletions tilequeue/rawr.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def __init__(self, s3_client, bucket, key):

def tiles_of_interest(self):
"""conditionally get the toi from s3"""

# also return back whether the response was cached
# useful for metrics
is_cached = False

get_options = dict(
Bucket=self.bucket,
Key=self.key,
Expand All @@ -206,6 +211,7 @@ def tiles_of_interest(self):
if status_code == 304:
assert self.prev_toi
toi = self.prev_toi
is_cached = True
elif status_code == 200:
body = resp['Body']
try:
Expand All @@ -222,15 +228,19 @@ def tiles_of_interest(self):
else:
assert 0, 'Unknown status code from toi get: %s' % status_code

return toi
return toi, is_cached

def __call__(self, coords):
toi = self.tiles_of_interest()
coord_ints = convert_to_coord_ints(coords)
intersected_coord_ints, intersect_metrics = \
explode_and_intersect(coord_ints, toi)
coords = map(coord_unmarshall_int, intersected_coord_ints)
return coords, intersect_metrics
timing = {}
with time_block(timing, 'fetch'):
toi, is_toi_cached = self.tiles_of_interest()
with time_block(timing, 'intersect'):
coord_ints = convert_to_coord_ints(coords)
intersected_coord_ints, intersect_metrics = \
explode_and_intersect(coord_ints, toi)
coords = map(coord_unmarshall_int, intersected_coord_ints)
intersect_metrics['cached'] = is_toi_cached
return coords, intersect_metrics, timing


class EmptyToiIntersector(object):
Expand All @@ -242,7 +252,7 @@ class EmptyToiIntersector(object):
"""

def tiles_of_interest(self):
return set([])
return set([]), False

def __call__(self, coords):
metrics = dict(
Expand Down Expand Up @@ -306,16 +316,23 @@ def __call__(self):
continue

try:
with time_block(timing, 'rawr_gen'):
self.rawr_gen(rawr_tile_coord)
rawr_gen_timing = {}
with time_block(rawr_gen_timing, 'total'):
rawr_gen_specific_timing = self.rawr_gen(rawr_tile_coord)
rawr_gen_timing.update(rawr_gen_specific_timing)
timing['rawr_gen'] = rawr_gen_timing

except Exception as e:
self.log_exception(e, 'rawr tile gen', parent)
continue

try:
with time_block(timing, 'toi_intersect'):
coords_to_enqueue, intersect_metrics = \
intersect_timing = {}
with time_block(intersect_timing, 'total'):
coords_to_enqueue, intersect_metrics, int_spec_timing = \
self.rawr_toi_intersector(coords)
intersect_timing.update(int_spec_timing)
timing['toi'] = intersect_timing
except Exception as e:
self.log_exception(e, 'intersect coords', parent)
continue
Expand Down
13 changes: 10 additions & 3 deletions tilequeue/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class RawrTilePipelineStatsHandler(object):
def __init__(self, stats):
self.stats = stats

def emit_time_dict(self, pipe, timing, prefix):
for timing_label, value in timing.items():
metric_name = '%s.%s' % (prefix, timing_label)
if isinstance(value, dict):
self.emit_time_dict(pipe, value, metric_name)
else:
pipe.timing(metric_name, value)

def __call__(self, intersect_metrics, n_enqueued, n_inflight, timing):
with self.stats.pipeline() as pipe:

Expand All @@ -58,6 +66,5 @@ def __call__(self, intersect_metrics, n_enqueued, n_inflight, timing):
pipe.gauge('rawr.process.enqueued', n_enqueued)
pipe.gauge('rawr.process.inflight', n_inflight)

for timing_label, value in timing.items():
metric_name = 'rawr.process.time.%s' % timing_label
pipe.timing(metric_name, value)
prefix = 'rawr.process.time'
self.emit_time_dict(pipe, timing, prefix)