Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for filtered aggregator #28

Merged
merged 2 commits into from
Sep 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pydruid/utils/aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#
from six import iteritems

from .filters import Filter


def longsum(raw_metric):
return {"type": "longSum", "fieldName": raw_metric}

Expand All @@ -34,9 +37,26 @@ def max(raw_metric):
def count(raw_metric):
return {"type": "count", "fieldName": raw_metric}


def hyperunique(raw_metric):
return {"type": "hyperUnique", "fieldName": raw_metric}


def filtered(filter, agg):
return {"type": "filtered",
"filter": Filter.build_filter(filter),
"aggregator": agg}


def build_aggregators(agg_input):
return [dict([('name', k)] + list(v.items()))
for (k, v) in iteritems(agg_input)]
return [_build_aggregator(name, kwargs)
for (name, kwargs) in iteritems(agg_input)]


def _build_aggregator(name, kwargs):
if kwargs["type"] == "filtered":
kwargs["aggregator"]["name"] = name
else:
kwargs.update({"name": name})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this affects anything, but before, if kwarks already contained a name it would use that instead of overriding it, since we passed the name as the first key/value pair to dict(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'm gonna check it, maybe I could also write some tests for this module in order to ensure that the change does not brake compatibility. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, tests would be awesome!


return kwargs
69 changes: 69 additions & 0 deletions tests/utils/test_aggregators.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: UTF-8 -*-

from operator import itemgetter
from copy import deepcopy

from pydruid.utils import aggregators
from pydruid.utils import filters


class TestAggregators:
Expand All @@ -16,6 +18,27 @@ def test_aggregators(self):
for f, agg_type in aggs_funcs:
assert f('metric') == {'type': agg_type, 'fieldName': 'metric'}

def test_filtered_aggregator(self):
filter_ = filters.Filter(dimension='dim', value='val')
aggs = [aggregators.count('metric1'),
aggregators.longsum('metric2'),
aggregators.doublesum('metric3'),
aggregators.min('metric4'),
aggregators.max('metric5'),
aggregators.hyperunique('metric6')]
for agg in aggs:
expected = {
'type': 'filtered',
'filter': {
'type': 'selector',
'dimension': 'dim',
'value': 'val'
},
'aggregator': agg
}
actual = aggregators.filtered(filter_, agg)
assert actual == expected

def test_build_aggregators(self):
agg_input = {
'agg1': aggregators.count('metric1'),
Expand All @@ -36,3 +59,49 @@ def test_build_aggregators(self):
]
assert (sorted(built_agg, key=itemgetter('name')) ==
sorted(expected, key=itemgetter('name')))

def test_build_filtered_aggregator(self):
filter_ = filters.Filter(dimension='dim', value='val')
agg_input = {
'agg1': aggregators.filtered(filter_,
aggregators.count('metric1')),
'agg2': aggregators.filtered(filter_,
aggregators.longsum('metric2')),
'agg3': aggregators.filtered(filter_,
aggregators.doublesum('metric3')),
'agg4': aggregators.filtered(filter_,
aggregators.min('metric4')),
'agg5': aggregators.filtered(filter_,
aggregators.max('metric5')),
'agg6': aggregators.filtered(filter_,
aggregators.hyperunique('metric6'))
}
base = {
'type': 'filtered',
'filter': {
'type': 'selector',
'dimension': 'dim',
'value': 'val'
}
}

aggs = [
{'name': 'agg1', 'type': 'count', 'fieldName': 'metric1'},
{'name': 'agg2', 'type': 'longSum', 'fieldName': 'metric2'},
{'name': 'agg3', 'type': 'doubleSum', 'fieldName': 'metric3'},
{'name': 'agg4', 'type': 'min', 'fieldName': 'metric4'},
{'name': 'agg5', 'type': 'max', 'fieldName': 'metric5'},
{'name': 'agg6', 'type': 'hyperUnique', 'fieldName': 'metric6'},
]
expected = []
for agg in aggs:
exp = deepcopy(base)
exp.update({'aggregator': agg})
expected.append(exp)

built_agg = aggregators.build_aggregators(agg_input)
expected = sorted(built_agg, key=lambda k: itemgetter('name')(
itemgetter('aggregator')(k)))
actual = sorted(expected, key=lambda k: itemgetter('name')(
itemgetter('aggregator')(k)))
assert expected == actual