Skip to content

Commit

Permalink
Trigger a websocket reconnect event when scala kernels do not connect…
Browse files Browse the repository at this point in the history
… the first time. (jupyter-server#438)
  • Loading branch information
Zsailer authored and GitHub Enterprise committed Aug 4, 2022
1 parent 4991521 commit 4c9d1fc
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _dead_state(self):
def _disconnected_state(self):
self.log.warning(f"No heartbeat detected for: {self.kernel_manager.kernel_id}")
self._emit(
state=constants.KERNEL_STATE.NO_HEARTBEAT,
state=constants.KERNEL_STATE.DISCONNECTED,
msg="Kernel was found, but no heartbeat was detected.",
)
# Trigger a "kernel-disconnected" event.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Event schema for the Scala kernel reconnect workaround.
# The kernel websocket handler records this event when a Scala kernel is still
# "busy" on its first connection attempt; the JupyterLab extension listens for
# it and runs its reconnect command.
$id: event.datastudio.jupyter.com/kernel-scala-bandaid
version: 1
title: Scala Bandaid
description: |
Addresses a bug we see in Scala Kernels. They don't always connect
on the first go around. If this happens, we trigger an event
that JupyterLab can listen for and re-establish a websocket connection.
type: object
properties:
# Free-form text attached to the event (the handler sends "Kernel is reconnecting").
msg:
title: Message type
description: |
Message returned by the event.
required:
- msg
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"event.datastudio.jupyter.com/kernel-failed",
"event.datastudio.jupyter.com/kernel-no-heartbeat",
"event.datastudio.jupyter.com/syncing-state",
"event.datastudio.jupyter.com/kernel-scala-bandaid",
]


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import time

from jupyter_client import KernelManager
from jupyter_server._tz import utcnow
from jupyter_server.base.zmqhandlers import AuthenticatedZMQStreamHandler
Expand Down Expand Up @@ -67,6 +65,49 @@ def give_up():
# actually wait for it
await future

# This is a complete fork of the Jupyter Websocket handler
# method. We don't want to rely on this long term, but should
# work on getting this into Jupyter Telemetry.
def _on_zmq_reply(self, stream, msg_list):
    """Relay one ZMQ message from the kernel to the websocket.

    Forked from the Jupyter websocket handler; the only addition is the
    best-effort DataStudio hubble metrics hook.

    Parameters
    ----------
    stream
        The ZMQ stream the message arrived on; its ``channel`` attribute
        (if any) is used for error handling and rate limiting.
    msg_list
        The raw multipart message as received from the stream.
    """
    v1_subprotocol = "v1.kernel.websocket.jupyter.org"
    _, frames = self.session.feed_identities(msg_list)

    # Under the v1 subprotocol the frames are forwarded as-is, so only a
    # placeholder message is built; otherwise the frames are deserialized.
    msg = (
        {"header": None, "parent_header": None, "content": None}
        if self.subprotocol == v1_subprotocol
        else self.session.deserialize(frames)
    )

    # DataStudio customization: collect hubble metrics. A failure here must
    # never stop the message from going through, so it is only logged.
    try:
        hubble_execution_time_and_count(msg)
    except Exception as err:
        self.log.warning(err)

    channel = getattr(stream, "channel", None)
    parts = frames[1:]

    self._on_error(channel, msg, parts)

    # Drop the message when the rate limiter says so.
    if self._limit_rate(channel, msg, parts):
        return

    payload = parts if self.subprotocol == v1_subprotocol else msg
    AuthenticatedZMQStreamHandler._on_zmq_reply(self, stream, payload)

# def _soft_nudge(self):
# kernel: KernelManager = self.kernel_manager.get_kernel(self.kernel_id)
# self.log.debug(
# f"Kernel ({self.kernel_id}) says it's 'busy'. Trying a "
# "soft nudge to verify that the kernel is *truly* busy."
# )
# tmp_shell_channel = kernel.connect_shell()
# self.session.send(tmp_shell_channel, "kernel_info_request")
# time.sleep(3.0)

# tmp_shell_channel.close()

def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
Expand All @@ -75,7 +116,7 @@ def nudge(self):
sockets are fully connected, and kernel is responsive.
Keeps retrying kernel_info_request until these are both received.
"""
kernel = self.kernel_manager.get_kernel(self.kernel_id)
kernel: KernelManager = self.kernel_manager.get_kernel(self.kernel_id)
kernel._emit(
state=constants.KERNEL_STATE.CONNECTING,
msg="Connecting to the kernel's channels.",
Expand All @@ -87,8 +128,30 @@ def nudge(self):
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr(kernel, "execution_state", None) == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
f: Future = Future()
# Special case Scala kernels for initial connection issues.
# This triggers an event to frontend that retriggers a reconnect
# event in the UI. We only attempt this reconnect 1 time.
language = kernel.kernel_spec.language.lower()
if language == "scala" and not getattr(kernel, "scala_bandaid", False):

def apply_bandaid(_=None):
self.log.info(
"Scala kernel failed to connect this time. Triggering the bandaid event."
)
kernel.event_bus.record_event(
schema_name="event.datastudio.jupyter.com/kernel-scala-bandaid",
version=1,
event={"msg": "Kernel is reconnecting"},
)

# Use a temporary property to track whether this Scala kernel
# has had the bandaid previously applied.
kernel.scala_bandaid = True
f.add_done_callback(apply_bandaid)
else:
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
self.log.debug("Kernel appears to be executing a long running cell?")
f.set_result(None)
return f

Expand Down Expand Up @@ -235,186 +298,6 @@ def nudge(count):
future.add_done_callback(finish)
return future

# This is a complete fork of the Jupyter Websocket handler
# method. We don't want to rely on this long term, but should
# work on getting this into Jupyter Telemetry.
def _on_zmq_reply(self, stream, msg_list):
    """Relay one ZMQ message from the kernel to the websocket.

    Forked from the Jupyter websocket handler; the only addition is the
    best-effort DataStudio hubble metrics hook.

    Parameters
    ----------
    stream
        The ZMQ stream the message arrived on; its ``channel`` attribute
        (if any) is used for error handling and rate limiting.
    msg_list
        The raw multipart message as received from the stream.
    """
    v1_subprotocol = "v1.kernel.websocket.jupyter.org"
    _, frames = self.session.feed_identities(msg_list)

    # Under the v1 subprotocol the frames are forwarded as-is, so only a
    # placeholder message is built; otherwise the frames are deserialized.
    msg = (
        {"header": None, "parent_header": None, "content": None}
        if self.subprotocol == v1_subprotocol
        else self.session.deserialize(frames)
    )

    # DataStudio customization: collect hubble metrics. A failure here must
    # never stop the message from going through, so it is only logged.
    try:
        hubble_execution_time_and_count(msg)
    except Exception as err:
        self.log.warning(err)

    channel = getattr(stream, "channel", None)
    parts = frames[1:]

    self._on_error(channel, msg, parts)

    # Drop the message when the rate limiter says so.
    if self._limit_rate(channel, msg, parts):
        return

    payload = parts if self.subprotocol == v1_subprotocol else msg
    AuthenticatedZMQStreamHandler._on_zmq_reply(self, stream, payload)

def _soft_nudge(self):
kernel: KernelManager = self.kernel_manager.get_kernel(self.kernel_id)
self.log.debug(
f"Kernel ({self.kernel_id}) says it's 'busy'. Trying a "
"soft nudge to verify that the kernel is *truly* busy."
)
tmp_shell_channel = kernel.connect_shell()
self.session.send(tmp_shell_channel, "kernel_info_request")
time.sleep(3.0)
tmp_shell_channel.close()

def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
a shell or control reply and at least one iopub message,
ensuring that zmq subscriptions are established,
sockets are fully connected, and kernel is responsive.
Keeps retrying kernel_info_request until these are both received.

A kernel that reports "busy" is first given one soft nudge; if it still
reports "busy" afterwards it is not nudged at all and an already-resolved
Future is returned. Otherwise the returned Future is bounded by
``self.kernel_info_timeout``.
"""
kernel: KernelManager = self.kernel_manager.get_kernel(self.kernel_id)

# The IOPub used by the client, whose subscriptions we are verifying.
iopub_channel = self.channels["iopub"]

# A kernel that reports "busy" gets one soft nudge before we decide.
# (Presumably added for Scala kernels' initial connection issues; the
# check itself is not language-specific.)
if getattr(kernel, "execution_state", None) == "busy":
self._soft_nudge()
# If still busy, this is probably a genuinely long-running execution.
# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
# nudging in this case would cause a potentially very long wait
# before connections are opened,
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr(kernel, "execution_state", None) == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
self.log.debug("Kernel appears to be executing a long running cell?")
f: Future = Future()
f.set_result(None)
return f

# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
shell_channel = kernel.connect_shell()
# Use a transient control channel to prevent leaking
# control responses to the front-end.
control_channel = kernel.connect_control()

# info_future: resolved by the first shell OR control reply.
# iopub_future: resolved by the first IOPub message.
info_future: Future = Future()
iopub_future: Future = Future()
both_done = gen.multi([info_future, iopub_future])

def finish(_=None):
"""Ensure all futures are resolved
which in turn triggers cleanup
"""
for f in (info_future, iopub_future):
if not f.done():
f.set_result(None)

def cleanup(_=None):
"""Common cleanup"""
# `loop` and `nudge_handle` are assigned further down in the enclosing
# function, before this callback can run.
loop.remove_timeout(nudge_handle) # type:ignore[has-type]
iopub_channel.stop_on_recv()
if not shell_channel.closed():
shell_channel.close()
if not control_channel.closed():
control_channel.close()

# trigger cleanup when both message futures are resolved
both_done.add_done_callback(cleanup)

def on_shell_reply(msg):
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
info_future.set_result(None)

def on_control_reply(msg):
self.log.debug("Nudge: control info reply received: %s", self.kernel_id)
if not info_future.done():
self.log.debug("Nudge: resolving control future: %s", self.kernel_id)
info_future.set_result(None)

def on_iopub(msg):
self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
if not iopub_future.done():
# One IOPub message is enough; stop intercepting the client's channel.
iopub_channel.stop_on_recv()
self.log.debug("Nudge: resolving iopub future: %s", self.kernel_id)
iopub_future.set_result(None)

iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
control_channel.on_recv(on_control_reply)
loop = IOLoop.current()

# Nudge the kernel with kernel info requests until we get an IOPub message
def nudge(count):
count += 1

# NOTE: this close check appears to never be True during on_open,
# even when the peer has closed the connection
if self.ws_connection is None or self.ws_connection.is_closing():
self.log.debug(
"Nudge: cancelling on closed websocket: %s", self.kernel_id
)
finish()
return

# check for stopped kernel
if self.kernel_id not in self.kernel_manager:
self.log.debug(
"Nudge: cancelling on stopped kernel: %s", self.kernel_id
)
finish()
return

# check for closed zmq socket (shell)
if shell_channel.closed():
self.log.debug(
"Nudge: cancelling on closed zmq socket: %s", self.kernel_id
)
finish()
return

# check for closed zmq socket (control)
if control_channel.closed():
self.log.debug(
"Nudge: cancelling on closed zmq socket: %s", self.kernel_id
)
finish()
return

if not both_done.done():
# Every 10th attempt is logged as a warning, the rest at debug level.
log = self.log.warning if count % 10 == 0 else self.log.debug
log(f"Nudge: attempt {count} on kernel {self.kernel_id}")
self.session.send(shell_channel, "kernel_info_request")
self.session.send(control_channel, "kernel_info_request")
# Re-arm the timer: try again in half a second.
nonlocal nudge_handle # type:ignore[misc]
nudge_handle = loop.call_later(0.5, nudge, count)

# Schedule the first attempt immediately.
nudge_handle = loop.call_later(0, nudge, count=0)

# resolve with a timeout if we get no response
future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done)
# ensure we have no dangling resources or unresolved Futures in case of timeout
future.add_done_callback(finish)
return future


handlers = [
(
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { NotebookPixiedustShimPlugin } from './pixiedustshim';
import { KernelInfoPlugin } from './kernelinfo';
import { RunningTabPlugin } from './runningtab';
import { NoKernelHeartbeatPlugin } from './noheartbeat';
import { ScalaBandaidPlugin } from './scalakernelbandaid';

const plugins: JupyterFrontEndPlugin<any>[] = [
kernelStatusPlugin,
Expand All @@ -33,7 +34,8 @@ const plugins: JupyterFrontEndPlugin<any>[] = [
NotebookPixiedustShimPlugin,
KernelInfoPlugin,
RunningTabPlugin,
NoKernelHeartbeatPlugin
NoKernelHeartbeatPlugin,
ScalaBandaidPlugin
];

export default plugins;
32 changes: 32 additions & 0 deletions src/scalakernelbandaid.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {
JupyterFrontEnd,
JupyterFrontEndPlugin
} from '@jupyterlab/application';

import { EventListener } from './eventlistener';
import {
INotebookTracker
// NotebookPanel
// INotebookModel
} from '@jupyterlab/notebook';
/**
 * A plugin that works around Scala kernels failing to connect on the first
 * attempt. Whenever a `kernel-scala-bandaid` event is received, it executes
 * the `reconnect-to-kernel` command so the frontend re-establishes its
 * connection to the kernel.
 */
export const ScalaBandaidPlugin: JupyterFrontEndPlugin<void> = {
  id: 'data_studio:scala_bandaid_plugin',
  autoStart: true,
  requires: [],
  // `activate` declares a notebook tracker argument, so the token has to be
  // listed for JupyterLab to inject it. It is optional so that a missing
  // tracker can never prevent this plugin from activating.
  optional: [INotebookTracker],
  activate: (
    app: JupyterFrontEnd,
    notebookTracker: INotebookTracker | null
  ): void => {
    console.log('JupyterLab extension "Scala Bandaid Dialog" is activated!');
    // Listen for the Scala bandaid events.
    const listener = EventListener.getInstance();

    listener.addCallback(
      'event.datastudio.jupyter.com/kernel-scala-bandaid',
      () => {
        // Reconnect to the kernel. The command runs asynchronously, so a
        // failure (e.g. the command is not registered) is logged instead of
        // surfacing as an unhandled promise rejection.
        // NOTE(review): JupyterLab's built-in reconnect commands are
        // namespaced (e.g. `notebook:reconnect-to-kernel`); confirm that
        // `reconnect-to-kernel` is registered elsewhere in this extension.
        app.commands
          .execute('reconnect-to-kernel')
          .catch((reason: unknown) => {
            console.error(
              'Scala bandaid: failed to reconnect to the kernel.',
              reason
            );
          });
      }
    );
  }
};
2 changes: 1 addition & 1 deletion src/status.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ const StatusBanner = (props: IStatus) => {
break;
}

case 'No heartbeat': {
case 'Disconnected': {
content = (
<ThemeProvider theme={theme}>
<Text textStyle="bodyEmph" display="inline-flex" alignItems="center">
Expand Down

0 comments on commit 4c9d1fc

Please sign in to comment.