Skip to content

Commit

Permalink
handle pseudo waiting kernels when switching kernels (#19)
Browse files Browse the repository at this point in the history
* handle pseudo waiting kernels when switching kernels
  • Loading branch information
sigmarkarl authored Oct 9, 2024
1 parent 5628e18 commit ddf0b98
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 38 deletions.
14 changes: 8 additions & 6 deletions jupyter_server/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,7 @@ async def _async_start_kernel( # type:ignore[override]
kwargs["kernel_id"] = kernel_id
kernel_id = await self.pinned_superclass._async_start_kernel(self, **kwargs)
self._kernel_connections[kernel_id] = 0
task = asyncio.create_task(self._finish_kernel_start(kernel_id))
if not getattr(self, "use_pending_kernels", None):
await task
else:
self._pending_kernel_tasks[kernel_id] = task

# add busy/activity markers:
kernel = self.get_kernel(kernel_id)
kernel.execution_state = "starting" # type:ignore[attr-defined]
Expand All @@ -250,6 +246,12 @@ async def _async_start_kernel( # type:ignore[override]
if env and isinstance(env, dict): # type:ignore[unreachable]
self.log.debug("Kernel argument 'env' passed with: %r", list(env.keys())) # type:ignore[unreachable]

task = asyncio.create_task(self._finish_kernel_start(kernel_id))
if not getattr(self, "use_pending_kernels", None):
await task
else:
self._pending_kernel_tasks[kernel_id] = task

# Increase the metric of number of kernels running
# for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).inc()
Expand Down Expand Up @@ -533,7 +535,7 @@ def list_kernels(self):
# override _check_kernel_id to raise 404 instead of KeyError
def _check_kernel_id(self, kernel_id):
"""Check that a kernel_id exists and raise a 404 HTTPError if it does not."""
if kernel_id not in self:
if kernel_id != "waiting" and kernel_id not in self:
raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id)

# monitoring activity:
Expand Down
5 changes: 3 additions & 2 deletions jupyter_server/services/sessions/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ async def patch(self, session_id):
await sm.update_session(session_id, **changes)
s_model = await sm.get_session(session_id=session_id)

if s_model["kernel"]["id"] != before["kernel"]["id"]:
before_id = before["kernel"]["id"]
if before_id != "waiting" and s_model["kernel"]["id"] != before_id:
# kernel_id changed because we got a new kernel
# shutdown the old one
fut = asyncio.ensure_future(ensure_async(km.shutdown_kernel(before["kernel"]["id"])))
fut = asyncio.ensure_future(ensure_async(km.shutdown_kernel(before_id)))
# If we are not using pending kernels, wait for the kernel to shut down
if not getattr(km, "use_pending_kernels", None):
await fut
Expand Down
80 changes: 50 additions & 30 deletions jupyter_server/services/sessions/sessionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,19 @@ async def start_kernel_for_session(
kernel_name : str
the name of the kernel specification to use. The default kernel name will be used if not provided.
"""

# allow contents manager to specify kernels cwd
if self.fut_kernel_id_dict is not None:
if session_id in self.fut_kernel_id_dict:
fut_kernel_id = self.fut_kernel_id_dict[session_id]
if fut_kernel_id.done():
kernel_id = await fut_kernel_id
self.fut_kernel_id_dict.pop(session_id)
return kernel_id
try:
kernel_id = await fut_kernel_id
self.fut_kernel_id_dict.pop(session_id)
return kernel_id
except:
self.log.error(f"kernel start failed, retying ${path} ${kernel_name}")
await self.start_kernel_async(session_id, path, kernel_name)
else:
await self.start_kernel_async(session_id, path, kernel_name)
kernel_id = "waiting"
Expand Down Expand Up @@ -442,11 +447,14 @@ async def save_session(self, session_id, path=None, name=None, type=None, kernel
model : dict
a dictionary of the session model
"""
self.cursor.execute(
"INSERT INTO session VALUES (?,?,?,?,?)",
(session_id, path, name, type, kernel_id),
)
result = await self.get_session(session_id=session_id)
if kernel_id != "waiting":
self.cursor.execute(
"INSERT INTO session VALUES (?,?,?,?,?)",
(session_id, path, name, type, kernel_id),
)
result = await self.get_session(session_id=session_id)
else:
result = self.waiting_session(session_id, path, type, name)
return result

async def get_session(self, **kwargs):
Expand Down Expand Up @@ -538,37 +546,48 @@ async def update_session(self, session_id, **kwargs):
self.cursor.execute(
"SELECT path, name, kernel_id FROM session WHERE session_id=?", [session_id]
)
path, name, kernel_id = self.cursor.fetchone()
self.kernel_manager.update_env(kernel_id=kernel_id, env=self.get_kernel_env(path, name))
try:
path, name, kernel_id = self.cursor.fetchone()
self.kernel_manager.update_env(kernel_id=kernel_id, env=self.get_kernel_env(path, name))
except TypeError:
self.kernel_manager.update_env(kernel_id="waiting", env=self.get_kernel_env("Untitled.ipynb", "Waiting for kernel"))

async def kernel_culled(self, kernel_id: str) -> bool:
"""Return True if ``kernel_id`` is not found in the kernel manager, i.e. the kernel is no longer considered alive."""
return kernel_id not in self.kernel_manager

async def row_to_model(self, row, tolerate_culled=False):
"""Takes sqlite database session row and turns it into a dictionary"""
kernel_culled: bool = await ensure_async(self.kernel_culled(row["kernel_id"]))
if kernel_culled:
# The kernel was culled or died without deleting the session.
# We can't use delete_session here because that tries to find
# and shut down the kernel - so we'll delete the row directly.
#
# If caller wishes to tolerate culled kernels, log a warning
# and return None. Otherwise, raise KeyError with a similar
# message.
self.cursor.execute("DELETE FROM session WHERE session_id=?", (row["session_id"],))
msg = (
"Kernel '{kernel_id}' appears to have been culled or died unexpectedly, "
"invalidating session '{session_id}'. The session has been removed.".format(
kernel_id=row["kernel_id"], session_id=row["session_id"]
)
)
kernel_id = row["kernel_id"]
if kernel_id == "waiting":
if tolerate_culled:
self.log.warning(f"{msg} Continuing...")
return None
raise KeyError(msg)
else:
kernel_model = self.waiting_kernel()
else:
kernel_culled: bool = await ensure_async(self.kernel_culled(kernel_id))
if kernel_culled:
# The kernel was culled or died without deleting the session.
# We can't use delete_session here because that tries to find
# and shut down the kernel - so we'll delete the row directly.
#
# If caller wishes to tolerate culled kernels, log a warning
# and return None. Otherwise, raise KeyError with a similar
# message.
self.cursor.execute("DELETE FROM session WHERE session_id=?", (row["session_id"],))
msg = (
"Kernel '{kernel_id}' appears to have been culled or died unexpectedly, "
"invalidating session '{session_id}'. The session has been removed.".format(
kernel_id=row["kernel_id"], session_id=row["session_id"]
)
)
if tolerate_culled:
self.log.warning(f"{msg} Continuing...")
return None
raise KeyError(msg)

kernel_model = await ensure_async(self.kernel_manager.kernel_model(kernel_id))

kernel_model = await ensure_async(self.kernel_manager.kernel_model(row["kernel_id"]))
model = {
"id": row["session_id"],
"path": row["path"],
Expand All @@ -591,7 +610,8 @@ async def list_sessions(self):
for row in c.fetchall():
try:
model = await self.row_to_model(row)
result.append(model)
if model["kernel"]["id"] != "waiting":
result.append(model)
except KeyError:
pass
return result
Expand Down

0 comments on commit ddf0b98

Please sign in to comment.