From b8daf7559bbe9c7080f7fb43b860f0a684074946 Mon Sep 17 00:00:00 2001
From: Maxime Beauchemin
Date: Mon, 11 Dec 2017 21:31:45 -0800
Subject: [PATCH] Add support for 'scan' queries

---
 pydruid/client.py | 40 ++++++++++++++++++++++++++++++++++++++++
 pydruid/query.py  | 22 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/pydruid/client.py b/pydruid/client.py
index 0ac6bd48..27973fd0 100755
--- a/pydruid/client.py
+++ b/pydruid/client.py
@@ -410,3 +410,43 @@ def _post(self, query):
         else:
             query.parse(data)
             return query
+
+    def scan(self, **kwargs):
+        """
+        A scan query returns raw Druid rows
+
+        Required key/value pairs:
+
+        :param str datasource: Data source to query
+        :param str granularity: Time bucket to aggregate data by hour, day, minute, etc.
+        :param int limit: The maximum number of rows to return
+        :param intervals: ISO-8601 intervals for which to run the query on
+        :type intervals: str or list
+
+        Optional key/value pairs:
+
+        :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
+        :param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned
+        :param list metrics: The list of metrics to select. If left empty, all metrics are returned
+        :param dict context: A dict of query context options
+
+        :return: The query result
+        :rtype: Query
+
+        Example:
+
+        .. code-block:: python
+            :linenos:
+
+                >>> raw_data = client.scan(
+                        datasource=twitterstream,
+                        granularity='all',
+                        intervals='2013-06-14/pt1h',
+                        limit=1,
+                        context={"timeout": 1000}
+                    )
+                >>> print(raw_data)
+                >>> [{u'segmentId': u'zzzz', u'columns': [u'__time', 'status', 'region'], 'events': [{u'status': u'ok', 'region': u'SF', u'__time': 1509494400000}]}]
+        """
+        query = self.query_builder.scan(kwargs)
+        return self._post(query)
diff --git a/pydruid/query.py b/pydruid/query.py
index 1c70937f..17e3294d 100644
--- a/pydruid/query.py
+++ b/pydruid/query.py
@@ -172,6 +172,11 @@ def export_pandas(self):
                 nres = [list(v['event'].items()) + [('timestamp', v['timestamp'])]
                         for v in self.result]
                 nres = [dict(v) for v in nres]
+            elif self.query_type == "scan":
+                # each result item carries its rows under 'events'; flatten them
+                nres = []
+                for item in self.result:
+                    nres.extend(item.get('events', []))
             else:
                 raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type))
 
@@ -404,3 +409,20 @@ def search(self, args):
         ]
         self.validate_query(query_type, valid_parts, args)
         return self.build_query(query_type, args)
+
+    def scan(self, args):
+        """
+        A scan query returns raw Druid rows
+
+        :param dict args: dict of args
+
+        :return: scan query
+        :rtype: Query
+        """
+        query_type = 'scan'
+        valid_parts = [
+            'datasource', 'granularity', 'filter', 'dimensions', 'metrics',
+            'intervals', 'limit',
+        ]
+        self.validate_query(query_type, valid_parts, args)
+        return self.build_query(query_type, args)