Skip to content

Commit

Permalink
Apply changes per review
Browse files Browse the repository at this point in the history
Add comments and rename self.super to self.pinned_superclass to
clarify intent.

Add run_sync() util method to clean up shutdown_all() invocation.
  • Loading branch information
kevin-bates committed Mar 27, 2020
1 parent 72d44c5 commit 99b0afd
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 19 deletions.
8 changes: 2 additions & 6 deletions notebook/notebookapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
from notebook._sysinfo import get_sys_info

from ._tz import utcnow, utcfromtimestamp
from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url
from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url, run_sync

# Check if we can use async kernel management
try:
Expand Down Expand Up @@ -1801,11 +1801,7 @@ def cleanup_kernels(self):
n_kernels = len(self.kernel_manager.list_kernel_ids())
kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels)
self.log.info(kernel_msg % n_kernels)
# If we're using async kernel management, we need to invoke the async method via the event loop.
if isinstance(self.kernel_manager, AsyncMappingKernelManager):
asyncio.get_event_loop().run_until_complete(self.kernel_manager.shutdown_all())
else:
self.kernel_manager.shutdown_all()
run_sync(self.kernel_manager.shutdown_all())

def notebook_info(self, kernel_count=True):
"Return the current working directory and the server url information"
Expand Down
26 changes: 13 additions & 13 deletions notebook/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ def _default_kernel_buffers(self):
last_kernel_activity = Instance(datetime,
help="The last activity on any kernel, including shutting down a kernel")

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.last_kernel_activity = utcnow()

allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
Expand All @@ -137,8 +133,11 @@ def __init__(self, **kwargs):
#-------------------------------------------------------------------------

def __init__(self, **kwargs):
self.super = MultiKernelManager
self.super.__init__(self, **kwargs)
# Pin the superclass to better control the MRO. This is needed by
# AsyncMappingKernelManager so that it can give priority to methods
# on AsyncMultiKernelManager over this superclass.
self.pinned_superclass = MultiKernelManager
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()

def _handle_kernel_died(self, kernel_id):
Expand Down Expand Up @@ -173,7 +172,7 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs))
kernel_id = await maybe_future(self.pinned_superclass.start_kernel(self, **kwargs))
self._kernel_connections[kernel_id] = 0
self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)
Expand Down Expand Up @@ -302,12 +301,12 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False):
type=self._kernels[kernel_id].kernel_name
).dec()

return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart)
return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)

async def restart_kernel(self, kernel_id, now=False):
"""Restart a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
await maybe_future(self.super.restart_kernel(self, kernel_id, now=now))
await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now))
kernel = self.get_kernel(kernel_id)
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
Expand Down Expand Up @@ -374,7 +373,7 @@ def kernel_model(self, kernel_id):
def list_kernels(self):
"""Returns a list of kernel_id's of kernels running."""
kernels = []
kernel_ids = self.super.list_kernel_ids(self)
kernel_ids = self.pinned_superclass.list_kernel_ids(self)
for kernel_id in kernel_ids:
model = self.kernel_model(kernel_id)
kernels.append(model)
Expand Down Expand Up @@ -485,8 +484,9 @@ def _default_kernel_manager_class(self):
return "jupyter_client.ioloop.AsyncIOLoopKernelManager"

def __init__(self, **kwargs):
self.super = AsyncMultiKernelManager
self.super.__init__(self, **kwargs)
# Pin the superclass to better control the MRO.
self.pinned_superclass = AsyncMultiKernelManager
self.pinned_superclass.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
Expand All @@ -505,4 +505,4 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False):
type=self._kernels[kernel_id].kernel_name
).dec()

return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart)
return await self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)
40 changes: 40 additions & 0 deletions notebook/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,43 @@ def maybe_future(obj):
f.set_result(obj)
return f


def run_sync(maybe_async):
"""If async, runs maybe_async and blocks until it has executed,
possibly creating an event loop.
If not async, just returns maybe_async as it is the result of something
that has already executed.
Parameters
----------
maybe_async : async or non-async object
The object to be executed, if it is async.
Returns
-------
result :
Whatever the async object returns, or the object itself.
"""
if not inspect.isawaitable(maybe_async):
# that was not something async, just return it
return maybe_async
# it is async, we need to run it in an event loop

def wrapped():
create_new_event_loop = False
try:
loop = asyncio.get_event_loop()
except RuntimeError:
create_new_event_loop = True
else:
if loop.is_closed():
create_new_event_loop = True
if create_new_event_loop:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(maybe_async)
except RuntimeError as e:
if str(e) == 'This event loop is already running':
# just return a Future, hoping that it will be awaited
result = asyncio.ensure_future(maybe_async)
return result
return wrapped()

0 comments on commit 99b0afd

Please sign in to comment.