diff --git a/pydruid/client.py b/pydruid/client.py index 1c4719fb..d00798d4 100755 --- a/pydruid/client.py +++ b/pydruid/client.py @@ -60,20 +60,18 @@ def parse(self): res = json.loads(self.result_json) return res else: - print("Empty query") - return None + raise IOError('{Error parsing result: {0} for {1} query'.format(self.result_json, self.query_type)) # --------- Export implementations --------- def export_tsv(self,dest_path): - f = open(dest_path,'wb') tsv_file = csv.writer(f, delimiter = '\t') if(self.query_type == "timeseries"): header = self.result[0]['result'].keys() header.append('timestamp') - elif(self.query_type == "groupby"): + elif(self.query_type == "groupBy"): header = self.result[0]['event'].keys() header.append('timestamp') header.append('version') @@ -107,111 +105,71 @@ def export_pandas(self): nres = [dict(v) for v in nres] elif self.query_type == "topN": nres = [] - for item in self.result: # this is a {'result':list of dicts, 'timestap': timestamp} + for item in self.result: timestamp = item['timestamp'] results = item['result'] tres = [dict(res.items() + [('timestamp', timestamp)]) for res in results] nres += tres - elif self.query_type == "groupby": + elif self.query_type == "groupBy": nres = [v['event'].items() + [('timestamp', v['timestamp'])] for v in self.result] nres = [dict(v) for v in nres] else: - print('query: {0} not implemented yet'.format(self.query_type)) - return None + raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type)) df = pandas.DataFrame(nres) return df # --------- Query implementations --------- - def topN(self, **args): - query_dict = {"queryType" : "topN"} - valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations','intervals', 'dimension', 'threshold', 'metric'] - + def validateQuery(self, valid_parts, args): for key, val in args.iteritems(): if key not in valid_parts: raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts)) - elif key == "aggregations" : + + def buildQuery(self, query_type, args): + query_dict = {'queryType' : query_type} + + for key, val in args.iteritems(): + if key == "aggregations" : query_dict[key] = build_aggregators(val) elif key == "postAggregations" : query_dict[key] = build_post_aggregators(val) elif key == "filter" : - query_dict[key] = val.filter['filter'] + query_dict[key] = build_filter(val) else: query_dict[key] = val self.query_dict = query_dict - self.query_type = "topN" - return self.post(query_dict) - - def timeseries(self, **args): - - query_dict = {"queryType" : "timeseries"} - valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', 'intervals'] + self.query_type = query_type - for key, val in args.iteritems(): - if key not in valid_parts: - raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts)) - elif key == "aggregations" : - query_dict[key] = build_aggregators(val) - elif key == "postAggregations": - query_dict[key] = build_post_aggregators(val) - elif key == "filter": - query_dict[key] = val.filter['filter'] - else: - query_dict[key] = val + def topN(self, **args): + valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', + 'intervals', 'dimension', 'threshold', 'metric'] + self.validateQuery(valid_parts, args) + self.buildQuery('topN', args) + return self.post(self.query_dict) - self.query_dict = query_dict - self.query_type = 'timeseries' - return self.post(query_dict) + def timeseries(self, **args): + valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', + 'intervals'] + self.validateQuery(valid_parts, args) + self.buildQuery('timeseries', args) + return self.post(self.query_dict) def groupBy(self, **args): - - query_dict = {"queryType" : "groupBy"} valid_parts = ['dataSource', 'granularity', 'filter', 'aggregations', 'postAggregations', - 'intervals', 'dimensions'] - - for key, val in args.iteritems(): - if key not in valid_parts: - raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts)) - elif key == "aggregations" : - query_dict[key] = build_aggregators(val) - elif key == "postAggregations": - query_dict[key] = build_post_aggregators(val) - elif key == "filter": - query_dict[key] = val.filter['filter'] - else: - query_dict[key] = val - - self.query_dict = query_dict - self.query_type = 'groupby' - return self.post(query_dict) + 'intervals', 'dimensions'] + self.validateQuery(valid_parts, args) + self.buildQuery('groupBy', args) + return self.post(self.query_dict) def segmentMetadata(self, **args): - - query_dict = {"queryType" : "segmentMetadata"} valid_parts = ['dataSource', 'intervals'] - - for key, val in args.iteritems(): - if key not in valid_parts: - raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts)) - else: - query_dict[key] = val - - self.query_dict = query_dict - self.query_type = 'segmentMetadata' - return self.post(query_dict) + self.validateQuery(valid_parts, args) + self.buildQuery('segmentMetaData', args) + return self.post(self.query_dict) def timeBoundary(self, **args): - - query_dict = {"queryType" : "timeBoundary"} valid_parts = ['dataSource'] - - for key, val in args.iteritems(): - if key not in valid_parts: - raise ValueError('{0} is not a valid query component. The list of valid components is: \n {1}'.format(key, valid_parts)) - else: - query_dict[key] = val - - self.query_dict = query_dict - self.query_type = 'timeBoundary' - return self.post(query_dict) + self.validateQuery(valid_parts, args) + self.buildQuery('timeBoundary', args) + return self.post(self.query_dict) diff --git a/pydruid/utils/filters.py b/pydruid/utils/filters.py index 562cf78d..a10a5a7c 100644 --- a/pydruid/utils/filters.py +++ b/pydruid/utils/filters.py @@ -26,7 +26,7 @@ def __init__(self, **args): elif args["type"] == "and": self.filter = {"filter": {"type" : "and", "fields" : args["fields"]}} - # construct an or filter + # construct an or filter elif args["type"] == "or": self.filter = {"filter": {"type" : "or", "fields" : args["fields"]}} @@ -35,7 +35,7 @@ def __init__(self, **args): self.filter = {"filter" : {"type" : "not", "field" : args["field"]}} else: - print("you've slain me. nevermind the teeth and the fingernails. the show must go on.") + raise NotImplemented('Filter type: {0} does not exist'.format(args['type'])) def show(self): print(json.dumps(self.filter, indent = 4)) @@ -55,4 +55,7 @@ def __init__(self, dim): self.dimension = dim def __eq__(self,other): - return Filter(dimension = self.dimension, value = other) \ No newline at end of file + return Filter(dimension = self.dimension, value = other) + +def build_filter(filterObj): + return filterObj.filter['filter']