From 8fcaf0e859de6e25463e1ddce02ccf1f1ea0f9f2 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 9 Jul 2020 12:00:54 +0200 Subject: [PATCH 1/7] Check if kernel is alive during cell execution --- nbclient/client.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index 7b0b40ea..ff0793d3 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -310,6 +310,7 @@ def __init__( def reset_execution_trackers(self) -> None: """Resets any per-execution trackers. """ + self.task_poll_for_reply: t.Optional[asyncio.Future] = None self.code_cells_executed = 0 self._display_id_map = {} self.widget_state: t.Dict[str, t.Dict] = {} @@ -584,7 +585,8 @@ async def _async_poll_for_reply( msg_id: str, cell: NotebookNode, timeout: t.Optional[int], - task_poll_output_msg: asyncio.Future) -> t.Dict: + task_poll_output_msg: asyncio.Future, + task_poll_kernel_alive: asyncio.Future) -> t.Dict: assert self.kc is not None new_timeout: t.Optional[float] = None @@ -601,11 +603,13 @@ async def _async_poll_for_reply( await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout) except (asyncio.TimeoutError, Empty): if self.raise_on_iopub_timeout: + task_poll_kernel_alive.cancel() raise CellTimeoutError.error_from_timeout_and_cell( "Timeout waiting for IOPub output", self.iopub_timeout, cell ) else: self.log.warning("Timeout waiting for IOPub output") + task_poll_kernel_alive.cancel() return msg else: if new_timeout is not None: @@ -613,6 +617,7 @@ async def _async_poll_for_reply( except Empty: # received no message, check if kernel is still alive assert timeout is not None + task_poll_kernel_alive.cancel() await self._async_check_alive() await self._async_handle_timeout(timeout, cell) @@ -632,6 +637,15 @@ async def _async_poll_output_msg( except CellExecutionComplete: return + async def _async_poll_kernel_alive(self) -> None: + while True: + await asyncio.sleep(1) + try: + await 
self._async_check_alive() + except DeadKernelError: + self.task_poll_for_reply.cancel() + return + def _get_timeout(self, cell: t.Optional[NotebookNode]) -> int: if self.timeout_func is not None and cell is not None: timeout = self.timeout_func(cell) @@ -775,13 +789,23 @@ async def async_execute_cell( cell.outputs = [] self.clear_before_next_output = False + task_poll_kernel_alive = asyncio.ensure_future( + self._async_poll_kernel_alive() + ) task_poll_output_msg = asyncio.ensure_future( self._async_poll_output_msg(parent_msg_id, cell, cell_index) ) - try: - exec_reply = await self._async_poll_for_reply( - parent_msg_id, cell, exec_timeout, task_poll_output_msg + self.task_poll_for_reply = asyncio.ensure_future( + self._async_poll_for_reply( + parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive ) + ) + try: + exec_reply = await self.task_poll_for_reply + except asyncio.CancelledError: + # can only be cancelled by task_poll_kernel_alive when the kernel is dead + task_poll_output_msg.cancel() + raise DeadKernelError("Kernel died") except Exception as e: # Best effort to cancel request if it hasn't been resolved try: From 17a07fbb9b21e64dca04f5117ce0d4466dd0454a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 9 Jul 2020 15:07:24 +0200 Subject: [PATCH 2/7] Add test --- nbclient/tests/files/Autokill.ipynb | 37 +++++++++++++++++++++++++++++ nbclient/tests/test_client.py | 15 ++++++++++-- 2 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 nbclient/tests/files/Autokill.ipynb diff --git a/nbclient/tests/files/Autokill.ipynb b/nbclient/tests/files/Autokill.ipynb new file mode 100644 index 00000000..06ceade3 --- /dev/null +++ b/nbclient/tests/files/Autokill.ipynb @@ -0,0 +1,37 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import signal\n", + "pid = os.getpid()\n", + "os.kill(pid, signal.SIGTERM)" + ] + } + ], + "metadata": { + 
"kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 37466395..a20fb09a 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -530,8 +530,8 @@ def timeout_func(source): with pytest.raises(TimeoutError): run_notebook(filename, dict(timeout_func=timeout_func), res) - def test_kernel_death(self): - """Check that an error is raised when the kernel is_alive is false""" + def test_kernel_death_after_timeout(self): + """Check that an error is raised when the kernel is_alive is false after a cell timed out""" filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb') with io.open(filename, 'r') as f: input_nb = nbformat.read(f, 4) @@ -552,6 +552,17 @@ async def is_alive(): with pytest.raises(RuntimeError): input_nb, output_nb = executor.execute() + def test_kernel_death_during_execution(self): + """Check that an error is raised when the kernel is_alive is false during a cell execution""" + filename = os.path.join(current_dir, 'files', 'Autokill.ipynb') + with io.open(filename, 'r') as f: + input_nb = nbformat.read(f, 4) + + executor = NotebookClient(input_nb) + + with pytest.raises(RuntimeError): + output_nb = executor.execute() + def test_allow_errors(self): """ Check that conversion halts if ``allow_errors`` is False. 
From 448961d35e4588e5e7850c4cb2fe7edcd57d0185 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 9 Jul 2020 16:35:44 +0200 Subject: [PATCH 3/7] Delete kernel manager in _async_cleanup_kernel --- nbclient/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nbclient/client.py b/nbclient/client.py index ff0793d3..599df441 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -358,6 +358,7 @@ async def _async_cleanup_kernel(self) -> None: if getattr(self, "kc") and self.kc is not None: await ensure_async(self.kc.stop_channels()) self.kc = None + self.km = None _cleanup_kernel = run_sync(_async_cleanup_kernel) From 7e811ee5b30205d1fa71884f19cf84df6b35da10 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 9 Jul 2020 16:48:36 +0200 Subject: [PATCH 4/7] Fix flake8 and mypy --- nbclient/client.py | 1 + nbclient/tests/test_client.py | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index 599df441..0778cfa2 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -644,6 +644,7 @@ async def _async_poll_kernel_alive(self) -> None: try: await self._async_check_alive() except DeadKernelError: + assert self.task_poll_for_reply is not None self.task_poll_for_reply.cancel() return diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index a20fb09a..fcac7a50 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -541,7 +541,7 @@ def test_kernel_death_after_timeout(self): executor = NotebookClient(input_nb, timeout=1) with pytest.raises(TimeoutError): - output_nb = executor.execute() + executor.execute() km = executor.start_kernel_manager() async def is_alive(): @@ -553,7 +553,9 @@ async def is_alive(): input_nb, output_nb = executor.execute() def test_kernel_death_during_execution(self): - """Check that an error is raised when the kernel is_alive is false during a cell execution""" + """Check that an error is raised when the kernel is_alive 
is false during a cell + execution. + """ filename = os.path.join(current_dir, 'files', 'Autokill.ipynb') with io.open(filename, 'r') as f: input_nb = nbformat.read(f, 4) @@ -561,7 +563,7 @@ def test_kernel_death_during_execution(self): executor = NotebookClient(input_nb) with pytest.raises(RuntimeError): - output_nb = executor.execute() + executor.execute() def test_allow_errors(self): """ From b6366ccf70249108712521d5b6ea8802a066c350 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 9 Jul 2020 17:00:45 +0200 Subject: [PATCH 5/7] Use jupyter_client's cleanup_resources method --- nbclient/client.py | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index 0778cfa2..cab2e473 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -354,7 +354,7 @@ async def _async_cleanup_kernel(self) -> None: raise finally: # Remove any state left over even if we failed to stop the kernel - await ensure_async(self.km.cleanup()) + await ensure_async(self.km.cleanup_resources()) if getattr(self, "kc") and self.kc is not None: await ensure_async(self.kc.stop_channels()) self.kc = None diff --git a/requirements.txt b/requirements.txt index 19441cd4..4f25058b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ traitlets>=4.2 -jupyter_client>=6.1.0 +jupyter_client>=6.1.5 nbformat>=5.0 async_generator nest_asyncio From 915069f6b51f3bfc75ed0ce38757817056b4ee4b Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 10 Jul 2020 09:54:58 +0200 Subject: [PATCH 6/7] Suppress warning in test_many_parallel_notebooks --- nbclient/tests/test_client.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index fcac7a50..51234222 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -6,6 +6,7 @@ import threading import asyncio import datetime +import warnings import nbformat 
import sys @@ -303,11 +304,15 @@ def test_many_parallel_notebooks(capfd): res = NBClientTestsBase().build_resources() res["metadata"]["path"] = os.path.join(current_dir, "files") - # run once, to trigger creating the original context - run_notebook(input_file, opts, res) + with warnings.catch_warnings(): + # suppress warning from jupyter_client's deprecated cleanup() + warnings.simplefilter(action='ignore', category=FutureWarning) + + # run once, to trigger creating the original context + run_notebook(input_file, opts, res) - with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: - executor.map(run_notebook_wrapper, [(input_file, opts, res) for i in range(8)]) + with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: + executor.map(run_notebook_wrapper, [(input_file, opts, res) for i in range(8)]) captured = capfd.readouterr() assert captured.err == "" From c37f73a1e9c5ea077f3b807ea850ab51cab871a9 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 10 Jul 2020 10:07:45 +0200 Subject: [PATCH 7/7] Suppress FutureWarning in run_notebook test helper --- nbclient/tests/test_client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 51234222..d3f74909 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -74,9 +74,12 @@ def run_notebook(filename, opts, resources=None): opts = {'resources': resources, **opts} executor = NotebookClient(cleaned_input_nb, **opts) - # Override terminal size to standardise traceback format - with modified_env({'COLUMNS': '80', 'LINES': '24'}): - output_nb = executor.execute() + with warnings.catch_warnings(): + # suppress warning from jupyter_client's deprecated cleanup() + warnings.simplefilter(action='ignore', category=FutureWarning) + # Override terminal size to standardise traceback format + with modified_env({'COLUMNS': '80', 'LINES': '24'}): + output_nb = executor.execute() return input_nb, output_nb