Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for canceling query execution on KeyboardInterrupt #150

Merged
merged 1 commit into from
Jul 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyathena/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
retry_config,
max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE,
kill_on_interrupt=True,
):
super(AsyncCursor, self).__init__(
connection=connection,
Expand All @@ -40,6 +41,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._arraysize = arraysize
Expand Down
2 changes: 2 additions & 0 deletions pyathena/async_pandas_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
retry_config,
max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE,
kill_on_interrupt=True,
):
super(AsyncPandasCursor, self).__init__(
connection=connection,
Expand All @@ -40,6 +41,7 @@ def __init__(
retry_config=retry_config,
max_workers=max_workers,
arraysize=arraysize,
kill_on_interrupt=kill_on_interrupt,
)

def _collect_result_set(self, query_id):
Expand Down
16 changes: 15 additions & 1 deletion pyathena/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt,
**kwargs
):
super(BaseCursor, self).__init__(**kwargs)
Expand All @@ -97,6 +98,7 @@ def __init__(
self._converter = converter
self._formatter = formatter
self._retry_config = retry_config
self._kill_on_interrupt = kill_on_interrupt

@property
def connection(self):
Expand All @@ -117,7 +119,7 @@ def _get_query_execution(self, query_id):
else:
return AthenaQueryExecution(response)

def _poll(self, query_id):
def __poll(self, query_id):
while True:
query_execution = self._get_query_execution(query_id)
if query_execution.state in [
Expand All @@ -129,6 +131,18 @@ def _poll(self, query_id):
else:
time.sleep(self._poll_interval)

def _poll(self, query_id):
try:
query_execution = self.__poll(query_id)
except KeyboardInterrupt as e:
if self._kill_on_interrupt:
_logger.warning("Query canceled by user.")
self._cancel(query_id)
query_execution = self.__poll(query_id)
else:
raise e
return query_execution

def _build_start_query_execution_request(
self, query, work_group=None, s3_staging_dir=None
):
Expand Down
3 changes: 3 additions & 0 deletions pyathena/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
formatter=None,
retry_config=None,
cursor_class=Cursor,
kill_on_interrupt=True,
**kwargs
):
self._kwargs = kwargs
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
self._formatter = formatter if formatter else DefaultParameterFormatter()
self._retry_config = retry_config if retry_config else RetryConfig()
self.cursor_class = cursor_class
self.kill_on_interrupt = kill_on_interrupt

def _assume_role(
self, profile_name, region_name, role_arn, role_session_name, duration_seconds
Expand Down Expand Up @@ -171,6 +173,7 @@ def cursor(self, cursor=None, **kwargs):
converter=converter,
formatter=kwargs.pop("formatter", self._formatter),
retry_config=kwargs.pop("retry_config", self._retry_config),
kill_on_interrupt=kwargs.pop("kill_on_interrupt", self.kill_on_interrupt),
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions pyathena/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt=True,
**kwargs
):
super(Cursor, self).__init__(
Expand All @@ -38,6 +39,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions pyathena/pandas_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt=True,
**kwargs
):
super(PandasCursor, self).__init__(
Expand All @@ -40,6 +41,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
**kwargs
)

Expand Down