Implement asynchronous cursor (refs #19) #21

Merged 13 commits on Sep 24, 2017
129 changes: 129 additions & 0 deletions README.rst
@@ -171,6 +171,135 @@ As Pandas DataFrame:
    df = as_pandas(cursor)
    print(df.describe())

Asynchronous Cursor
~~~~~~~~~~~~~~~~~~~

The asynchronous cursor is a simple implementation using the concurrent.futures package.
On Python 2.7 it uses the `backport of the concurrent.futures`_ package.
This cursor is not `DB API 2.0 (PEP 249)`_ compliant; for example, its ``execute`` method
returns a tuple of the query ID and a future object instead of conforming to the standard
cursor interface.

You can use the asynchronous cursor by passing ``cursor_class=AsyncCursor``
to the ``connect`` method or to the ``Connection`` object.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

.. code:: python

    from pyathena.connection import Connection
    from pyathena.async_cursor import AsyncCursor

    cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                        region_name='us-west-2',
                        cursor_class=AsyncCursor).cursor()

You can also specify the cursor class when calling the connection object's ``cursor`` method.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2').cursor(AsyncCursor)

.. code:: python

    from pyathena.connection import Connection
    from pyathena.async_cursor import AsyncCursor

    cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                        region_name='us-west-2').cursor(AsyncCursor)

The default number of workers is five times the number of CPUs, or 5 if the number of CPUs
cannot be determined. If you want to change the number of workers, you can specify it as follows.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor(max_workers=10)

The ``execute`` method of the asynchronous cursor returns a tuple of the query ID and a `future object`_.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")

The return value of the `future object`_ is an ``AthenaResultSet`` object.
This object has an interface for fetching and iterating over query results,
similar to the synchronous cursor, and also carries information about the query execution.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    result_set = future.result()
    print(result_set.state)
    print(result_set.state_change_reason)
    print(result_set.completion_date_time)
    print(result_set.submission_date_time)
    print(result_set.data_scanned_in_bytes)
    print(result_set.execution_time_in_millis)
    print(result_set.output_location)
    print(result_set.description)
    for row in result_set:
        print(row)

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    result_set = future.result()
    print(result_set.fetchall())
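
Because ``execute`` returns a future, you can submit several queries and process the
results as they complete. The following is a minimal sketch using the standard library's
``concurrent.futures.as_completed``; the table names are hypothetical.

.. code:: python

    import concurrent.futures

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    # Submit both queries; each execute() call returns without blocking.
    futures = {}
    for table in ('many_rows', 'one_row'):  # hypothetical table names
        query_id, future = cursor.execute('SELECT * FROM {0}'.format(table))
        futures[future] = query_id

    # Iterate over the result sets in completion order, not submission order.
    for future in concurrent.futures.as_completed(futures):
        result_set = future.result()
        print(futures[future], result_set.fetchall())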

A query ID is required to cancel a query with the asynchronous cursor.

.. code:: python

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    cursor.cancel(query_id)

NOTE: The cancel method of the `future object`_ does not cancel the query.
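
If you want to abandon a query that runs too long, you can combine the future's ``result``
timeout with ``cursor.cancel``. This is a minimal sketch; the 60-second timeout is an
arbitrary value, and note that ``cancel`` itself returns a future.

.. code:: python

    import concurrent.futures

    from pyathena import connect
    from pyathena.async_cursor import AsyncCursor

    cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                     region_name='us-west-2',
                     cursor_class=AsyncCursor).cursor()

    query_id, future = cursor.execute("SELECT * FROM many_rows")
    try:
        # Wait up to 60 seconds for the query to finish.
        result_set = future.result(timeout=60)
    except concurrent.futures.TimeoutError:
        # The future cannot stop the query itself; cancel it through the
        # cursor. cancel() also returns a future, so wait on it if needed.
        cursor.cancel(query_id).result()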

.. _`backport of the concurrent.futures`: https://pypi.python.org/pypi/futures
.. _`future object`: https://docs.python.org/3/library/concurrent.futures.html#future-objects

Credentials
-----------

14 changes: 2 additions & 12 deletions pyathena/__init__.py
@@ -47,16 +47,6 @@ def __cmp__(self, other):
Timestamp = datetime.datetime


-def connect(s3_staging_dir=None, region_name=None, schema_name='default',
-            poll_interval=1, encryption_option=None, kms_key=None, profile_name=None,
-            converter=None, formatter=None,
-            retry_exceptions=('ThrottlingException', 'TooManyRequestsException'),
-            retry_attempt=5, retry_multiplier=1,
-            retry_max_delay=1800, retry_exponential_base=2,
-            **kwargs):
+def connect(*args, **kwargs):
     from pyathena.connection import Connection
-    return Connection(s3_staging_dir, region_name, schema_name,
-                      poll_interval, encryption_option, kms_key, profile_name,
-                      converter, formatter, retry_exceptions, retry_attempt,
-                      retry_multiplier, retry_max_delay, retry_exponential_base,
-                      **kwargs)
+    return Connection(*args, **kwargs)
80 changes: 80 additions & 0 deletions pyathena/async_cursor.py
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
import logging

from concurrent.futures.thread import ThreadPoolExecutor

from pyathena.common import CursorIterator
from pyathena.cursor import BaseCursor
from pyathena.error import ProgrammingError, NotSupportedError
from pyathena.result_set import AthenaResultSet

try:
    from multiprocessing import cpu_count
except ImportError:
    def cpu_count():
        return None


_logger = logging.getLogger(__name__)


class AsyncCursor(BaseCursor):

    def __init__(self, client, s3_staging_dir, schema_name, poll_interval,
                 encryption_option, kms_key, converter, formatter,
                 retry_exceptions, retry_attempt, retry_multiplier,
                 retry_max_delay, retry_exponential_base,
                 max_workers=(cpu_count() or 1) * 5,
                 arraysize=CursorIterator.DEFAULT_FETCH_SIZE):
        super(AsyncCursor, self).__init__(client, s3_staging_dir, schema_name, poll_interval,
                                          encryption_option, kms_key, converter, formatter,
                                          retry_exceptions, retry_attempt, retry_multiplier,
                                          retry_max_delay, retry_exponential_base)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._arraysize = arraysize

    @property
    def arraysize(self):
        return self._arraysize

    @arraysize.setter
    def arraysize(self, value):
        if value <= 0 or value > CursorIterator.DEFAULT_FETCH_SIZE:
            raise ProgrammingError('MaxResults is more than maximum allowed length {0}.'.format(
                CursorIterator.DEFAULT_FETCH_SIZE))
        self._arraysize = value

    def close(self, wait=False):
        self._executor.shutdown(wait=wait)

    def _description(self, query_id):
        result_set = self._collect_result_set(query_id)
        return result_set.description

    def description(self, query_id):
        return self._executor.submit(self._description, query_id)

    def query_execution(self, query_id):
        return self._executor.submit(self._query_execution, query_id)

    def poll(self, query_id):
        return self._executor.submit(self._poll, query_id)

    def _collect_result_set(self, query_id):
        query_execution = self._poll(query_id)
        return AthenaResultSet(
            self._connection, self._converter, query_execution, self._arraysize,
            self.retry_exceptions, self.retry_attempt, self.retry_multiplier,
            self.retry_max_delay, self.retry_exponential_base)

    def execute(self, operation, parameters=None):
        query_id = self._execute(operation, parameters)
        return query_id, self._executor.submit(self._collect_result_set, query_id)

    def executemany(self, operation, seq_of_parameters):
        raise NotSupportedError

    def cancel(self, query_id):
        return self._executor.submit(self._cancel, query_id)