Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if kernel is alive during cell execution #90

Merged
merged 7 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions nbclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def __init__(
def reset_execution_trackers(self) -> None:
"""Resets any per-execution trackers.
"""
self.task_poll_for_reply: t.Optional[asyncio.Future] = None
self.code_cells_executed = 0
self._display_id_map = {}
self.widget_state: t.Dict[str, t.Dict] = {}
Expand Down Expand Up @@ -353,10 +354,11 @@ async def _async_cleanup_kernel(self) -> None:
raise
finally:
# Remove any state left over even if we failed to stop the kernel
await ensure_async(self.km.cleanup())
await ensure_async(self.km.cleanup_resources())
if getattr(self, "kc") and self.kc is not None:
await ensure_async(self.kc.stop_channels())
self.kc = None
self.km = None

_cleanup_kernel = run_sync(_async_cleanup_kernel)

Expand Down Expand Up @@ -584,7 +586,8 @@ async def _async_poll_for_reply(
msg_id: str,
cell: NotebookNode,
timeout: t.Optional[int],
task_poll_output_msg: asyncio.Future) -> t.Dict:
task_poll_output_msg: asyncio.Future,
task_poll_kernel_alive: asyncio.Future) -> t.Dict:

assert self.kc is not None
new_timeout: t.Optional[float] = None
Expand All @@ -601,18 +604,21 @@ async def _async_poll_for_reply(
await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
except (asyncio.TimeoutError, Empty):
if self.raise_on_iopub_timeout:
task_poll_kernel_alive.cancel()
raise CellTimeoutError.error_from_timeout_and_cell(
"Timeout waiting for IOPub output", self.iopub_timeout, cell
)
else:
self.log.warning("Timeout waiting for IOPub output")
task_poll_kernel_alive.cancel()
return msg
else:
if new_timeout is not None:
new_timeout = max(0, deadline - monotonic())
except Empty:
# received no message, check if kernel is still alive
assert timeout is not None
task_poll_kernel_alive.cancel()
await self._async_check_alive()
await self._async_handle_timeout(timeout, cell)

Expand All @@ -632,6 +638,16 @@ async def _async_poll_output_msg(
except CellExecutionComplete:
return

async def _async_poll_kernel_alive(self) -> None:
while True:
await asyncio.sleep(1)
try:
await self._async_check_alive()
except DeadKernelError:
assert self.task_poll_for_reply is not None
self.task_poll_for_reply.cancel()
return

def _get_timeout(self, cell: t.Optional[NotebookNode]) -> int:
if self.timeout_func is not None and cell is not None:
timeout = self.timeout_func(cell)
Expand Down Expand Up @@ -775,13 +791,23 @@ async def async_execute_cell(
cell.outputs = []
self.clear_before_next_output = False

task_poll_kernel_alive = asyncio.ensure_future(
self._async_poll_kernel_alive()
)
task_poll_output_msg = asyncio.ensure_future(
self._async_poll_output_msg(parent_msg_id, cell, cell_index)
)
try:
exec_reply = await self._async_poll_for_reply(
parent_msg_id, cell, exec_timeout, task_poll_output_msg
self.task_poll_for_reply = asyncio.ensure_future(
self._async_poll_for_reply(
parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive
)
)
try:
exec_reply = await self.task_poll_for_reply
except asyncio.CancelledError:
# can only be cancelled by task_poll_kernel_alive when the kernel is dead
task_poll_output_msg.cancel()
raise DeadKernelError("Kernel died")
except Exception as e:
# Best effort to cancel request if it hasn't been resolved
try:
Expand Down
37 changes: 37 additions & 0 deletions nbclient/tests/files/Autokill.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import signal\n",
"pid = os.getpid()\n",
"os.kill(pid, signal.SIGTERM)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
41 changes: 31 additions & 10 deletions nbclient/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
import asyncio
import datetime
import warnings

import nbformat
import sys
Expand Down Expand Up @@ -73,9 +74,12 @@ def run_notebook(filename, opts, resources=None):
opts = {'resources': resources, **opts}
executor = NotebookClient(cleaned_input_nb, **opts)

# Override terminal size to standardise traceback format
with modified_env({'COLUMNS': '80', 'LINES': '24'}):
output_nb = executor.execute()
with warnings.catch_warnings():
# suppress warning from jupyter_client's deprecated cleanup()
warnings.simplefilter(action='ignore', category=FutureWarning)
# Override terminal size to standardise traceback format
with modified_env({'COLUMNS': '80', 'LINES': '24'}):
output_nb = executor.execute()

return input_nb, output_nb

Expand Down Expand Up @@ -303,11 +307,15 @@ def test_many_parallel_notebooks(capfd):
res = NBClientTestsBase().build_resources()
res["metadata"]["path"] = os.path.join(current_dir, "files")

# run once, to trigger creating the original context
run_notebook(input_file, opts, res)
with warnings.catch_warnings():
# suppress warning from jupyter_client's deprecated cleanup()
warnings.simplefilter(action='ignore', category=FutureWarning)

with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
executor.map(run_notebook_wrapper, [(input_file, opts, res) for i in range(8)])
# run once, to trigger creating the original context
run_notebook(input_file, opts, res)

with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
executor.map(run_notebook_wrapper, [(input_file, opts, res) for i in range(8)])

captured = capfd.readouterr()
assert captured.err == ""
Expand Down Expand Up @@ -530,8 +538,8 @@ def timeout_func(source):
with pytest.raises(TimeoutError):
run_notebook(filename, dict(timeout_func=timeout_func), res)

def test_kernel_death(self):
"""Check that an error is raised when the kernel is_alive is false"""
def test_kernel_death_after_timeout(self):
"""Check that an error is raised when the kernel is_alive is false after a cell timed out"""
filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
with io.open(filename, 'r') as f:
input_nb = nbformat.read(f, 4)
Expand All @@ -541,7 +549,7 @@ def test_kernel_death(self):
executor = NotebookClient(input_nb, timeout=1)

with pytest.raises(TimeoutError):
output_nb = executor.execute()
executor.execute()
km = executor.start_kernel_manager()

async def is_alive():
Expand All @@ -552,6 +560,19 @@ async def is_alive():
with pytest.raises(RuntimeError):
input_nb, output_nb = executor.execute()

def test_kernel_death_during_execution(self):
"""Check that an error is raised when the kernel is_alive is false during a cell
execution.
"""
filename = os.path.join(current_dir, 'files', 'Autokill.ipynb')
with io.open(filename, 'r') as f:
input_nb = nbformat.read(f, 4)

executor = NotebookClient(input_nb)

with pytest.raises(RuntimeError):
executor.execute()

def test_allow_errors(self):
"""
Check that conversion halts if ``allow_errors`` is False.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
traitlets>=4.2
jupyter_client>=6.1.0
jupyter_client>=6.1.5
nbformat>=5.0
async_generator
nest_asyncio