Skip to content

Commit

Permalink
client.py:
Browse files Browse the repository at this point in the history
- query implementations share more code in common
- more informative exceptions are thrown on error
filters.py:
- added build_filter method for better readability
- more informative exceptions are thrown on error
  • Loading branch information
Deep Ganguli committed Jan 22, 2014
1 parent ac74bf6 commit 0d56b23
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 81 deletions.
114 changes: 36 additions & 78 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ def parse(self):
res = json.loads(self.result_json)
return res
else:
print("Empty query")
return None
# Report the unparseable payload and the query type that produced it.
# (A stray '{' in the template would make str.format raise ValueError here.)
raise IOError('Error parsing result: {0} for {1} query'.format(self.result_json, self.query_type))

# --------- Export implementations ---------

def export_tsv(self,dest_path):

f = open(dest_path,'wb')
tsv_file = csv.writer(f, delimiter = '\t')

if(self.query_type == "timeseries"):
header = self.result[0]['result'].keys()
header.append('timestamp')
elif(self.query_type == "groupby"):
elif(self.query_type == "groupBy"):
header = self.result[0]['event'].keys()
header.append('timestamp')
header.append('version')
Expand Down Expand Up @@ -107,111 +105,71 @@ def export_pandas(self):
nres = [dict(v) for v in nres]
elif self.query_type == "topN":
nres = []
for item in self.result: # this is a {'result':list of dicts, 'timestap': timestamp}
for item in self.result:
timestamp = item['timestamp']
results = item['result']
tres = [dict(res.items() + [('timestamp', timestamp)]) for res in results]
nres += tres
elif self.query_type == "groupby":
elif self.query_type == "groupBy":
nres = [v['event'].items() + [('timestamp', v['timestamp'])] for v in self.result]
nres = [dict(v) for v in nres]
else:
print('query: {0} not implemented yet'.format(self.query_type))
return None
raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type))

df = pandas.DataFrame(nres)
return df

# --------- Query implementations ---------
def topN(self, **args):
query_dict = {"queryType" : "topN"}
valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations','intervals', 'dimension', 'threshold', 'metric']

def validateQuery(self, valid_parts, args):
    """Reject any query component that is not listed in ``valid_parts``.

    :param valid_parts: collection of component names accepted by the query
    :param args: mapping of component name to value supplied by the caller
    :raises ValueError: for the first key of ``args`` not in ``valid_parts``
    """
    # Only the keys are inspected, so iterate the mapping directly; this also
    # avoids the Python 2-only ``iteritems``.
    for key in args:
        if key not in valid_parts:
            raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
elif key == "aggregations" :

def buildQuery(self, query_type, args):
    """Assemble the query dictionary and record it on the instance.

    Stores the result in ``self.query_dict`` and the type in
    ``self.query_type``; nothing is returned.

    :param query_type: value stored under the ``'queryType'`` key
    :param args: mapping of query component name to value
    """
    query_dict = {'queryType' : query_type}

    # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
    for key, val in args.items():
        if key == "aggregations":
            query_dict[key] = build_aggregators(val)
        elif key == "postAggregations":
            query_dict[key] = build_post_aggregators(val)
        elif key == "filter":
            query_dict[key] = build_filter(val)
        else:
            # Every other component is passed through unchanged.
            query_dict[key] = val

    self.query_dict = query_dict
    self.query_type = query_type

for key, val in args.iteritems():
if key not in valid_parts:
raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts))
elif key == "aggregations" :
query_dict[key] = build_aggregators(val)
elif key == "postAggregations":
query_dict[key] = build_post_aggregators(val)
elif key == "filter":
query_dict[key] = val.filter['filter']
else:
query_dict[key] = val
def topN(self, **args):
    """Validate the keyword components, build a ``topN`` query and post it.

    Returns whatever ``self.post`` returns for ``self.query_dict``.
    """
    allowed = [
        'dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
        'intervals', 'dimension', 'threshold', 'metric',
    ]
    self.validateQuery(allowed, args)
    self.buildQuery('topN', args)
    query = self.query_dict
    return self.post(query)

self.query_dict = query_dict
self.query_type = 'timeseries'
return self.post(query_dict)
def timeseries(self, **args):
    """Validate the keyword components, build a ``timeseries`` query and post it.

    Returns whatever ``self.post`` returns for ``self.query_dict``.
    """
    allowed = [
        'dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
        'intervals',
    ]
    self.validateQuery(allowed, args)
    self.buildQuery('timeseries', args)
    query = self.query_dict
    return self.post(query)

def groupBy(self, **args):
    """Validate the keyword components, build a ``groupBy`` query and post it.

    Returns whatever ``self.post`` returns for ``self.query_dict``.
    """
    allowed = [
        'dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations',
        'intervals', 'dimensions',
    ]
    self.validateQuery(allowed, args)
    self.buildQuery('groupBy', args)
    query = self.query_dict
    return self.post(query)

def segmentMetadata(self, **args):
    """Validate the keyword components, build a ``segmentMetadata`` query and post it.

    Accepts only ``dataSource`` and ``intervals``. Returns whatever
    ``self.post`` returns for ``self.query_dict``.
    """
    valid_parts = ['dataSource', 'intervals']
    self.validateQuery(valid_parts, args)
    # The query type is case-sensitive: 'segmentMetadata', not 'segmentMetaData'.
    self.buildQuery('segmentMetadata', args)
    return self.post(self.query_dict)

def timeBoundary(self, **args):
    """Validate the keyword components, build a ``timeBoundary`` query and post it.

    Accepts only ``dataSource``. Returns whatever ``self.post`` returns for
    ``self.query_dict``.
    """
    allowed = ['dataSource']
    self.validateQuery(allowed, args)
    self.buildQuery('timeBoundary', args)
    query = self.query_dict
    return self.post(query)
9 changes: 6 additions & 3 deletions pydruid/utils/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, **args):
elif args["type"] == "and":
self.filter = {"filter": {"type" : "and",
"fields" : args["fields"]}}
# construct an or filter
# construct an or filter
elif args["type"] == "or":
self.filter = {"filter": {"type" : "or",
"fields" : args["fields"]}}
Expand All @@ -35,7 +35,7 @@ def __init__(self, **args):
self.filter = {"filter" : {"type" : "not",
"field" : args["field"]}}
else:
print("you've slain me. nevermind the teeth and the fingernails. the show must go on.")
# NotImplemented is a non-callable constant; the exception type is NotImplementedError.
raise NotImplementedError('Filter type: {0} does not exist'.format(args['type']))

def show(self):
    """Print ``self.filter`` serialized as JSON with an indent of 4."""
    rendered = json.dumps(self.filter, indent = 4)
    print(rendered)
Expand All @@ -55,4 +55,7 @@ def __init__(self, dim):
self.dimension = dim

def __eq__(self,other):
    """Return a ``Filter`` built from this dimension and ``other``.

    Note that this does not return a boolean: ``obj == value`` yields
    ``Filter(dimension=self.dimension, value=value)``.
    """
    filter_args = {'dimension': self.dimension, 'value': other}
    return Filter(**filter_args)

def build_filter(filterObj):
    """Return the value stored under the ``'filter'`` key of ``filterObj.filter``."""
    wrapped = filterObj.filter
    return wrapped['filter']

0 comments on commit 0d56b23

Please sign in to comment.