Skip to content

Commit

Permalink
Merge pull request #28 from se7entyse7en/filtered_aggregation
Browse files Browse the repository at this point in the history
Added support for filtered aggregator
  • Loading branch information
xvrl committed Sep 21, 2015
2 parents 8d03eb0 + 8b8b138 commit 7b74ceb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
24 changes: 22 additions & 2 deletions pydruid/utils/aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#
from six import iteritems

from .filters import Filter


def longsum(raw_metric):
return {"type": "longSum", "fieldName": raw_metric}

Expand All @@ -34,9 +37,26 @@ def max(raw_metric):
def count(raw_metric):
return {"type": "count", "fieldName": raw_metric}


def hyperunique(raw_metric):
return {"type": "hyperUnique", "fieldName": raw_metric}


def filtered(filter, agg):
return {"type": "filtered",
"filter": Filter.build_filter(filter),
"aggregator": agg}


def build_aggregators(agg_input):
return [dict([('name', k)] + list(v.items()))
for (k, v) in iteritems(agg_input)]
return [_build_aggregator(name, kwargs)
for (name, kwargs) in iteritems(agg_input)]


def _build_aggregator(name, kwargs):
if kwargs["type"] == "filtered":
kwargs["aggregator"]["name"] = name
else:
kwargs.update({"name": name})

return kwargs
69 changes: 69 additions & 0 deletions tests/utils/test_aggregators.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: UTF-8 -*-

from operator import itemgetter
from copy import deepcopy

from pydruid.utils import aggregators
from pydruid.utils import filters


class TestAggregators:
Expand All @@ -16,6 +18,27 @@ def test_aggregators(self):
for f, agg_type in aggs_funcs:
assert f('metric') == {'type': agg_type, 'fieldName': 'metric'}

def test_filtered_aggregator(self):
filter_ = filters.Filter(dimension='dim', value='val')
aggs = [aggregators.count('metric1'),
aggregators.longsum('metric2'),
aggregators.doublesum('metric3'),
aggregators.min('metric4'),
aggregators.max('metric5'),
aggregators.hyperunique('metric6')]
for agg in aggs:
expected = {
'type': 'filtered',
'filter': {
'type': 'selector',
'dimension': 'dim',
'value': 'val'
},
'aggregator': agg
}
actual = aggregators.filtered(filter_, agg)
assert actual == expected

def test_build_aggregators(self):
agg_input = {
'agg1': aggregators.count('metric1'),
Expand All @@ -36,3 +59,49 @@ def test_build_aggregators(self):
]
assert (sorted(built_agg, key=itemgetter('name')) ==
sorted(expected, key=itemgetter('name')))

def test_build_filtered_aggregator(self):
filter_ = filters.Filter(dimension='dim', value='val')
agg_input = {
'agg1': aggregators.filtered(filter_,
aggregators.count('metric1')),
'agg2': aggregators.filtered(filter_,
aggregators.longsum('metric2')),
'agg3': aggregators.filtered(filter_,
aggregators.doublesum('metric3')),
'agg4': aggregators.filtered(filter_,
aggregators.min('metric4')),
'agg5': aggregators.filtered(filter_,
aggregators.max('metric5')),
'agg6': aggregators.filtered(filter_,
aggregators.hyperunique('metric6'))
}
base = {
'type': 'filtered',
'filter': {
'type': 'selector',
'dimension': 'dim',
'value': 'val'
}
}

aggs = [
{'name': 'agg1', 'type': 'count', 'fieldName': 'metric1'},
{'name': 'agg2', 'type': 'longSum', 'fieldName': 'metric2'},
{'name': 'agg3', 'type': 'doubleSum', 'fieldName': 'metric3'},
{'name': 'agg4', 'type': 'min', 'fieldName': 'metric4'},
{'name': 'agg5', 'type': 'max', 'fieldName': 'metric5'},
{'name': 'agg6', 'type': 'hyperUnique', 'fieldName': 'metric6'},
]
expected = []
for agg in aggs:
exp = deepcopy(base)
exp.update({'aggregator': agg})
expected.append(exp)

built_agg = aggregators.build_aggregators(agg_input)
expected = sorted(built_agg, key=lambda k: itemgetter('name')(
itemgetter('aggregator')(k)))
actual = sorted(expected, key=lambda k: itemgetter('name')(
itemgetter('aggregator')(k)))
assert expected == actual

0 comments on commit 7b74ceb

Please sign in to comment.