From c927a3aa9190a1c010a3f7cde40432c879f0953a Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 25 Mar 2020 10:01:58 -0700 Subject: [PATCH] Switch class hierarchy to not use mixin base This commit uses the approach used in jupyter_server #191 first proposed by David Brochart. This reduces code duplication and alleviates redundancy relative to configurable options. Also, the startup message now includes the version information. Co-authored-by: David Brochart --- notebook/notebookapp.py | 11 +- notebook/services/kernels/kernelmanager.py | 683 ++++++------------ .../kernels/tests/test_kernels_api.py | 1 + notebook/services/sessions/sessionmanager.py | 2 +- .../sessions/tests/test_sessions_api.py | 3 +- 5 files changed, 242 insertions(+), 458 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 0c42449269e..9b7319e713a 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -76,7 +76,7 @@ from .base.handlers import Template404, RedirectWithParams from .log import log_request -from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager, MappingKernelManagerBase +from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager from .services.config import ConfigManager from .services.contents.manager import ContentsManager from .services.contents.filemanager import FileContentsManager @@ -109,7 +109,7 @@ from ._tz import utcnow, utcfromtimestamp from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url -# Check if we can user async kernel management +# Check if we can use async kernel management try: from jupyter_client import AsyncMultiKernelManager async_kernel_mgmt_available = True @@ -584,7 +584,7 @@ class NotebookApp(JupyterApp): flags = flags classes = [ - KernelManager, Session, MappingKernelManager, AsyncMappingKernelManager, KernelSpecManager, + KernelManager, Session, MappingKernelManager, KernelSpecManager, ContentsManager, FileContentsManager, NotebookNotary, GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient, ] @@ -1185,7 +1185,7 @@ def _update_mathjax_config(self, change): kernel_manager_class = Type( default_value=MappingKernelManager, - klass=MappingKernelManagerBase, + klass=MappingKernelManager, config=True, help=_('The kernel manager class to use.') ) @@ -1816,7 +1816,8 @@ def notebook_info(self, kernel_count=True): info += kernel_msg % n_kernels info += "\n" # Format the info so that the URL fits on a single line in 80 char display - info += _("The Jupyter Notebook is running at:\n%s") % self.display_url + info += _("Jupyter Notebook {version} is running at:\n{url}". + format(version=NotebookApp.version, url=self.display_url)) if self.gateway_config.gateway_enabled: info += _("\nKernels will be managed by the Gateway server running at:\n%s") % self.gateway_config.url return info diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 346e27712c3..6cb5d20f9d1 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -1,5 +1,4 @@ """A MultiKernelManager for use in the notebook webserver - - raises HTTPErrors - creates REST API models """ @@ -12,15 +11,15 @@ from functools import partial import os -from tornado import gen, web +from tornado import web from tornado.concurrent import Future from tornado.ioloop import IOLoop, PeriodicCallback from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer, - Float, Instance, default, validate) -from traitlets.config.configurable import LoggingConfigurable + Float, Instance, default, validate +) from notebook.utils import maybe_future, to_os_path, exists from notebook._tz import utcnow, isoformat @@ -28,9 +27,9 @@ from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL -# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate juptyer_client. -# This will be confirmed at runtime in notebookapp. -# TODO: remove once dependencies are updated. +# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client. +# This will be confirmed at runtime in notebookapp. The following block can be removed once the jupyter_client's +# floor has been updated. try: from jupyter_client.multikernelmanager import AsyncMultiKernelManager except ImportError: @@ -39,15 +38,12 @@ class AsyncMultiKernelManager(object): pass -class MappingKernelManagerBase(LoggingConfigurable): - """ - This class exists so that class-based traits relative to MappingKernelManager and AsyncMappingKernelManager - can be satisfied since AsyncMappingKernelManager doesn't derive from the former. It is only necessary until - we converge to using only async, but that requires subclasses to be updated. +class MappingKernelManager(MultiKernelManager): + """A KernelManager that handles notebook mapping and HTTP error handling""" - Since we have this class, we'll reduce duplication of code between the disjoint classes by using this - common superclass for configuration properties and static methods and local member variables. - """ + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.IOLoopKernelManager" kernel_argv = List(Unicode()) @@ -55,6 +51,10 @@ class MappingKernelManagerBase(LoggingConfigurable): _kernel_connections = Dict() + _culler_callback = None + + _initialized_culler = False + @default('root_dir') def _default_root_dir(self): try: @@ -96,10 +96,8 @@ def _update_root_dir(self, proposal): buffer_offline_messages = Bool(True, config=True, help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory. - When True (default), messages are buffered and replayed on reconnect, avoiding lost messages due to interrupted connectivity. - Disable if long-running kernels will produce too much output while no frontends are connected. """ @@ -107,7 +105,6 @@ def _update_root_dir(self, proposal): kernel_info_timeout = Float(60, config=True, help="""Timeout for giving up on a kernel (in seconds). - On starting and restarting kernels, we check whether the kernel is running and responsive by sending kernel_info_requests. This sets the timeout in seconds for how long the kernel can take @@ -117,65 +114,38 @@ def _update_root_dir(self, proposal): """ ) + _kernel_buffers = Any() + @default('_kernel_buffers') + def _default_kernel_buffers(self): + return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) + last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.last_kernel_activity = utcnow() + allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. """ ) - # members used to hold composed instances - buffering_manager = None - kernel_culler = None - activity_monitor = None + #------------------------------------------------------------------------- + # Methods for managing kernels and sessions + #------------------------------------------------------------------------- def __init__(self, **kwargs): - super(MappingKernelManagerBase, self).__init__(**kwargs) + self.super = MultiKernelManager + self.super.__init__(self, **kwargs) self.last_kernel_activity = utcnow() - self.buffering_manager = BufferingManager(parent=self) - self.kernel_culler = KernelCuller(parent=self) - self.activity_monitor = ActivityMonitor(parent=self) - def _handle_kernel_died(self, kernel_id): """notice that a kernel died""" self.log.warning("Kernel %s died, removing from map.", kernel_id) self.remove_kernel(kernel_id) - def kernel_model(self, kernel_id): - """Return a JSON-safe dict representing a kernel - - For use in representing kernels in the JSON APIs. - """ - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - - model = { - "id":kernel_id, - "name": kernel.kernel_name, - "last_activity": isoformat(kernel.last_activity), - "execution_state": kernel.execution_state, - "connections": self._kernel_connections[kernel_id], - } - return model - - def list_kernels(self): - """Returns a list of kernel models relative to the running kernels.""" - kernels = [] - kernel_ids = self.list_kernel_ids() - for kernel_id in kernel_ids: - model = self.kernel_model(kernel_id) - kernels.append(model) - return kernels - - # override _check_kernel_id to raise 404 instead of KeyError - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - def cwd_for_path(self, path): """Turn API path into absolute OS path.""" os_path = to_os_path(path, self.root_dir) @@ -185,98 +155,8 @@ def cwd_for_path(self, path): os_path = os.path.dirname(os_path) return os_path - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - self.buffering_manager.start_buffering(kernel_id, session_key, channels) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - return self.buffering_manager.get_buffer(kernel_id, session_key) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - """ - self.buffering_manager.stop_buffering(kernel_id) - - def notify_connect(self, kernel_id): - """Notice a new connection to a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] += 1 - - def notify_disconnect(self, kernel_id): - """Notice a disconnection from a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] -= 1 - - # monitoring activity: - - def start_watching_activity(self, kernel_id): - """Start watching IOPub messages on a kernel for activity. Remove if no overrides - - - update last_activity on every message - - record execution_state from status messages - """ - self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) - - def initialize_culler(self): - """Initial culler if not already. Remove if no overrides - """ - if self.kernel_culler is None: - self.kernel_culler = KernelCuller(parent=self) - - def cull_kernels(self): - # Defer to KernelCuller. Remove if no overrides - self.kernel_culler.cull_kernels() - - def cull_kernel_if_idle(self, kernel_id): - # Defer to KernelCuller. Remove if no overrides - self.kernel_culler.cull_kernel_if_idle(kernel_id) - - -class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling""" - - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.IOLoopKernelManager" - - def __init__(self, **kwargs): - super(MappingKernelManager, self).__init__(**kwargs) - - # ------------------------------------------------------------------------- - # Methods for managing kernels and sessions - # ------------------------------------------------------------------------- - async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. - Parameters ---------- kernel_id : uuid @@ -293,9 +173,7 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await maybe_future( - super(MappingKernelManager, self).start_kernel(**kwargs) - ) + kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs)) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) self.log.info("Kernel started: %s" % kernel_id) @@ -316,127 +194,99 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs): self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - return kernel_id + # Initialize culling if not already + if not self._initialized_culler: + self.initialize_culler() - def shutdown_kernel(self, kernel_id, now=False): - """Shutdown a kernel by kernel_id""" - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - if kernel._activity_stream: - kernel._activity_stream.close() - kernel._activity_stream = None - self.stop_buffering(kernel_id) - self._kernel_connections.pop(kernel_id, None) + return kernel_id - # Decrease the metric of number of kernels - # running for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels( - type=self._kernels[kernel_id].kernel_name - ).dec() + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + Parameters + ---------- + kernel_id : str + The id of the kernel to start buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ - return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) + if not self.buffer_offline_messages: + for channel, stream in channels.items(): + stream.close() + return - async def restart_kernel(self, kernel_id): - """Restart a kernel by kernel_id""" + self.log.info("Starting buffering for %s", session_key) self._check_kernel_id(kernel_id) - await maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id)) - kernel = self.get_kernel(kernel_id) - # return a Future that will resolve when the kernel has successfully restarted - channel = kernel.connect_shell() - future = Future() - - def finish(): - """Common cleanup when restart finishes/fails for any reason.""" - if not channel.closed(): - channel.close() - loop.remove_timeout(timeout) - kernel.remove_restart_callback(on_restart_failed, 'dead') - - def on_reply(msg): - self.log.debug("Kernel info reply received: %s", kernel_id) - finish() - if not future.done(): - future.set_result(msg) - - def on_timeout(): - self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(gen.TimeoutError("Timeout waiting for restart")) - - def on_restart_failed(): - self.log.warning("Restarting kernel failed: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(RuntimeError("Restart failed")) - - kernel.add_restart_callback(on_restart_failed, 'dead') - kernel.session.send(channel, "kernel_info_request") - channel.on_recv(on_reply) - loop = IOLoop.current() - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) - # wait for restart to complete - await future - - -class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout""" - - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.AsyncIOLoopKernelManager" - - def __init__(self, **kwargs): - super(AsyncMappingKernelManager, self).__init__(**kwargs) + # clear previous buffering state + self.stop_buffering(kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + # record the session key because only one session can buffer + buffer_info['session_key'] = session_key + # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple + buffer_info['buffer'] = [] + buffer_info['channels'] = channels - # ------------------------------------------------------------------------- - # Methods for managing kernels and sessions - # ------------------------------------------------------------------------- + # forward any future messages to the internal buffer + def buffer_msg(channel, msg_parts): + self.log.debug("Buffering msg on %s:%s", kernel_id, channel) + buffer_info['buffer'].append((channel, msg_parts)) - async def start_kernel(self, kernel_id=None, path=None, **kwargs): - """Start a kernel for a session and return its kernel_id. + for channel, stream in channels.items(): + stream.on_recv(partial(buffer_msg, channel)) + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel Parameters ---------- - kernel_id : uuid - The uuid to associate the new kernel with. If this - is not None, this kernel will be persistent whenever it is - requested. - path : API path - The API path (unicode, '/' delimited) for the cwd. - Will be transformed to an OS path relative to root_dir. - kernel_name : str - The name identifying which kernel spec to launch. This is ignored if - an existing kernel is returned, but it may be checked in the future. + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. """ - if kernel_id is None: - if path is not None: - kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await super(AsyncMappingKernelManager, self).start_kernel(**kwargs) + self.log.debug("Getting buffer for %s", kernel_id) + if kernel_id not in self._kernel_buffers: + return - self._kernel_connections[kernel_id] = 0 - self.start_watching_activity(kernel_id) - self.log.info("Kernel started (async): %s" % kernel_id) - self.log.debug("Kernel args: %r" % kwargs) - # register callback for failed auto-restart - self.add_restart_callback(kernel_id, - lambda: self._handle_kernel_died(kernel_id), - 'dead', - ) + buffer_info = self._kernel_buffers[kernel_id] + if buffer_info['session_key'] == session_key: + # remove buffer + self._kernel_buffers.pop(kernel_id) + # only return buffer_info if it's a match + return buffer_info + else: + self.stop_buffering(kernel_id) - # Increase the metric of number of kernels running - # for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels( - type=self._kernels[kernel_id].kernel_name - ).inc() + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + """ + self.log.debug("Clearing buffer for %s", kernel_id) + self._check_kernel_id(kernel_id) - else: - self._check_kernel_id(kernel_id) - self.log.info("Using existing kernel: %s" % kernel_id) + if kernel_id not in self._kernel_buffers: + return + buffer_info = self._kernel_buffers.pop(kernel_id) + # close buffering streams + for stream in buffer_info['channels'].values(): + if not stream.closed(): + stream.on_recv(None) + stream.close() - return kernel_id + msg_buffer = buffer_info['buffer'] + if msg_buffer: + self.log.info("Discarding %s buffered messages for %s", + len(msg_buffer), buffer_info['session_key']) - async def shutdown_kernel(self, kernel_id, now=False, restart=False): + def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) kernel = self._kernels[kernel_id] @@ -445,7 +295,6 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False): kernel._activity_stream = None self.stop_buffering(kernel_id) self._kernel_connections.pop(kernel_id, None) - self.last_kernel_activity = utcnow() # Decrease the metric of number of kernels # running for the relevant kernel type by 1 @@ -453,12 +302,12 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False): type=self._kernels[kernel_id].kernel_name ).dec() - await super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) + return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - await super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) + await maybe_future(self.super.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -481,7 +330,7 @@ def on_timeout(): self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) finish() if not future.done(): - future.set_exception(gen.TimeoutError("Timeout waiting for restart")) + future.set_exception(TimeoutError("Timeout waiting for restart")) def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) @@ -496,22 +345,55 @@ def on_restart_failed(): timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future + def notify_connect(self, kernel_id): + """Notice a new connection to a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] += 1 -class ActivityMonitor(LoggingConfigurable): - """Establishes activity recorder for each active kernel""" + def notify_disconnect(self, kernel_id): + """Notice a disconnection from a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] -= 1 - def __init__(self, **kwargs): - super(ActivityMonitor, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "ActivityMonitor requires an instance of MappingKernelManagerBase!") + def kernel_model(self, kernel_id): + """Return a JSON-safe dict representing a kernel + For use in representing kernels in the JSON APIs. + """ + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] - def start_watching_activity(self, kernel_id, kernel): - """Start watching IOPub messages on a kernel for activity. + model = { + "id": kernel_id, + "name": kernel.kernel_name, + "last_activity": isoformat(kernel.last_activity), + "execution_state": kernel.execution_state, + "connections": self._kernel_connections[kernel_id], + } + return model + + def list_kernels(self): + """Returns a list of kernel_id's of kernels running.""" + kernels = [] + kernel_ids = self.super.list_kernel_ids(self) + for kernel_id in kernel_ids: + model = self.kernel_model(kernel_id) + kernels.append(model) + return kernels + + # override _check_kernel_id to raise 404 instead of KeyError + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) + # monitoring activity: + + def start_watching_activity(self, kernel_id): + """Start watching IOPub messages on a kernel for activity. - update last_activity on every message - record execution_state from status messages """ + kernel = self._kernels[kernel_id] # add busy/activity markers: kernel.execution_state = 'starting' kernel.last_activity = utcnow() @@ -523,7 +405,7 @@ def start_watching_activity(self, kernel_id, kernel): def record_activity(msg_list): """Record an IOPub message arriving from a kernel""" - self.parent.last_kernel_activity = kernel.last_activity = utcnow() + self.last_kernel_activity = kernel.last_activity = utcnow() idents, fed_msg_list = session.feed_identities(msg_list) msg = session.deserialize(fed_msg_list) @@ -537,191 +419,90 @@ def record_activity(msg_list): kernel._activity_stream.on_recv(record_activity) - -class BufferingManager(LoggingConfigurable): - """Manages buffering across the active kernels""" - _kernel_buffers = Any() - - @default('_kernel_buffers') - def _default_kernel_buffers(self): - return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) - - def __init__(self, **kwargs): - super(BufferingManager, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "BufferingManager requires an instance of MappingKernelManagerBase!") - - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self.parent: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - - if not self.parent.buffer_offline_messages: - for channel, stream in channels.items(): - stream.close() - return - - self._check_kernel_id(kernel_id) - self.log.info("Starting buffering for %s", session_key) - # clear previous buffering state - self.parent.stop_buffering(kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - # record the session key because only one session can buffer - buffer_info['session_key'] = session_key - # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple - buffer_info['buffer'] = [] - buffer_info['channels'] = channels - - # forward any future messages to the internal buffer - def buffer_msg(channel, msg_parts): - self.log.debug("Buffering msg on %s:%s", kernel_id, channel) - buffer_info['buffer'].append((channel, msg_parts)) - - for channel, stream in channels.items(): - stream.on_recv(partial(buffer_msg, channel)) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - if kernel_id not in self._kernel_buffers: - return - - self.log.debug("Getting buffer for %s", kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - if buffer_info['session_key'] == session_key: - # remove buffer - self._kernel_buffers.pop(kernel_id) - # only return buffer_info if it's a match - return buffer_info - else: - self.parent.stop_buffering(kernel_id) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. + def initialize_culler(self): + """Start idle culler if 'cull_idle_timeout' is greater than zero. + Regardless of that value, set flag that we've been here. """ - self._check_kernel_id(kernel_id) - - if kernel_id not in self._kernel_buffers: - return - self.log.debug("Clearing buffer for %s", kernel_id) - buffer_info = self._kernel_buffers.pop(kernel_id) - # close buffering streams - for stream in buffer_info['channels'].values(): - if not stream.closed(): - stream.on_recv(None) - stream.close() - - msg_buffer = buffer_info['buffer'] - if msg_buffer: - self.log.info("Discarding %s buffered messages for %s", - len(msg_buffer), buffer_info['session_key']) + if not self._initialized_culler and self.cull_idle_timeout > 0: + if self._culler_callback is None: + loop = IOLoop.current() + if self.cull_interval <= 0: # handle case where user set invalid value + self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", + self.cull_interval, self.cull_interval_default) + self.cull_interval = self.cull_interval_default + self._culler_callback = PeriodicCallback( + self.cull_kernels, 1000*self.cull_interval) + self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...", + self.cull_idle_timeout, self.cull_interval) + if self.cull_busy: + self.log.info("Culling kernels even if busy") + if self.cull_connected: + self.log.info("Culling kernels even with connected clients") + self._culler_callback.start() + + self._initialized_culler = True + async def cull_kernels(self): + self.log.debug("Polling every %s seconds for kernels idle > %s seconds...", + self.cull_interval, self.cull_idle_timeout) + """Create a separate list of kernels to avoid conflicting updates while iterating""" + for kernel_id in list(self._kernels): + try: + await self.cull_kernel_if_idle(kernel_id) + except Exception as e: + self.log.exception("The following exception was encountered while checking the " + "idle duration of kernel {}: {}".format(kernel_id, e)) -class KernelCuller(LoggingConfigurable): - """Handles culling responsibilities for active kernels""" + async def cull_kernel_if_idle(self, kernel_id): + try: + kernel = self._kernels[kernel_id] + except KeyError: + return # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. + + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, kernel.kernel_name, kernel.last_activity) + if kernel.last_activity is not None: + dt_now = utcnow() + dt_idle = dt_now - kernel.last_activity + # Compute idle properties + is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout) + is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy') + connections = self._kernel_connections.get(kernel_id, 0) + is_idle_connected = self.cull_connected or not connections + # Cull the kernel if all three criteria are met + if (is_idle_time and is_idle_execute and is_idle_connected): + idle_duration = int(dt_idle.total_seconds()) + self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", + kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration) + await maybe_future(self.shutdown_kernel(kernel_id)) + + +# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, overriding +# only what is different. +class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.AsyncIOLoopKernelManager" def __init__(self, **kwargs): - super(KernelCuller, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "KernelCuller requires an instance of MappingKernelManagerBase!") - self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging - - # Start idle culler if 'cull_idle_timeout' is greater than zero. - # Regardless of that value, set flag that we've been here. - if self.parent.cull_idle_timeout > 0: - if self.parent.cull_interval <= 0: # handle case where user set invalid value - self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", - self.parent.cull_interval, self.parent.cull_interval_default) - self.parent.cull_interval = self.parent.cull_interval_default - self._culler_callback = PeriodicCallback( - self.parent.cull_kernels, 1000*self.parent.cull_interval) - self._culler_callback.start() - self._log_info() - - def _log_info(self): - """Builds a single informational message relative to the culling configuration (logged at startup).""" - log_msg = list() - log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals". - format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout, - interval=self.parent.cull_interval)) - if self.parent.cull_busy or self.parent.cull_connected: - log_msg.append(" - including") - if self.parent.cull_busy: - log_msg.append(" busy") - if self.parent.cull_connected: - log_msg.append(" and") - if self.parent.cull_connected: - log_msg.append(" connected") - log_msg.append(" kernels") - log_msg.append(".") - self.log.info(''.join(log_msg)) + self.super = AsyncMultiKernelManager + self.super.__init__(self, **kwargs) + self.last_kernel_activity = utcnow() - async def cull_kernels(self): - self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", - self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) - # Get a separate list of kernels to avoid conflicting updates while iterating - for kernel_id in self.parent.list_kernel_ids(): - await self.parent.cull_kernel_if_idle(kernel_id) + async def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + if kernel._activity_stream: + kernel._activity_stream.close() + kernel._activity_stream = None + self.stop_buffering(kernel_id) + self._kernel_connections.pop(kernel_id, None) - async def cull_kernel_if_idle(self, kernel_id): + # Decrease the metric of number of kernels + # running for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).dec() - # Get the kernel model and use that to determine cullability... - try: - model = self.parent.kernel_model(kernel_id) - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", - kernel_id, model['name'], model['last_activity']) - - if model['last_activity'] is not None: - # Convert dates to compatible formats. Since both are UTC, strip the TZ info from - # the current time and convert the last_activity string to a datetime. - dt_now = utcnow().replace(tzinfo=None) - dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") - dt_idle = dt_now - dt_last_activity - # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) - is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') - connections = model.get('connections', 0) - is_idle_connected = self.parent.cull_connected or connections == 0 - # Cull the kernel if all three criteria are met - if is_idle_time and is_idle_execute and is_idle_connected: - idle_duration = int(dt_idle.total_seconds()) - self.log.warning( - "Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - model['execution_state'], model['name'], kernel_id, connections, idle_duration) - await maybe_future(self.parent.shutdown_kernel(kernel_id)) - except KeyError: - pass # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. - except Exception as e: # other errors are not as expected, so we'll make some noise, but continue. - self.log.exception("The following exception was encountered while checking the idle " - "duration of kernel %s: %s", kernel_id, e) + return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 0d4bc94aced..ff8111c1b3a 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -227,6 +227,7 @@ class KernelFilterTest(NotebookTestBase): } } }) + # Sanity check verifying that the configurable was properly set. def test_config(self): self.assertEqual(self.notebook.kernel_manager.allowed_message_types, ['kernel_info_request']) diff --git a/notebook/services/sessions/sessionmanager.py b/notebook/services/sessions/sessionmanager.py index 5ec35edd0d9..63e18448292 100644 --- a/notebook/services/sessions/sessionmanager.py +++ b/notebook/services/sessions/sessionmanager.py @@ -22,7 +22,7 @@ class SessionManager(LoggingConfigurable): - kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManagerBase') + kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManager') contents_manager = Instance('notebook.services.contents.manager.ContentsManager') # Session database initialized below diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 9323b5966fd..3a02f2535bc 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -273,11 +273,12 @@ class AsyncSessionAPITest(SessionAPITest): def get_argv(cls): argv = super(AsyncSessionAPITest, cls).get_argv() - # before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # Before we extend the argv with the class, ensure that appropriate jupyter_client is available. # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. if async_testing_enabled: argv.extend(['--NotebookApp.kernel_manager_class=' 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv def setUp(self):