From 2db718aa5f5e64e90b10913ed4f6b35ff0635d1f Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 13 Mar 2019 16:25:39 -0700 Subject: [PATCH] Add support for AsyncMappingKernelManager Supports running against incompatible jupyter_client so long as the desired kernel_manager_class is not `AsyncMappingKernelManager`. --- notebook/notebookapp.py | 11 +- notebook/services/kernels/kernelmanager.py | 375 +++++++++++++----- .../kernels/tests/test_kernels_api.py | 28 ++ notebook/services/sessions/sessionmanager.py | 2 +- 4 files changed, 318 insertions(+), 98 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index bbbe90c3139..2193ad50a6c 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -37,7 +37,6 @@ except ImportError: #PY2 from base64 import encodestring as encodebytes - from jinja2 import Environment, FileSystemLoader from notebook.transutils import trans, _ @@ -78,7 +77,7 @@ from .base.handlers import Template404, RedirectWithParams from .log import log_request -from .services.kernels.kernelmanager import MappingKernelManager +from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager, MappingKernelManagerBase from .services.config import ConfigManager from .services.contents.manager import ContentsManager from .services.contents.filemanager import FileContentsManager @@ -1138,6 +1137,7 @@ def _update_mathjax_config(self, change): kernel_manager_class = Type( default_value=MappingKernelManager, + klass=MappingKernelManagerBase, config=True, help=_('The kernel manager class to use.') ) @@ -1349,6 +1349,13 @@ def init_configurables(self): connection_dir=self.runtime_dir, kernel_spec_manager=self.kernel_spec_manager, ) + # Ensure the appropriate jupyter_client is in place. + # TODO: remove once dependencies are updated. 
+ if isinstance(self.kernel_manager, AsyncMappingKernelManager): + if not hasattr(self.kernel_manager, 'list_kernel_ids'): + raise RuntimeError("Using `AsyncMappingKernelManager` without an appropriate " + "jupyter_client installed! Upgrade jupyter_client and try again.") + self.contents_manager = self.contents_manager_class( parent=self, log=self.log, diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 5297bd4510a..c74bd7a26ca 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -28,13 +28,27 @@ from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL +# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client. +# This will be confirmed at runtime in notebookapp. +# TODO: remove once dependencies are updated. +try: + from jupyter_client.multikernelmanager import AsyncMultiKernelManager +except ImportError: + class AsyncMultiKernelManager(object): + pass -class MappingKernelManager(MultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling""" - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.IOLoopKernelManager" +class MappingKernelManagerBase(LoggingConfigurable): + """ + This class exists so that class-based traits relative to MappingKernelManager and AsyncMappingKernelManager + can be satisfied since AsyncMappingKernelManager doesn't derive from the former. It is only necessary until + we converge to using only async, but that requires subclasses to be updated. + + Since we have this class, we'll reduce duplication of code between the disjoint classes by using this + common superclass for configuration properties and static methods and local member variables. + + TODO - move contents back to appropriate mapping kernel manager once we converge to async only. 
+ """ kernel_argv = List(Unicode()) @@ -66,7 +80,7 @@ def _update_root_dir(self, proposal): for users with poor network connections.""" ) - cull_interval_default = 300 # 5 minutes + cull_interval_default = 300 # 5 minutes cull_interval = Integer(cull_interval_default, config=True, help="""The interval (in seconds) on which to check for idle kernels exceeding the cull timeout value.""" ) @@ -107,34 +121,25 @@ def _update_root_dir(self, proposal): last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + allowed_message_types = List(trait=Unicode(), config=True, + help="""White list of allowed kernel message types. + When the list is empty, all message types are allowed. + """ + ) + # members used to hold composed instances buffering_manager = None kernel_culler = None activity_monitor = None def __init__(self, **kwargs): - super(MappingKernelManager, self).__init__(**kwargs) + super(MappingKernelManagerBase, self).__init__(**kwargs) self.last_kernel_activity = utcnow() self.buffering_manager = BufferingManager(parent=self) self.kernel_culler = KernelCuller(parent=self) self.activity_monitor = ActivityMonitor(parent=self) - allowed_message_types = List(trait=Unicode(), config=True, - help="""White list of allowed kernel message types. - When the list is empty, all message types are allowed. 
- """ - ) - - #------------------------------------------------------------------------- - # Methods for managing kernels and sessions - #------------------------------------------------------------------------- - - def _handle_kernel_died(self, kernel_id): - """notice that a kernel died""" - self.log.warning("Kernel %s died, removing from map.", kernel_id) - self.remove_kernel(kernel_id) - def cwd_for_path(self, path): """Turn API path into absolute OS path.""" os_path = to_os_path(path, self.root_dir) @@ -144,6 +149,100 @@ def cwd_for_path(self, path): os_path = os.path.dirname(os_path) return os_path + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ + self.buffering_manager.start_buffering(kernel_id, session_key, channels) + + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + """ + return self.buffering_manager.get_buffer(kernel_id, session_key) + + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. 
+ """ + self.buffering_manager.stop_buffering(kernel_id) + + def notify_connect(self, kernel_id): + """Notice a new connection to a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] += 1 + + def notify_disconnect(self, kernel_id): + """Notice a disconnection from a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] -= 1 + + # monitoring activity: + + def start_watching_activity(self, kernel_id): + """Start watching IOPub messages on a kernel for activity. Remove if no overrides + + - update last_activity on every message + - record execution_state from status messages + """ + self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + + def initialize_culler(self): + """Initial culler if not already. Remove if no overrides + """ + if self.kernel_culler is None: + self.kernel_culler = KernelCuller(parent=self) + + def cull_kernels(self): + # Defer to KernelCuller. Remove if no overrides + self.kernel_culler.cull_kernels() + + def cull_kernel_if_idle(self, kernel_id): + # Defer to KernelCuller. 
Remove if no overrides + self.kernel_culler.cull_kernel_if_idle(kernel_id) + + +class MappingKernelManager(MultiKernelManager, MappingKernelManagerBase): + """A KernelManager that handles notebook mapping and HTTP error handling""" + + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.IOLoopKernelManager" + + def __init__(self, **kwargs): + super(MappingKernelManager, self).__init__(**kwargs) + + # ------------------------------------------------------------------------- + # Methods for managing kernels and sessions + # ------------------------------------------------------------------------- + + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warning("Kernel %s died, removing from map.", kernel_id) + self.remove_kernel(kernel_id) + @gen.coroutine def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. @@ -190,46 +289,6 @@ def start_kernel(self, kernel_id=None, path=None, **kwargs): # py2-compat raise gen.Return(kernel_id) - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - self.buffering_manager.start_buffering(kernel_id, session_key, channels) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. 
- """ - return self.buffering_manager.get_buffer(kernel_id, session_key) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - """ - self.buffering_manager.stop_buffering(kernel_id) - def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) @@ -291,16 +350,6 @@ def on_restart_failed(): timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) raise gen.Return(future) - def notify_connect(self, kernel_id): - """Notice a new connection to a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] += 1 - - def notify_disconnect(self, kernel_id): - """Notice a disconnection from a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] -= 1 - def kernel_model(self, kernel_id): """Return a JSON-safe dict representing a kernel @@ -333,29 +382,164 @@ def _check_kernel_id(self, kernel_id): if kernel_id not in self: raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - # monitoring activity: - def start_watching_activity(self, kernel_id): - """Start watching IOPub messages on a kernel for activity. 
+class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBase): + """A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout""" - - update last_activity on every message - - record execution_state from status messages + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.AsyncIOLoopKernelManager" + + def __init__(self, **kwargs): + super(AsyncMappingKernelManager, self).__init__(**kwargs) + + # ------------------------------------------------------------------------- + # Methods for managing kernels and sessions + # ------------------------------------------------------------------------- + + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warning("Kernel %s died, removing from map.", kernel_id) + self.remove_kernel(kernel_id) + + @gen.coroutine + def start_kernel(self, kernel_id=None, path=None, **kwargs): + """Start a kernel for a session and return its kernel_id. + + Parameters + ---------- + kernel_id : uuid + The uuid to associate the new kernel with. If this + is not None, this kernel will be persistent whenever it is + requested. + path : API path + The API path (unicode, '/' delimited) for the cwd. + Will be transformed to an OS path relative to root_dir. + kernel_name : str + The name identifying which kernel spec to launch. This is ignored if + an existing kernel is returned, but it may be checked in the future. """ - self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + if kernel_id is None: + if path is not None: + kwargs['cwd'] = self.cwd_for_path(path) + kernel_id = yield super(AsyncMappingKernelManager, self).start_kernel(**kwargs) - def initialize_culler(self): - """Initial culler if not already. 
+ self._kernel_connections[kernel_id] = 0 + self.start_watching_activity(kernel_id) + self.log.info("Kernel started (async): %s" % kernel_id) + self.log.debug("Kernel args: %r" % kwargs) + # register callback for failed auto-restart + self.add_restart_callback(kernel_id, + lambda: self._handle_kernel_died(kernel_id), + 'dead', + ) + + # Increase the metric of number of kernels running + # for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).inc() + + else: + self._check_kernel_id(kernel_id) + self.log.info("Using existing kernel: %s" % kernel_id) + + # py2-compat + raise gen.Return(kernel_id) + + @gen.coroutine + def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + if kernel._activity_stream: + kernel._activity_stream.close() + kernel._activity_stream = None + self.stop_buffering(kernel_id) + self._kernel_connections.pop(kernel_id, None) + self.last_kernel_activity = utcnow() + + # Decrease the metric of number of kernels + # running for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).dec() + + yield super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) + + @gen.coroutine + def restart_kernel(self, kernel_id, now=False): + """Restart a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + yield super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) + kernel = self.get_kernel(kernel_id) + # return a Future that will resolve when the kernel has successfully restarted + channel = kernel.connect_shell() + future = Future() + + def finish(): + """Common cleanup when restart finishes/fails for any reason.""" + if not channel.closed(): + channel.close() + loop.remove_timeout(timeout) + kernel.remove_restart_callback(on_restart_failed, 'dead') + + def 
on_reply(msg): + self.log.debug("Kernel info reply received: %s", kernel_id) + finish() + if not future.done(): + future.set_result(msg) + + def on_timeout(): + self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) + finish() + if not future.done(): + future.set_exception(gen.TimeoutError("Timeout waiting for restart")) + + def on_restart_failed(): + self.log.warning("Restarting kernel failed: %s", kernel_id) + finish() + if not future.done(): + future.set_exception(RuntimeError("Restart failed")) + + kernel.add_restart_callback(on_restart_failed, 'dead') + kernel.session.send(channel, "kernel_info_request") + channel.on_recv(on_reply) + loop = IOLoop.current() + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + raise gen.Return(future) + + def kernel_model(self, kernel_id): + """Return a JSON-safe dict representing a kernel + + For use in representing kernels in the JSON APIs. """ - if self.kernel_culler is None: - self.kernel_culler = KernelCuller(parent=self) + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] - def cull_kernels(self): - # Defer to KernelCuller - self.kernel_culler.cull_kernels() + model = { + "id":kernel_id, + "name": kernel.kernel_name, + "last_activity": isoformat(kernel.last_activity), + "execution_state": kernel.execution_state, + "connections": self._kernel_connections[kernel_id], + } + return model - def cull_kernel_if_idle(self, kernel_id): - # Defer to KernelCuller - self.kernel_culler.cull_kernel_if_idle(kernel_id) + def list_kernels(self): + """Returns a list of kernel models relative to the running kernels.""" + kernels = [] + kernel_ids = self.list_kernel_ids() + for kernel_id in kernel_ids: + model = self.kernel_model(kernel_id) + kernels.append(model) + return kernels + + # override _check_kernel_id to raise 404 instead of KeyError + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self: + 
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) class ActivityMonitor(LoggingConfigurable): @@ -363,9 +547,9 @@ class ActivityMonitor(LoggingConfigurable): def __init__(self, **kwargs): super(ActivityMonitor, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "ActivityMonitor requires an instance of MappingKernelManager!") + "ActivityMonitor requires an instance of MappingKernelManagerBase!") def start_watching_activity(self, kernel_id, kernel): """Start watching IOPub messages on a kernel for activity. @@ -409,9 +593,9 @@ def _default_kernel_buffers(self): def __init__(self, **kwargs): super(BufferingManager, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "BufferingManager requires an instance of MappingKernelManager!") + "BufferingManager requires an instance of MappingKernelManagerBase!") def _check_kernel_id(self, kernel_id): """Check a that a kernel_id exists and raise 404 if not.""" @@ -513,9 +697,9 @@ class KernelCuller(LoggingConfigurable): def __init__(self, **kwargs): super(KernelCuller, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "KernelCuller requires an instance of MappingKernelManager!") + "KernelCuller requires an instance of MappingKernelManagerBase!") self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging # Start idle culler if 'cull_idle_timeout' is greater than zero. 
@@ -559,6 +743,7 @@ def cull_kernels(self): self.log.exception("The following exception was encountered while checking the idle " "duration of kernel %s: %s", kernel_id, e) + @gen.coroutine def cull_kernel_if_idle(self, kernel_id): # Get the kernel model and use that to determine cullability... @@ -581,4 +766,4 @@ def cull_kernel_if_idle(self, kernel_id): idle_duration = int(dt_idle.total_seconds()) self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", model['execution_state'], model['name'], kernel_id, connections, idle_duration) - self.parent.shutdown_kernel(kernel_id) + yield maybe_future(self.parent.shutdown_kernel(kernel_id)) diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 83bfb0c3e07..0d4bc94aced 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -8,12 +8,19 @@ from tornado.httpclient import HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect +from unittest import SkipTest from jupyter_client.kernelspec import NATIVE_KERNEL_NAME from notebook.utils import url_path_join from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error +try: + from jupyter_client import AsyncMultiKernelManager + async_testing_enabled = True +except ImportError: + async_testing_enabled = False + class KernelAPI(object): """Wrapper for kernel REST API requests""" @@ -188,6 +195,27 @@ def test_connections(self): self.assertEqual(model['connections'], 0) +class AsyncKernelAPITest(KernelAPITest): + """Test the kernels web service API using the AsyncMappingKernelManager""" + + @classmethod + def get_argv(cls): + argv = super(AsyncKernelAPITest, cls).get_argv() + + # before we extend the argv with the class, ensure that appropriate jupyter_client is available. 
+ # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. + if async_testing_enabled: + argv.extend(['--NotebookApp.kernel_manager_class=' + 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv + + def setUp(self): + if not async_testing_enabled: + raise SkipTest("AsyncKernelAPITest.{test_method} skipped due to down-level jupyter_client!". + format(test_method=self._testMethodName)) + super(AsyncKernelAPITest, self).setUp() + + class KernelFilterTest(NotebookTestBase): # A special install of NotebookTestBase where only `kernel_info_request` diff --git a/notebook/services/sessions/sessionmanager.py b/notebook/services/sessions/sessionmanager.py index 63e18448292..5ec35edd0d9 100644 --- a/notebook/services/sessions/sessionmanager.py +++ b/notebook/services/sessions/sessionmanager.py @@ -22,7 +22,7 @@ class SessionManager(LoggingConfigurable): - kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManager') + kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManagerBase') contents_manager = Instance('notebook.services.contents.manager.ContentsManager') # Session database initialized below