diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py index cc6ec8edc..33ac6073d 100644 --- a/jupyter_server/services/kernels/kernelmanager.py +++ b/jupyter_server/services/kernels/kernelmanager.py @@ -232,11 +232,7 @@ async def _async_start_kernel( # type:ignore[override] kwargs["kernel_id"] = kernel_id kernel_id = await self.pinned_superclass._async_start_kernel(self, **kwargs) self._kernel_connections[kernel_id] = 0 - task = asyncio.create_task(self._finish_kernel_start(kernel_id)) - if not getattr(self, "use_pending_kernels", None): - await task - else: - self._pending_kernel_tasks[kernel_id] = task + # add busy/activity markers: kernel = self.get_kernel(kernel_id) kernel.execution_state = "starting" # type:ignore[attr-defined] @@ -250,6 +246,12 @@ async def _async_start_kernel( # type:ignore[override] if env and isinstance(env, dict): # type:ignore[unreachable] self.log.debug("Kernel argument 'env' passed with: %r", list(env.keys())) # type:ignore[unreachable] + task = asyncio.create_task(self._finish_kernel_start(kernel_id)) + if not getattr(self, "use_pending_kernels", None): + await task + else: + self._pending_kernel_tasks[kernel_id] = task + # Increase the metric of number of kernels running # for the relevant kernel type by 1 KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).inc() @@ -533,7 +535,7 @@ def list_kernels(self): # override _check_kernel_id to raise 404 instead of KeyError def _check_kernel_id(self, kernel_id): """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: + if kernel_id != "waiting" and kernel_id not in self: raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id) # monitoring activity: diff --git a/jupyter_server/services/sessions/handlers.py b/jupyter_server/services/sessions/handlers.py index 5f2429a0c..072787b77 100644 --- a/jupyter_server/services/sessions/handlers.py +++ b/jupyter_server/services/sessions/handlers.py @@ -175,10 +175,11 @@ async def patch(self, session_id): await sm.update_session(session_id, **changes) s_model = await sm.get_session(session_id=session_id) - if s_model["kernel"]["id"] != before["kernel"]["id"]: + before_id = before["kernel"]["id"] + if before_id != "waiting" and s_model["kernel"]["id"] != before_id: # kernel_id changed because we got a new kernel # shutdown the old one - fut = asyncio.ensure_future(ensure_async(km.shutdown_kernel(before["kernel"]["id"]))) + fut = asyncio.ensure_future(ensure_async(km.shutdown_kernel(before_id))) # If we are not using pending kernels, wait for the kernel to shut down if not getattr(km, "use_pending_kernels", None): await fut diff --git a/jupyter_server/services/sessions/sessionmanager.py b/jupyter_server/services/sessions/sessionmanager.py index 69a1763c3..f5c01c715 100644 --- a/jupyter_server/services/sessions/sessionmanager.py +++ b/jupyter_server/services/sessions/sessionmanager.py @@ -364,14 +364,19 @@ async def start_kernel_for_session( kernel_name : str the name of the kernel specification to use. The default kernel name will be used if not provided. """ + # allow contents manager to specify kernels cwd if self.fut_kernel_id_dict is not None: if session_id in self.fut_kernel_id_dict: fut_kernel_id = self.fut_kernel_id_dict[session_id] if fut_kernel_id.done(): - kernel_id = await fut_kernel_id - self.fut_kernel_id_dict.pop(session_id) - return kernel_id + try: + kernel_id = await fut_kernel_id + self.fut_kernel_id_dict.pop(session_id) + return kernel_id + except: + self.log.error(f"kernel start failed, retying ${path} ${kernel_name}") + await self.start_kernel_async(session_id, path, kernel_name) else: await self.start_kernel_async(session_id, path, kernel_name) kernel_id = "waiting" @@ -442,11 +447,14 @@ async def save_session(self, session_id, path=None, name=None, type=None, kernel model : dict a dictionary of the session model """ - self.cursor.execute( - "INSERT INTO session VALUES (?,?,?,?,?)", - (session_id, path, name, type, kernel_id), - ) - result = await self.get_session(session_id=session_id) + if kernel_id != "waiting": + self.cursor.execute( + "INSERT INTO session VALUES (?,?,?,?,?)", + (session_id, path, name, type, kernel_id), + ) + result = await self.get_session(session_id=session_id) + else: + result = self.waiting_session(session_id, path, type, name) return result async def get_session(self, **kwargs): @@ -538,8 +546,11 @@ async def update_session(self, session_id, **kwargs): self.cursor.execute( "SELECT path, name, kernel_id FROM session WHERE session_id=?", [session_id] ) - path, name, kernel_id = self.cursor.fetchone() - self.kernel_manager.update_env(kernel_id=kernel_id, env=self.get_kernel_env(path, name)) + try: + path, name, kernel_id = self.cursor.fetchone() + self.kernel_manager.update_env(kernel_id=kernel_id, env=self.get_kernel_env(path, name)) + except TypeError: + self.kernel_manager.update_env(kernel_id="waiting", env=self.get_kernel_env("Untitled.ipynb", "Waiting for kernel")) async def kernel_culled(self, kernel_id: str) -> bool: """Checks if the kernel is still considered alive and returns true if its not found.""" @@ -547,28 +558,36 @@ async def kernel_culled(self, kernel_id: str) -> bool: async def row_to_model(self, row, tolerate_culled=False): """Takes sqlite database session row and turns it into a dictionary""" - kernel_culled: bool = await ensure_async(self.kernel_culled(row["kernel_id"])) - if kernel_culled: - # The kernel was culled or died without deleting the session. - # We can't use delete_session here because that tries to find - # and shut down the kernel - so we'll delete the row directly. - # - # If caller wishes to tolerate culled kernels, log a warning - # and return None. Otherwise, raise KeyError with a similar - # message. - self.cursor.execute("DELETE FROM session WHERE session_id=?", (row["session_id"],)) - msg = ( - "Kernel '{kernel_id}' appears to have been culled or died unexpectedly, " - "invalidating session '{session_id}'. The session has been removed.".format( - kernel_id=row["kernel_id"], session_id=row["session_id"] - ) - ) + kernel_id = row["kernel_id"] + if kernel_id == "waiting": if tolerate_culled: - self.log.warning(f"{msg} Continuing...") return None - raise KeyError(msg) + else: + kernel_model = self.waiting_kernel() + else: + kernel_culled: bool = await ensure_async(self.kernel_culled(kernel_id)) + if kernel_culled: + # The kernel was culled or died without deleting the session. + # We can't use delete_session here because that tries to find + # and shut down the kernel - so we'll delete the row directly. + # + # If caller wishes to tolerate culled kernels, log a warning + # and return None. Otherwise, raise KeyError with a similar + # message. + self.cursor.execute("DELETE FROM session WHERE session_id=?", (row["session_id"],)) + msg = ( + "Kernel '{kernel_id}' appears to have been culled or died unexpectedly, " + "invalidating session '{session_id}'. The session has been removed.".format( + kernel_id=row["kernel_id"], session_id=row["session_id"] + ) + ) + if tolerate_culled: + self.log.warning(f"{msg} Continuing...") + return None + raise KeyError(msg) + + kernel_model = await ensure_async(self.kernel_manager.kernel_model(kernel_id)) - kernel_model = await ensure_async(self.kernel_manager.kernel_model(row["kernel_id"])) model = { "id": row["session_id"], "path": row["path"], @@ -591,7 +610,8 @@ async def list_sessions(self): for row in c.fetchall(): try: model = await self.row_to_model(row) - result.append(model) + if model["kernel"]["id"] != "waiting": + result.append(model) except KeyError: pass return result