Skip to content

Commit

Permalink
Emit events from the kernels service and gateway client
Browse files Browse the repository at this point in the history
Co-authored-by: Zach Sailer <zsailer@apple.com>
  • Loading branch information
Raj Musuku and Zsailer committed Apr 12, 2023
1 parent ca4b062 commit 7d0f52c
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@
"ipython": ("https://ipython.readthedocs.io/en/stable/", None),
"nbconvert": ("https://nbconvert.readthedocs.io/en/stable/", None),
"nbformat": ("https://nbformat.readthedocs.io/en/stable/", None),
"jupyter_core": ("https://jupyter_core.readthedocs.io/en/stable/", None),
"jupyter_core": ("https://jupyter-core.readthedocs.io/en/stable/", None),
"tornado": ("https://www.tornadoweb.org/en/stable/", None),
"traitlets": ("https://traitlets.readthedocs.io/en/stable/", None),
}
Expand Down
40 changes: 40 additions & 0 deletions jupyter_server/event_schemas/gateway_client/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"$id": https://events.jupyter.org/jupyter_server/gateway_client/v1
version: 1
title: Gateway Client activities.
personal-data: true
description: |
Record events of a gateway client.
type: object
required:
- status
- msg
properties:
status:
enum:
- error
- success
description: |
Status received by the Gateway client from a REST API operation to the gateway kernel.
This is a required field.
Possible values:
1. error
Error response from a rest api operation to gateway kernel.
2. success
Success response from a rest api operation to gateway kernel.
status_code:
type: number
description: |
HTTP response code from a REST API operation to the gateway kernel.
Examples: 200, 400, 502, 503, 599 etc.
msg:
type: string
description: |
Description of the event being emitted.
gateway_url:
type: string
description: |
Gateway url where the remote server exists.
71 changes: 71 additions & 0 deletions jupyter_server/event_schemas/kernel_actions/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"$id": https://events.jupyter.org/jupyter_server/kernel_actions/v1
version: 1
title: Kernel Manager activities
personal-data: true
description: |
Record events of a kernel manager.
type: object
required:
- action
- kernel_id
- msg
properties:
action:
enum:
- start
- interrupt
- shutdown
- restart
description: |
Action performed by the Kernel Manager.
This is a required field.
Possible values:
1. start
A kernel has been started with the given kernel id.
2. interrupt
A kernel has been interrupted for the given kernel id.
3. shutdown
A kernel has been shut down for the given kernel id.
4. restart
A kernel has been restarted for the given kernel id.
kernel_id:
type: string
description: |
Kernel id.
This is a required field.
kernel_name:
type: string
description: |
Name of the kernel.
status:
enum:
- error
- success
description: |
Status received from a rest api operation to kernel server.
This is an optional field (it is not listed under `required`).
Possible values:
1. error
Error response from a rest api operation to kernel server.
2. success
Success response from a rest api operation to kernel server.
status_code:
type: number
description: |
HTTP response code from a REST API operation to the kernel server.
Examples: 200, 400, 502, 503, 599 etc.
msg:
type: string
description: |
Description of the event specified in action.
61 changes: 57 additions & 4 deletions jupyter_server/gateway/gateway_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,31 @@
from http.cookies import SimpleCookie
from socket import gaierror

from jupyter_events import EventLogger
from tornado import web
from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse
from traitlets import Bool, Float, Int, TraitError, Type, Unicode, default, observe, validate
from traitlets import (
Bool,
Float,
Instance,
Int,
TraitError,
Type,
Unicode,
default,
observe,
validate,
)
from traitlets.config import LoggingConfigurable, SingletonConfigurable

from jupyter_server import DEFAULT_EVENTS_SCHEMA_PATH, JUPYTER_SERVER_EVENTS_URI

# Values used for the "status" field of emitted gateway-client events.
ERROR_STATUS = "error"
SUCCESS_STATUS = "success"
# Keys of the event payload dictionaries built in this module.
STATUS_KEY = "status"
STATUS_CODE_KEY = "status_code"
MESSAGE_KEY = "msg"

if ty.TYPE_CHECKING:
from http.cookies import Morsel

Expand Down Expand Up @@ -71,10 +91,30 @@ def get_token(
class GatewayClient(SingletonConfigurable):
"""This class manages the configuration. It's its own singleton class so
that we can share these values across all objects. It also contains some
helper methods to build request arguments out of the various config
options.
helper methods to build request arguments out of the various config options.
"""

# Schema ID under which every GatewayClient event is emitted (see `emit`).
event_schema_id = JUPYTER_SERVER_EVENTS_URI + "/gateway_client/v1"
# Event logger used by `emit`; its default comes from `_default_event_logger`.
event_logger = Instance(EventLogger).tag(config=True)

@default("event_logger")
def _default_event_logger(self):
    """Provide the default event logger for the gateway client.

    The parent's ``event_logger`` is reused when the parent exposes one.
    Otherwise a standalone ``EventLogger`` is created and the gateway
    client schema is registered with it.
    """
    parent = self.parent
    if parent and hasattr(parent, "event_logger"):
        # Event logger is attached from serverapp.
        return parent.event_logger

    # No parent logger is available: build one and register our schema.
    standalone_logger = EventLogger()
    standalone_logger.register_event_schema(
        DEFAULT_EVENTS_SCHEMA_PATH / "gateway_client" / "v1.yaml"
    )
    self.log.info("Event is registered in GatewayClient.")
    return standalone_logger

def emit(self, data):
    """Send ``data`` to the event logger under the gateway client schema ID."""
    logger = self.event_logger
    logger.emit(data=data, schema_id=self.event_schema_id)

url = Unicode(
default_value=None,
allow_none=True,
Expand All @@ -97,7 +137,9 @@ def _url_validate(self, proposal):
value = proposal["value"]
# Ensure value, if present, starts with 'http'
if value is not None and len(value) > 0 and not str(value).lower().startswith("http"):
raise TraitError("GatewayClient url must start with 'http': '%r'" % value)
message = "GatewayClient url must start with 'http': '%r'" % value
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
raise TraitError(message)
return value

ws_url = Unicode(
Expand All @@ -123,7 +165,9 @@ def _ws_url_validate(self, proposal):
value = proposal["value"]
# Ensure value, if present, starts with 'ws'
if value is not None and len(value) > 0 and not str(value).lower().startswith("ws"):
raise TraitError("GatewayClient ws_url must start with 'ws': '%r'" % value)
message = "GatewayClient ws_url must start with 'ws': '%r'" % value
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
raise TraitError(message)
return value

kernels_endpoint_default_value = "/api/kernels"
Expand Down Expand Up @@ -728,6 +772,9 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
# NOTE: We do this here since this handler is called during the server's startup and subsequent refreshes
# of the tree view.
except HTTPClientError as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: e.code, MESSAGE_KEY: str(e.message)}
)
error_reason = f"Exception while attempting to connect to Gateway server url '{GatewayClient.instance().url}'"
error_message = e.message
if e.response:
Expand All @@ -744,12 +791,18 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
"Ensure gateway url is valid and the Gateway instance is running.",
) from e
except ConnectionError as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 503, MESSAGE_KEY: str(e)}
)
raise web.HTTPError(
503,
f"ConnectionError was received from Gateway server url '{GatewayClient.instance().url}'. "
"Check to be sure the Gateway instance is running.",
) from e
except gaierror as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 404, MESSAGE_KEY: str(e)}
)
raise web.HTTPError(
404,
f"The Gateway server specified in the gateway_url '{GatewayClient.instance().url}' doesn't "
Expand Down
72 changes: 69 additions & 3 deletions jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import datetime
import json
import os
import pathlib
import typing as t
from logging import Logger
from queue import Empty, Queue
from threading import Thread
Expand All @@ -18,12 +20,15 @@
from jupyter_client.manager import AsyncKernelManager
from jupyter_client.managerabc import KernelManagerABC
from jupyter_core.utils import ensure_async
from jupyter_events import EventLogger
from jupyter_events.schema_registry import SchemaRegistryException
from tornado import web
from tornado.escape import json_decode, json_encode, url_escape, utf8
from traitlets import DottedObjectName, Instance, Type, default
from traitlets import DottedObjectName, Instance, List, Type, default

from .. import DEFAULT_EVENTS_SCHEMA_PATH
from .._tz import UTC
from ..services.kernels.kernelmanager import AsyncMappingKernelManager
from ..services.kernels.kernelmanager import AsyncMappingKernelManager, emit_kernel_action_event
from ..services.sessions.sessionmanager import SessionManager
from ..utils import url_path_join
from .gateway_client import GatewayClient, gateway_request
Expand Down Expand Up @@ -79,7 +84,6 @@ async def start_kernel(self, *, kernel_id=None, path=None, **kwargs):
await km.start_kernel(kernel_id=kernel_id, **kwargs)
kernel_id = km.kernel_id
self._kernels[kernel_id] = km

# Initialize culling if not already
if not self._initialized_culler:
self.initialize_culler()
Expand Down Expand Up @@ -290,6 +294,7 @@ async def list_kernel_specs(self):
response = await gateway_request(kernel_spec_url, method="GET")
kernel_specs = json_decode(response.body)
kernel_specs = self._replace_path_kernelspec_resources(kernel_specs)
self.log.debug(f"Retrieved list of kernel specs for the uri: {kernel_spec_url}")
return kernel_specs

async def get_kernel_spec(self, kernel_name, **kwargs):
Expand Down Expand Up @@ -376,6 +381,49 @@ class GatewayKernelManager(AsyncKernelManager):
def _default_cache_ports(self):
return False # no need to cache ports here

# Paths of the core event schemas that are always registered with this
# kernel manager's eventlogger. This property should not be overridden;
# additional schemas belong in `extra_event_schema_paths`.

@property
def core_event_schema_paths(self) -> t.List[pathlib.Path]:
    """Return the list of built-in schema paths (the kernel actions schema)."""
    kernel_actions_schema = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_actions" / "v1.yaml"
    return [kernel_actions_schema]

# This trait is intended for subclasses to override and define
# custom event schemas. Its entries are registered on the event logger
# after the core schemas (see `_default_event_logger`).
extra_event_schema_paths = List(
    default_value=[],
    help="""
    A list of pathlib.Path objects pointing at event schemas to register
    with the kernel manager's eventlogger.
    """,
).tag(config=True)

# Event logger used by `emit`; its default is built by `_default_event_logger`.
event_logger = Instance(EventLogger)

@default("event_logger")
def _default_event_logger(self):
    """Build the default event logger and make sure its schemas are registered.

    The parent's ``event_logger`` is reused when the parent exposes one;
    otherwise a new ``EventLogger`` is created. In both cases every core
    and extra schema path is registered, ignoring schemas that the
    registry reports as already present.
    """
    parent = self.parent
    if parent is None or not hasattr(parent, "event_logger"):
        # If parent does not have an event logger, create one.
        event_logger = EventLogger()
    else:
        event_logger = parent.event_logger

    # Ensure that all the expected schemas are registered.
    expected_paths = self.core_event_schema_paths + self.extra_event_schema_paths
    for path in expected_paths:
        try:
            event_logger.register_event_schema(path)
        except SchemaRegistryException:
            # The schema already exists in the registry; nothing to do.
            pass
    return event_logger

def emit(self, schema_id, data):
    """Forward ``data`` to this kernel manager's event logger under ``schema_id``."""
    logger = self.event_logger
    logger.emit(data=data, schema_id=schema_id)

def __init__(self, **kwargs):
"""Initialize the gateway kernel manager."""
super().__init__(**kwargs)
Expand Down Expand Up @@ -458,6 +506,10 @@ async def refresh_model(self, model=None):
# Kernel management
# --------------------------------------------------------------------------

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was started.",
error_msg="Kernel {kernel_id} failed to start.",
)
async def start_kernel(self, **kwargs):
"""Starts a kernel via HTTP in an asynchronous manner.
Expand Down Expand Up @@ -509,6 +561,10 @@ async def start_kernel(self, **kwargs):
self.kernel = await self.refresh_model()
self.log.info(f"GatewayKernelManager using existing kernel: {self.kernel_id}")

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was shutdown.",
error_msg="Kernel {kernel_id} failed to shutdown.",
)
async def shutdown_kernel(self, now=False, restart=False):
"""Attempts to stop the kernel process cleanly via HTTP."""

Expand All @@ -523,6 +579,10 @@ async def shutdown_kernel(self, now=False, restart=False):
else:
raise

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was restarted.",
error_msg="Kernel {kernel_id} failed to restart.",
)
async def restart_kernel(self, **kw):
"""Restarts a kernel via HTTP."""
if self.has_kernel:
Expand All @@ -537,6 +597,10 @@ async def restart_kernel(self, **kw):
)
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was interrupted.",
error_msg="Kernel {kernel_id} failed to interrupt.",
)
async def interrupt_kernel(self):
"""Interrupts the kernel via an HTTP request."""
if self.has_kernel:
Expand All @@ -556,8 +620,10 @@ async def is_alive(self):
if self.has_kernel:
# Go ahead and issue a request to get the kernel
self.kernel = await self.refresh_model()
self.log.debug(f"The kernel: {self.kernel} is alive.")
return True
else: # we don't have a kernel
self.log.debug(f"The kernel: {self.kernel} no longer exists.")
return False

def cleanup_resources(self, restart=False):
Expand Down
2 changes: 2 additions & 0 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,8 @@ def init_event_logger(self):
# events URI, `JUPYTER_SERVER_EVENTS_URI`.
schema_ids = [
"https://events.jupyter.org/jupyter_server/contents_service/v1",
"https://events.jupyter.org/jupyter_server/gateway_client/v1",
"https://events.jupyter.org/jupyter_server/kernel_actions/v1",
]
for schema_id in schema_ids:
# Get the schema path from the schema ID.
Expand Down
Loading

0 comments on commit 7d0f52c

Please sign in to comment.