diff --git a/docs/environment.yml b/docs/environment.yml index 5d77bc7bb4..61eee5ed3f 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -13,4 +13,5 @@ dependencies: - sphinxcontrib_github_alt - sphinxcontrib-openapi - sphinxemoji + - terminado - git+https://github.com/pandas-dev/pydata-sphinx-theme.git@master \ No newline at end of file diff --git a/jupyter_server/serverapp.py b/jupyter_server/serverapp.py index 78f2efcefd..43d830a6f9 100755 --- a/jupyter_server/serverapp.py +++ b/jupyter_server/serverapp.py @@ -116,6 +116,13 @@ from jupyter_server.extension.config import ExtensionConfigManager from jupyter_server.traittypes import TypeFromClasses +# Tolerate missing terminado package. +try: + from .terminal import TerminalManager + terminado_available = True +except ImportError: + terminado_available = False + #----------------------------------------------------------------------------- # Module globals #----------------------------------------------------------------------------- @@ -284,7 +291,7 @@ def init_settings(self, jupyter_app, kernel_manager, contents_manager, allow_password_change=jupyter_app.allow_password_change, server_root_dir=root_dir, jinja2_env=env, - terminals_available=False, # Set later if terminals are available + terminals_available=terminado_available and jupyter_app.terminals_enabled, serverapp=jupyter_app ) @@ -589,6 +596,8 @@ class ServerApp(JupyterApp): ContentsManager, FileContentsManager, AsyncContentsManager, AsyncFileContentsManager, NotebookNotary, GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient ] + if terminado_available: # Only necessary when terminado is available + classes.append(TerminalManager) subcommands = dict( list=(JupyterServerListApp, JupyterServerListApp.description.splitlines()[0]), @@ -1329,6 +1338,15 @@ def _update_server_extensions(self, change): is not available. """)) + # Since use of terminals is also a function of whether the terminado package is + # available, this variable holds the "final indication" of whether terminal functionality + # should be considered (particularly during shutdown/cleanup). It is enabled only + # once both the terminals "service" can be initialized and terminals_enabled is True. + # Note: this variable is slightly different from 'terminals_available' in the web settings + # in that this variable *could* remain false if terminado is available, yet the terminal + # service's initialization still fails. As a result, this variable holds the truth. + terminals_available = False + authenticate_prometheus = Bool( True, help="""" @@ -1547,7 +1565,7 @@ def init_terminals(self): try: from .terminal import initialize initialize(self.web_app, self.root_dir, self.connection_url, self.terminado_settings) - self.web_app.settings['terminals_available'] = True + self.terminals_available = True except ImportError as e: self.log.warning(_i18n("Terminals not available (error was %s)"), e) @@ -1693,11 +1711,8 @@ def shutdown_no_activity(self): if len(km) != 0: return # Kernels still running - try: + if self.terminals_available: term_mgr = self.web_app.settings['terminal_manager'] - except KeyError: - pass # Terminals not enabled - else: if term_mgr.terminals: return # Terminals still running @@ -1846,6 +1861,21 @@ def cleanup_kernels(self): self.log.info(kernel_msg % n_kernels) run_sync(self.kernel_manager.shutdown_all()) + def cleanup_terminals(self): + """Shutdown all terminals. + + The terminals will shutdown themselves when this process no longer exists, + but explicit shutdown allows the TerminalManager to cleanup. + """ + if not self.terminals_available: + return + + terminal_manager = self.web_app.settings['terminal_manager'] + n_terminals = len(terminal_manager.list()) + terminal_msg = trans.ngettext('Shutting down %d terminal', 'Shutting down %d terminals', n_terminals) + self.log.info(terminal_msg % n_terminals) + run_sync(terminal_manager.terminate_all()) + def running_server_info(self, kernel_count=True): "Return the current working directory and the server url information" info = self.contents_manager.info_string() + "\n" @@ -2076,6 +2106,7 @@ def _cleanup(self): self.remove_server_info_file() self.remove_browser_open_files() self.cleanup_kernels() + self.cleanup_terminals() def start_ioloop(self): """Start the IO Loop.""" diff --git a/jupyter_server/services/api/api.yaml b/jupyter_server/services/api/api.yaml index 956367bdfc..6ca22a0a69 100644 --- a/jupyter_server/services/api/api.yaml +++ b/jupyter_server/services/api/api.yaml @@ -568,7 +568,7 @@ paths: schema: type: array items: - $ref: '#/definitions/Terminal_ID' + $ref: '#/definitions/Terminal' 403: description: Forbidden to access 404: @@ -582,7 +582,7 @@ paths: 200: description: Succesfully created a new terminal schema: - $ref: '#/definitions/Terminal_ID' + $ref: '#/definitions/Terminal' 403: description: Forbidden to access 404: @@ -599,7 +599,7 @@ paths: 200: description: Terminal session with given id schema: - $ref: '#/definitions/Terminal_ID' + $ref: '#/definitions/Terminal' 403: description: Forbidden to access 404: @@ -845,12 +845,18 @@ definitions: type: string description: Last modified timestamp format: dateTime - Terminal_ID: - description: A Terminal_ID object + Terminal: + description: A Terminal object type: object required: - name properties: name: type: string - description: name of terminal ID + description: name of terminal + last_activity: + type: string + description: | + ISO 8601 timestamp for the last-seen activity on this terminal. Use + this to identify which terminals have been inactive since a given time. + Timestamps will be UTC, indicated 'Z' suffix. diff --git a/jupyter_server/terminal/__init__.py b/jupyter_server/terminal/__init__.py index ea51ab1353..6e9171cb97 100644 --- a/jupyter_server/terminal/__init__.py +++ b/jupyter_server/terminal/__init__.py @@ -8,11 +8,10 @@ raise ImportError("terminado >= 0.8.3 required, found %s" % terminado.__version__) from ipython_genutils.py3compat import which -from terminado import NamedTermManager -from tornado.log import app_log from jupyter_server.utils import url_path_join as ujoin from . import api_handlers from .handlers import TermSocket +from .terminalmanager import TerminalManager def initialize(webapp, root_dir, connection_url, settings): @@ -33,13 +32,14 @@ def initialize(webapp, root_dir, connection_url, settings): # the user has specifically set a preferred shell command. if os.name != 'nt' and shell_override is None and not sys.stdout.isatty(): shell.append('-l') - terminal_manager = webapp.settings['terminal_manager'] = NamedTermManager( + terminal_manager = webapp.settings['terminal_manager'] = TerminalManager( shell_command=shell, extra_env={'JUPYTER_SERVER_ROOT': root_dir, 'JUPYTER_SERVER_URL': connection_url, }, + parent=webapp.settings['serverapp'], ) - terminal_manager.log = app_log + terminal_manager.log = webapp.settings['serverapp'].log base_url = webapp.settings['base_url'] handlers = [ (ujoin(base_url, r"/terminals/websocket/(\w+)"), TermSocket, diff --git a/jupyter_server/terminal/api_handlers.py b/jupyter_server/terminal/api_handlers.py index 3967692092..92bb624289 100644 --- a/jupyter_server/terminal/api_handlers.py +++ b/jupyter_server/terminal/api_handlers.py @@ -1,33 +1,22 @@ import json from tornado import web from ..base.handlers import APIHandler -from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL - class TerminalRootHandler(APIHandler): @web.authenticated def get(self): - tm = self.terminal_manager - terms = [{'name': name} for name in tm.terminals] - self.finish(json.dumps(terms)) - - # Update the metric below to the length of the list 'terms' - TERMINAL_CURRENTLY_RUNNING_TOTAL.set( - len(terms) - ) + models = self.terminal_manager.list() + self.finish(json.dumps(models)) @web.authenticated def post(self): """POST /terminals creates a new terminal and redirects to it""" data = self.get_json_body() or {} - name, _ = self.terminal_manager.new_named_terminal(**data) - self.finish(json.dumps({'name': name})) - - # Increase the metric by one because a new terminal was created - TERMINAL_CURRENTLY_RUNNING_TOTAL.inc() + model = self.terminal_manager.create(**data) + self.finish(json.dumps(model)) class TerminalHandler(APIHandler): @@ -35,23 +24,11 @@ class TerminalHandler(APIHandler): @web.authenticated def get(self, name): - tm = self.terminal_manager - if name in tm.terminals: - self.finish(json.dumps({'name': name})) - else: - raise web.HTTPError(404, "Terminal not found: %r" % name) + model = self.terminal_manager.get(name) + self.finish(json.dumps(model)) @web.authenticated async def delete(self, name): - tm = self.terminal_manager - if name in tm.terminals: - await tm.terminate(name, force=True) - self.set_status(204) - self.finish() - - # Decrease the metric below by one - # because a terminal has been shutdown - TERMINAL_CURRENTLY_RUNNING_TOTAL.dec() - - else: - raise web.HTTPError(404, "Terminal not found: %r" % name) + await self.terminal_manager.terminate(name, force=True) + self.set_status(204) + self.finish() diff --git a/jupyter_server/terminal/handlers.py b/jupyter_server/terminal/handlers.py index 4b5f2e65a5..d01edd38e0 100644 --- a/jupyter_server/terminal/handlers.py +++ b/jupyter_server/terminal/handlers.py @@ -26,8 +26,14 @@ def get(self, *args, **kwargs): def on_message(self, message): super(TermSocket, self).on_message(message) - self.application.settings['terminal_last_activity'] = utcnow() + self._update_activity() def write_message(self, message, binary=False): super(TermSocket, self).write_message(message, binary=binary) - self.application.settings['terminal_last_activity'] = utcnow() \ No newline at end of file + self._update_activity() + + def _update_activity(self): + self.application.settings['terminal_last_activity'] = utcnow() + # terminal may not be around on deletion/cull + if self.term_name in self.terminal_manager.terminals: + self.terminal_manager.terminals[self.term_name].last_activity = utcnow() diff --git a/jupyter_server/terminal/terminalmanager.py b/jupyter_server/terminal/terminalmanager.py new file mode 100644 index 0000000000..38144855b2 --- /dev/null +++ b/jupyter_server/terminal/terminalmanager.py @@ -0,0 +1,150 @@ +"""A MultiTerminalManager for use in the notebook webserver +- raises HTTPErrors +- creates REST API models +""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +import terminado + +from datetime import timedelta +from jupyter_server._tz import utcnow, isoformat +from tornado import web +from tornado.ioloop import IOLoop, PeriodicCallback +from traitlets import Integer +from traitlets.config import LoggingConfigurable +from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL + + +class TerminalManager(LoggingConfigurable, terminado.NamedTermManager): + """ """ + + _culler_callback = None + + _initialized_culler = False + + cull_inactive_timeout = Integer(0, config=True, + help="""Timeout (in seconds) in which a terminal has been inactive and ready to be culled. + Values of 0 or lower disable culling.""" + ) + + cull_interval_default = 300 # 5 minutes + cull_interval = Integer(cull_interval_default, config=True, + help="""The interval (in seconds) on which to check for terminals exceeding the inactive timeout value.""" + ) + + # ------------------------------------------------------------------------- + # Methods for managing terminals + # ------------------------------------------------------------------------- + def __init__(self, *args, **kwargs): + super(TerminalManager, self).__init__(*args, **kwargs) + + def create(self, **kwargs): + """Create a new terminal.""" + name, term = self.new_named_terminal(**kwargs) + # Monkey-patch last-activity, similar to kernels. Should we need + # more functionality per terminal, we can look into possible sub- + # classing or containment then. + term.last_activity = utcnow() + model = self.get_terminal_model(name) + # Increase the metric by one because a new terminal was created + TERMINAL_CURRENTLY_RUNNING_TOTAL.inc() + # Ensure culler is initialized + self._initialize_culler() + return model + + def get(self, name): + """Get terminal 'name'.""" + model = self.get_terminal_model(name) + return model + + def list(self): + """Get a list of all running terminals.""" + models = [self.get_terminal_model(name) for name in self.terminals] + + # Update the metric below to the length of the list 'terms' + TERMINAL_CURRENTLY_RUNNING_TOTAL.set( + len(models) + ) + return models + + async def terminate(self, name, force=False): + """Terminate terminal 'name'.""" + self._check_terminal(name) + await super(TerminalManager, self).terminate(name, force=force) + + # Decrease the metric below by one + # because a terminal has been shutdown + TERMINAL_CURRENTLY_RUNNING_TOTAL.dec() + + async def terminate_all(self): + """Terminate all terminals.""" + terms = [name for name in self.terminals] + for term in terms: + await self.terminate(term, force=True) + + def get_terminal_model(self, name): + """Return a JSON-safe dict representing a terminal. + For use in representing terminals in the JSON APIs. + """ + self._check_terminal(name) + term = self.terminals[name] + model = { + "name": name, + "last_activity": isoformat(term.last_activity), + } + return model + + def _check_terminal(self, name): + """Check a that terminal 'name' exists and raise 404 if not.""" + if name not in self.terminals: + raise web.HTTPError(404, u'Terminal not found: %s' % name) + + def _initialize_culler(self): + """Start culler if 'cull_inactive_timeout' is greater than zero. + Regardless of that value, set flag that we've been here. + """ + if not self._initialized_culler and self.cull_inactive_timeout > 0: + if self._culler_callback is None: + loop = IOLoop.current() + if self.cull_interval <= 0: # handle case where user set invalid value + self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", + self.cull_interval, self.cull_interval_default) + self.cull_interval = self.cull_interval_default + self._culler_callback = PeriodicCallback( + self._cull_terminals, 1000 * self.cull_interval) + self.log.info("Culling terminals with inactivity > %s seconds at %s second intervals ...", + self.cull_inactive_timeout, self.cull_interval) + self._culler_callback.start() + + self._initialized_culler = True + + async def _cull_terminals(self): + self.log.debug("Polling every %s seconds for terminals inactive for > %s seconds...", + self.cull_interval, self.cull_inactive_timeout) + # Create a separate list of terminals to avoid conflicting updates while iterating + for name in list(self.terminals): + try: + await self._cull_inactive_terminal(name) + except Exception as e: + self.log.exception("The following exception was encountered while checking the " + "activity of terminal {}: {}".format(name, e)) + + async def _cull_inactive_terminal(self, name): + try: + term = self.terminals[name] + except KeyError: + return # KeyErrors are somewhat expected since the terminal can be terminated as the culling check is made. + + self.log.debug("name=%s, last_activity=%s", name, term.last_activity) + if hasattr(term, 'last_activity'): + dt_now = utcnow() + dt_inactive = dt_now - term.last_activity + # Compute idle properties + is_time = dt_inactive > timedelta(seconds=self.cull_inactive_timeout) + # Cull the kernel if all three criteria are met + if (is_time): + inactivity = int(dt_inactive.total_seconds()) + self.log.warning("Culling terminal '%s' due to %s seconds of inactivity.", name, inactivity) + await self.terminate(name, force=True) diff --git a/jupyter_server/tests/test_terminal.py b/jupyter_server/tests/test_terminal.py index c8534868e1..18b776f5f2 100644 --- a/jupyter_server/tests/test_terminal.py +++ b/jupyter_server/tests/test_terminal.py @@ -5,6 +5,9 @@ import asyncio import sys +from tornado.httpclient import HTTPClientError +from traitlets.config import Config + # Skip this whole module on Windows. The terminal API leads # to timeouts on Windows CI. if sys.platform.startswith('win'): @@ -30,12 +33,42 @@ def terminal_path(tmp_path): shutil.rmtree(str(subdir), ignore_errors=True) +CULL_TIMEOUT = 2 +CULL_INTERVAL = 3 + + +@pytest.fixture +def jp_server_config(): + return Config({ + 'ServerApp': { + 'TerminalManager': { + 'cull_inactive_timeout': CULL_TIMEOUT, + 'cull_interval': CULL_INTERVAL + } + } + }) + + +async def test_no_terminals(jp_fetch): + resp_list = await jp_fetch( + 'api', 'terminals', + method='GET', + allow_nonstandard_methods=True, + ) + + data = json.loads(resp_list.body.decode()) + + assert len(data) == 0 + + async def test_terminal_create(jp_fetch, kill_all): - await jp_fetch( + resp = await jp_fetch( 'api', 'terminals', method='POST', allow_nonstandard_methods=True, ) + term = json.loads(resp.body.decode()) + assert term['name'] == "1" resp_list = await jp_fetch( 'api', 'terminals', @@ -46,6 +79,7 @@ async def test_terminal_create(jp_fetch, kill_all): data = json.loads(resp_list.body.decode()) assert len(data) == 1 + assert data[0] == term await kill_all() @@ -104,3 +138,41 @@ async def test_terminal_create_with_cwd(jp_fetch, jp_ws_fetch, terminal_path): ws.close() assert str(terminal_path) in message_stdout + + +async def test_culling_config(jp_server_config, jp_configurable_serverapp): + terminal_mgr_config = jp_configurable_serverapp().config.ServerApp.TerminalManager + assert terminal_mgr_config.cull_inactive_timeout == CULL_TIMEOUT + assert terminal_mgr_config.cull_interval == CULL_INTERVAL + terminal_mgr_settings = jp_configurable_serverapp().web_app.settings['terminal_manager'] + assert terminal_mgr_settings.cull_inactive_timeout == CULL_TIMEOUT + assert terminal_mgr_settings.cull_interval == CULL_INTERVAL + + +async def test_culling(jp_server_config, jp_fetch): + # POST request + resp = await jp_fetch( + 'api', 'terminals', + method='POST', + allow_nonstandard_methods=True, + ) + term = json.loads(resp.body.decode()) + term_1 = term['name'] + last_activity = term['last_activity'] + + culled = False + for i in range(10): # Culling should occur in a few seconds + try: + resp = await jp_fetch( + 'api', 'terminals', term_1, + method='GET', + allow_nonstandard_methods=True, + ) + except HTTPClientError as e: + assert e.code == 404 + culled = True + break + else: + await asyncio.sleep(1) + + assert culled