From f8fa9f98fa3273f6f7319cb720dbd16f9e79b636 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Thu, 7 Mar 2019 14:49:00 -0800 Subject: [PATCH] Add support for async kernel starts --- jupyter_client/manager.py | 63 ++++++++++++++++++++++++---- jupyter_client/multikernelmanager.py | 37 ++++++++++++---- 2 files changed, 85 insertions(+), 15 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index bf94ad002..354f105c2 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -13,7 +13,8 @@ import time import zmq - +from concurrent.futures import Future +from tornado import gen from ipython_genutils.importstring import import_item from .localinterfaces import is_local_ip, local_ips from traitlets import ( @@ -192,6 +193,15 @@ def _launch_kernel(self, kernel_cmd, **kw): """ return launch_kernel(kernel_cmd, **kw) + @gen.coroutine + def launch_kernel_async(self, kernel_cmd, **kw): + """actually launch the kernel + + override in a subclass to launch kernel subprocesses differently + """ + res = yield gen.maybe_future(launch_kernel(kernel_cmd, **kw)) + raise gen.Return(res) + # Control socket used for polite kernel shutdown def _connect_control_socket(self): @@ -205,8 +215,8 @@ def _close_control_socket(self): self._control_socket.close() self._control_socket = None - def start_kernel(self, **kw): - """Starts a kernel on this host in a separate process. + def pre_start_kernel(self, **kw): + """Prepares a kernel for startup in a separate process. If random ports (port=0) are being used, this method must be called before the channels are created. @@ -243,14 +253,53 @@ def start_kernel(self, **kw): env.update(self.kernel_spec.env or {}) elif self.extra_env: env.update(self.extra_env) + kw['env'] = env - # launch the kernel subprocess - self.log.debug("Starting kernel: %s", kernel_cmd) - self.kernel = self._launch_kernel(kernel_cmd, env=env, - **kw) + return kernel_cmd, kw + + def post_start_kernel(self, **kw): self.start_restarter() self._connect_control_socket() + def start_kernel(self, **kw): + """Starts a kernel on this host in a separate process. + + If random ports (port=0) are being used, this method must be called + before the channels are created. + + Parameters + ---------- + `**kw` : optional + keyword arguments that are passed down to build the kernel_cmd + and launching the kernel (e.g. Popen kwargs). + """ + kernel_cmd, kw = self.pre_start_kernel(**kw) + + # launch the kernel subprocess + self.log.debug("Starting kernel: %s", kernel_cmd) + self.kernel = self._launch_kernel(kernel_cmd, **kw) + self.post_start_kernel(**kw) + + @gen.coroutine + def start_kernel_async(self, **kw): + """Starts a kernel in a separate process in an asynchronous manner. + + If random ports (port=0) are being used, this method must be called + before the channels are created. + + Parameters + ---------- + `**kw` : optional + keyword arguments that are passed down to build the kernel_cmd + and launching the kernel (e.g. Popen kwargs). + """ + kernel_cmd, kw = self.pre_start_kernel(**kw) + + # launch the kernel subprocess + self.log.debug("Starting kernel (async): %s", kernel_cmd) + self.kernel = yield self.launch_kernel_async(kernel_cmd, **kw) + self.post_start_kernel(**kw) + def request_shutdown(self, restart=False): """Send a shutdown request via control channel """ diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index c2b61bde0..ea8c5347e 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -10,6 +10,8 @@ import zmq +from concurrent.futures import Future +from tornado import gen from traitlets.config.configurable import LoggingConfigurable from ipython_genutils.importstring import import_item from traitlets import ( @@ -82,14 +84,8 @@ def __len__(self): def __contains__(self, kernel_id): return kernel_id in self._kernels - def start_kernel(self, kernel_name=None, **kwargs): - """Start a new kernel. - - The caller can pick a kernel_id by passing one in as a keyword arg, - otherwise one will be generated using new_kernel_id(). - - The kernel ID for the newly started kernel is returned. - """ + def pre_start_kernel(self, kernel_name=None, **kwargs): + """Prepare kernel manager for startup """ kernel_id = kwargs.pop('kernel_id', self.new_kernel_id(**kwargs)) if kernel_id in self: raise DuplicateKernelError('Kernel already exists: %s' % kernel_id) @@ -107,10 +103,35 @@ def start_kernel(self, kernel_name=None, **kwargs): parent=self, log=self.log, kernel_name=kernel_name, **constructor_kwargs ) + return kernel_id, km + + def start_kernel(self, kernel_name=None, **kwargs): + """Start a new kernel. + + The caller can pick a kernel_id by passing one in as a keyword arg, + otherwise one will be generated using new_kernel_id(). + + The kernel ID for the newly started kernel is returned. + """ + kernel_id, km = self.pre_start_kernel(kernel_name, **kwargs) km.start_kernel(**kwargs) self._kernels[kernel_id] = km return kernel_id + @gen.coroutine + def start_kernel_async(self, kernel_name=None, **kwargs): + """Start a new kernel asynchronously. + + The caller can pick a kernel_id by passing one in as a keyword arg, + otherwise one will be generated using new_kernel_id(). + + The kernel ID for the newly started kernel is returned. + """ + kernel_id, km = self.pre_start_kernel(kernel_name, **kwargs) + yield km.start_kernel_async(**kwargs) + self._kernels[kernel_id] = km + raise gen.Return(kernel_id) + @kernel_method def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by its kernel uuid.