Skip to content

Commit

Permalink
Support for canceling query execution on KeyboardInterrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
laughingman7743 committed Jul 10, 2020
1 parent 9a81cf8 commit 72c10aa
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pyathena/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
retry_config,
max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE,
kill_on_interrupt=True,
):
super(AsyncCursor, self).__init__(
connection=connection,
Expand All @@ -40,6 +41,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._arraysize = arraysize
Expand Down
2 changes: 2 additions & 0 deletions pyathena/async_pandas_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
retry_config,
max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE,
kill_on_interrupt=True,
):
super(AsyncPandasCursor, self).__init__(
connection=connection,
Expand All @@ -40,6 +41,7 @@ def __init__(
retry_config=retry_config,
max_workers=max_workers,
arraysize=arraysize,
kill_on_interrupt=kill_on_interrupt,
)

def _collect_result_set(self, query_id):
Expand Down
16 changes: 15 additions & 1 deletion pyathena/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt,
**kwargs
):
super(BaseCursor, self).__init__(**kwargs)
Expand All @@ -97,6 +98,7 @@ def __init__(
self._converter = converter
self._formatter = formatter
self._retry_config = retry_config
self._kill_on_interrupt = kill_on_interrupt

@property
def connection(self):
Expand All @@ -117,7 +119,7 @@ def _get_query_execution(self, query_id):
else:
return AthenaQueryExecution(response)

def _poll(self, query_id):
def __poll(self, query_id):
while True:
query_execution = self._get_query_execution(query_id)
if query_execution.state in [
Expand All @@ -129,6 +131,18 @@ def _poll(self, query_id):
else:
time.sleep(self._poll_interval)

def _poll(self, query_id):
try:
query_execution = self.__poll(query_id)
except KeyboardInterrupt as e:
if self._kill_on_interrupt:
_logger.warning("Query canceled by user.")
self._cancel(query_id)
query_execution = self.__poll(query_id)
else:
raise e
return query_execution

def _build_start_query_execution_request(
self, query, work_group=None, s3_staging_dir=None
):
Expand Down
3 changes: 3 additions & 0 deletions pyathena/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
formatter=None,
retry_config=None,
cursor_class=Cursor,
kill_on_interrupt=True,
**kwargs
):
self._kwargs = kwargs
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
self._formatter = formatter if formatter else DefaultParameterFormatter()
self._retry_config = retry_config if retry_config else RetryConfig()
self.cursor_class = cursor_class
self.kill_on_interrupt = kill_on_interrupt

def _assume_role(
self, profile_name, region_name, role_arn, role_session_name, duration_seconds
Expand Down Expand Up @@ -171,6 +173,7 @@ def cursor(self, cursor=None, **kwargs):
converter=converter,
formatter=kwargs.pop("formatter", self._formatter),
retry_config=kwargs.pop("retry_config", self._retry_config),
kill_on_interrupt=kwargs.pop("kill_on_interrupt", self.kill_on_interrupt),
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions pyathena/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt=True,
**kwargs
):
super(Cursor, self).__init__(
Expand All @@ -38,6 +39,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions pyathena/pandas_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
converter,
formatter,
retry_config,
kill_on_interrupt=True,
**kwargs
):
super(PandasCursor, self).__init__(
Expand All @@ -40,6 +41,7 @@ def __init__(
converter=converter,
formatter=formatter,
retry_config=retry_config,
kill_on_interrupt=kill_on_interrupt,
**kwargs
)

Expand Down

0 comments on commit 72c10aa

Please sign in to comment.