Skip to content

Commit

Permalink
Add support for 'scan' queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Dec 12, 2017
1 parent 0b3ae1b commit b8daf75
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
40 changes: 40 additions & 0 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,43 @@ def _post(self, query):
else:
query.parse(data)
return query

def scan(self, **kwargs):
"""
A scan query returns raw Druid rows
Required key/value pairs:
:param str datasource: Data source to query
:param str granularity: Time bucket to aggregate data by hour, day, minute, etc.
:param int limit: The maximum number of rows to return
:param intervals: ISO-8601 intervals for which to run the query on
:type intervals: str or list
Optional key/value pairs:
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned
:param list metrics: The list of metrics to select. If left empty, all metrics are returned
:param dict context: A dict of query context options
:return: The query result
:rtype: Query
Example:
.. code-block:: python
:linenos:
>>> raw_data = client.scan(
datasource=twitterstream,
granularity='all',
intervals='2013-06-14/pt1h',
limit=1,
context={"timeout": 1000}
)
>>> print raw_data
>>> [{u'segmentId': u'zzzz', u'columns': [u'__time', 'status', 'region'], 'events': [{u'status': u'ok', 'region': u'SF', u'__time': 1509494400000}]}]
"""
query = self.query_builder.scan(kwargs)
return self._post(query)
22 changes: 22 additions & 0 deletions pydruid/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ def export_pandas(self):
nres = [list(v['event'].items()) + [('timestamp', v['timestamp'])]
for v in self.result]
nres = [dict(v) for v in nres]
elif self.query_type == "scan":
nres = []
print(self.result)
for item in self.result:
nres += [e for e in item.get('events')]
else:
raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type))

Expand Down Expand Up @@ -404,3 +409,20 @@ def search(self, args):
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)

def scan(self, args):
"""
A scan query returns raw Druid rows
:param dict args: dict of args
:return: select query
:rtype: Query
"""
query_type = 'scan'
valid_parts = [
'datasource', 'granularity', 'filter', 'dimensions', 'metrics',
'intervals', 'limit',
]
self.validate_query(query_type, valid_parts, args)
return self.build_query(query_type, args)

0 comments on commit b8daf75

Please sign in to comment.