Skip to content

Commit

Permalink
Merge pull request #732 from Zsailer/pending-state
Browse files Browse the repository at this point in the history
Further improvements to pending kernels management
  • Loading branch information
Zsailer authored Jan 14, 2022
2 parents 659330f + a382498 commit 4428715
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 54 deletions.
1 change: 0 additions & 1 deletion .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ jobs:
strategy:
matrix:
python-version: ["3.9"]

steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ with Jupyter kernels.
kernels
wrapperkernels
provisioning
pending-kernels

.. toctree::
:maxdepth: 2
Expand Down
36 changes: 36 additions & 0 deletions docs/pending-kernels.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
Pending Kernels
===============

*Added in 7.1.0*

In scenarios where a kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions.

This intermediate state is called a **"pending kernel"**.

How they work
-------------

When ``.start_kernel()`` or ``.shutdown_kernel()`` is called, a ``Future`` is created under the ``KernelManager.ready`` property. This property can be awaited anytime to ensure that the kernel moves out of its pending state, e.g.:

.. code-block:: python

    # await a Kernel Manager's `.ready` property to
    # block further action until the kernel is out
    # of its pending state.
    await kernel_manager.ready

Once the kernel is finished pending, ``.ready.done()`` will be ``True`` and either 1) ``.ready.result()`` will return ``None`` or 2) ``.ready.exception()`` will return a raised exception.

Using pending kernels
---------------------

The most common way to interact with pending kernels is through the ``MultiKernelManager``—the object that manages a collection of kernels—by setting its ``use_pending_kernels`` trait to ``True``. Pending kernels are "opt-in"; they are not used by default in the ``MultiKernelManager``.

When ``use_pending_kernels`` is ``True``, the following changes are made to the ``MultiKernelManager``:

1. ``start_kernel`` and ``shutdown_kernel`` return immediately while the pending task keeps running in the background.
2. The following methods raise a ``RuntimeError`` if a kernel is pending:

   * ``restart_kernel``
   * ``interrupt_kernel``
   * ``shutdown_kernel``

3. ``shutdown_all`` will wait for all pending kernels to become ready before attempting to shut them down.
63 changes: 38 additions & 25 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import functools
import os
import re
import signal
Expand Down Expand Up @@ -51,6 +52,35 @@ class _ShutdownStatus(Enum):
SigkillRequest = "SigkillRequest"


def in_pending_state(method):
    """Decorate a coroutine method so that it puts its manager in a pending state.

    Each call to the wrapped method installs a fresh future on ``self._ready``
    and resolves it when the method finishes:

    * on success the future's result is set to ``None`` and the method's
      return value is passed through to the caller;
    * on failure the exception is stored on the future, logged through
      ``self.log.exception`` and re-raised to the caller.

    If ``self._ready`` has already been resolved or cancelled by the time the
    method finishes (for example a waiter cancelled it, or the method itself
    replaced and resolved it), the future is left untouched instead of raising
    ``InvalidStateError`` over the method's real outcome.
    """

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        # Create a future for the decorated method
        try:
            self._ready = Future()
        except RuntimeError:
            # No event loop running, use concurrent future
            self._ready = CFuture()
        try:
            # Call the wrapped method and wait for it to finish.
            out = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
        except Exception as e:
            if not self._ready.done():
                self._ready.set_exception(e)
                # Reading the exception back marks it as retrieved on an
                # asyncio future.
                self._ready.exception()
            self.log.exception(e)
            # Bare raise keeps the original traceback intact.
            raise
        # Only resolve a future that is still pending; setting a result on a
        # finished or cancelled future raises InvalidStateError.
        if not self._ready.done():
            self._ready.set_result(None)
        return out

    return wrapper


class KernelManager(ConnectionFileMixin):
"""Manages a single kernel in a subprocess on this host.
Expand All @@ -60,6 +90,7 @@ class KernelManager(ConnectionFileMixin):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
# Create a place holder future.
try:
self._ready = Future()
except RuntimeError:
Expand Down Expand Up @@ -329,6 +360,7 @@ async def _async_post_start_kernel(self, **kw) -> None:

post_start_kernel = run_sync(_async_post_start_kernel)

@in_pending_state
async def _async_start_kernel(self, **kw):
"""Starts a kernel on this host in a separate process.
Expand All @@ -341,25 +373,12 @@ async def _async_start_kernel(self, **kw):
keyword arguments that are passed down to build the kernel_cmd
and launching the kernel (e.g. Popen kwargs).
"""
done = self._ready.done()

try:
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
if not done:
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

except Exception as e:
if not done:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e
# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))

start_kernel = run_sync(_async_start_kernel)

Expand Down Expand Up @@ -434,6 +453,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

cleanup_resources = run_sync(_async_cleanup_resources)

@in_pending_state
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False):
"""Attempts to stop the kernel process cleanly.
Expand All @@ -452,10 +472,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
# Shutdown is a no-op for a kernel that had a failed startup
if self._ready.exception():
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -503,9 +519,6 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.")

if not self._ready.done():
raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.")

# Stop currently running kernel.
await ensure_async(self.shutdown_kernel(now=now, restart=True))

Expand Down
102 changes: 88 additions & 14 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:

context = Instance("zmq.Context")

_starting_kernels = Dict()
_pending_kernels = Dict()

@property
def _starting_kernels(self):
    """Backwards-compatible alias for ``_pending_kernels``.

    Returns the very same object that ``_pending_kernels`` holds, so code
    still using the old name reads and mutates the current mapping.
    """
    return self._pending_kernels

@default("context")
def _context_default(self) -> zmq.Context:
Expand Down Expand Up @@ -165,7 +170,22 @@ async def _add_kernel_when_ready(
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._starting_kernels.pop(kernel_id, None)
self._pending_kernels.pop(kernel_id, None)

async def _remove_kernel_when_ready(
    self, kernel_id: str, kernel_awaitable: t.Awaitable
) -> None:
    """Await ``kernel_awaitable`` and then remove the kernel from this manager.

    ``remove_kernel(kernel_id)`` is only called when the awaitable completes
    without raising. Whether it succeeds or raises, the ``kernel_id`` entry is
    dropped from ``_pending_kernels`` afterwards; any exception propagates to
    the caller.
    """
    try:
        # Wait for the pending operation to finish before touching the registry.
        await kernel_awaitable
        self.remove_kernel(kernel_id)
    finally:
        # The kernel is no longer pending, regardless of the outcome.
        self._pending_kernels.pop(kernel_id, None)

def _using_pending_kernels(self):
    """Tell whether this multikernelmanager is using pending kernels.

    Returns the value of the ``use_pending_kernels`` attribute, or ``False``
    when the instance does not define that attribute.
    """
    try:
        return self.use_pending_kernels
    except AttributeError:
        # Attribute not defined on this manager: pending kernels are off.
        return False

async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str:
"""Start a new kernel.
Expand All @@ -186,17 +206,38 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut

if getattr(self, 'use_pending_kernels', False):
self._pending_kernels[kernel_id] = fut
# Handling a Pending Kernel
if self._using_pending_kernels():
# If using pending kernels, do not block
# on the kernel start.
self._kernels[kernel_id] = km
else:
await fut
# raise an exception if one occurred during kernel startup.
if km.ready.exception():
raise km.ready.exception() # type: ignore

return kernel_id

start_kernel = run_sync(_async_start_kernel)

async def _shutdown_kernel_when_ready(
self,
kernel_id: str,
now: t.Optional[bool] = False,
restart: t.Optional[bool] = False,
) -> None:
"""Wait for a pending kernel to be ready
before shutting the kernel down.

When pending kernels are in use, the kernel manager registered under
``kernel_id`` is looked up in ``self._kernels`` and its ``ready`` future
is awaited. ``now`` and ``restart`` are forwarded unchanged to
``self.shutdown_kernel``.
"""
# NOTE(review): leading indentation is missing in this extract, so it is
# not visible here whether the final ``shutdown_kernel`` call belongs to
# the ``if`` branch below or runs unconditionally -- confirm against the
# repository before relying on either reading.
# Only do this if using pending kernels
if self._using_pending_kernels():
kernel = self._kernels[kernel_id]
await kernel.ready
# Once out of a pending state, we can call shutdown.
await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart))

async def _async_shutdown_kernel(
self,
kernel_id: str,
Expand All @@ -215,15 +256,31 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)
if kernel_id in self._starting_kernels:
# If we're using pending kernels, block shutdown when a kernel is pending.
if self._using_pending_kernels() and kernel_id in self._pending_kernels:
raise RuntimeError("Kernel is in a pending state. Cannot shutdown.")
# If the kernel is still starting, wait for it to be ready.
elif kernel_id in self._starting_kernels:
kernel = self._starting_kernels[kernel_id]
try:
await self._starting_kernels[kernel_id]
await kernel
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)
await ensure_async(km.shutdown_kernel(now, restart))
self.remove_kernel(kernel_id)
# If a pending kernel raised an exception, remove it.
if km.ready.exception():
self.remove_kernel(kernel_id)
return
stopper = ensure_async(km.shutdown_kernel(now, restart))
fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper))
self._pending_kernels[kernel_id] = fut
# Await the kernel if not using pending kernels.
if not self._using_pending_kernels():
await fut
# raise an exception if one occurred during kernel shutdown.
if km.ready.exception():
raise km.ready.exception() # type: ignore

shutdown_kernel = run_sync(_async_shutdown_kernel)

Expand Down Expand Up @@ -258,13 +315,17 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
kids += list(self._starting_kernels)
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
kids += list(self._pending_kernels)
futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)
# When using "shutdown all", all pending kernels
# should be awaited before exiting this method.
if self._using_pending_kernels():
for km in self._kernels.values():
await km.ready

shutdown_all = run_sync(_async_shutdown_all)

@kernel_method
def interrupt_kernel(self, kernel_id: str) -> None:
"""Interrupt (SIGINT) the kernel by its uuid.
Expand All @@ -273,7 +334,12 @@ def interrupt_kernel(self, kernel_id: str) -> None:
kernel_id : uuid
The id of the kernel to interrupt.
"""
kernel = self.get_kernel(kernel_id)
if not kernel.ready.done():
raise RuntimeError("Kernel is in a pending state. Cannot interrupt.")
out = kernel.interrupt_kernel()
self.log.info("Kernel interrupted: %s" % kernel_id)
return out

@kernel_method
def signal_kernel(self, kernel_id: str, signum: int) -> None:
Expand All @@ -291,8 +357,7 @@ def signal_kernel(self, kernel_id: str, signum: int) -> None:
"""
self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))

@kernel_method
def restart_kernel(self, kernel_id: str, now: bool = False) -> None:
async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None:
"""Restart a kernel by its uuid, keeping the same ports.
Parameters
Expand All @@ -307,7 +372,15 @@ def restart_kernel(self, kernel_id: str, now: bool = False) -> None:
In all cases the kernel is restarted, the only difference is whether
it is given a chance to perform a clean shutdown or not.
"""
kernel = self.get_kernel(kernel_id)
if self._using_pending_kernels():
if not kernel.ready.done():
raise RuntimeError("Kernel is in a pending state. Cannot restart.")
out = await ensure_async(kernel.restart_kernel(now=now))
self.log.info("Kernel restarted: %s" % kernel_id)
return out

restart_kernel = run_sync(_async_restart_kernel)

@kernel_method
def is_alive(self, kernel_id: str) -> bool:
Expand Down Expand Up @@ -475,5 +548,6 @@ class AsyncMultiKernelManager(MultiKernelManager):
).tag(config=True)

start_kernel = MultiKernelManager._async_start_kernel
restart_kernel = MultiKernelManager._async_restart_kernel
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
shutdown_all = MultiKernelManager._async_shutdown_all
Loading

0 comments on commit 4428715

Please sign in to comment.