Skip to content

Commit

Permalink
implemented topN queries. made more informative error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Deep Ganguli committed Jan 22, 2014
1 parent b660ba7 commit ac74bf6
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def post(self,query):
self.result_json = data;
res.close()
except urllib2.HTTPError, e:
raise IOError('Malformed query: \n {0}'.format(json.dumps(self.query_dict, indent = 4)))
raise IOError('{0} \n Query is: {1}'.format(e, json.dumps(self.query_dict, indent = 4)))
else:
self.result = self.parse()
parsed = self.parse()
Expand Down Expand Up @@ -83,12 +83,11 @@ def export_tsv(self,dest_path):
w = query_utils.UnicodeWriter(f)

if self.result:
if self.query_type == "timeseries":
if self.query_type == "topN" or self.query_type == "timeseries":
for item in self.result:
timestamp = item['timestamp']
result = item['result']

if type(result) is list:
if type(result) is list: #topN
for line in result:
w.writerow(line.values() + [timestamp])
else: #timeseries
Expand All @@ -106,17 +105,43 @@ def export_pandas(self):
if self.query_type == "timeseries":
nres = [v['result'].items() + [('timestamp',v['timestamp'])] for v in self.result]
nres = [dict(v) for v in nres]
elif self.query_type == "topN":
nres = []
for item in self.result: # this is a {'result':list of dicts, 'timestap': timestamp}
timestamp = item['timestamp']
results = item['result']
tres = [dict(res.items() + [('timestamp', timestamp)]) for res in results]
nres += tres
elif self.query_type == "groupby":
nres = [v['event'].items() + [('timestamp', v['timestamp'])] for v in self.result]
nres = [dict(v) for v in nres]
else:
print('not implemented yet')
print('query: {0} not implemented yet'.format(self.query_type))
return None

df = pandas.DataFrame(nres)
return df

# --------- Query implementations ---------
def topN(self, **args):
query_dict = {"queryType" : "topN"}
valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations','intervals', 'dimension', 'threshold', 'metric']

for key, val in args.iteritems():
if key not in valid_parts:
raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
elif key == "aggregations" :
query_dict[key] = build_aggregators(val)
elif key == "postAggregations" :
query_dict[key] = build_post_aggregators(val)
elif key == "filter" :
query_dict[key] = val.filter['filter']
else:
query_dict[key] = val

self.query_dict = query_dict
self.query_type = "topN"
return self.post(query_dict)

def timeseries(self, **args):

Expand Down

0 comments on commit ac74bf6

Please sign in to comment.