Add support for concurrent kernel startup (and restarts)
Updated start_kernel and restart_kernel methods to coroutines.

Converted various `time.sleep()` calls to `yield gen.sleep()`, which lets
Tornado "slice out" to a different request while a kernel startup is waiting.
Because those waits now yield, converting `confirm_remote_startup()` and
`handle_timeout()` to coroutines was key.

Fixes jupyter-server#86
kevin-bates committed Feb 20, 2019
1 parent f128840 commit a752f3f
Showing 6 changed files with 55 additions and 49 deletions.
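
The diffs below all apply one conversion: blocking `time.sleep()` waits in the kernel-startup path become `yield gen.sleep()` inside `@gen.coroutine` methods, so the single Tornado IO loop can service other requests while a startup is waiting. A minimal, illustrative sketch of that pattern (the function names and the `poll_interval` value here are placeholders, not code from the diff):

```python
import time

from tornado import gen

poll_interval = 0.5  # placeholder; mirrors the module-level constant used by the process proxies


def wait_blocking():
    # Before: time.sleep() blocks the IO loop thread, so no other
    # kernel-start request can make progress during the wait.
    time.sleep(poll_interval)


@gen.coroutine
def wait_cooperative():
    # After: gen.sleep() returns a Future; yielding it suspends only this
    # coroutine and lets Tornado "slice out" to other pending requests.
    yield gen.sleep(poll_interval)
```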
6 changes: 4 additions & 2 deletions enterprise_gateway/services/kernels/remotemanager.py
@@ -203,6 +203,7 @@ def __init__(self, **kwargs):
self.user_overrides = {}
self.restarting = False # need to track whether we're in a restart situation or not

@gen.coroutine
def start_kernel(self, **kwargs):
"""Starts a kernel in a separate process.
@@ -222,7 +223,7 @@ def start_kernel(self, **kwargs):
process_proxy_class = import_item(process_proxy_class_name)
self.process_proxy = process_proxy_class(kernel_manager=self, proxy_config=process_proxy.get('config'))
self._capture_user_overrides(**kwargs)
super(RemoteKernelManager, self).start_kernel(**kwargs)
yield super(RemoteKernelManager, self).start_kernel(**kwargs)

def _capture_user_overrides(self, **kwargs):
"""
@@ -286,6 +287,7 @@ def request_shutdown(self, restart=False):
if isinstance(self.process_proxy, RemoteProcessProxy):
self.process_proxy.shutdown_listener()

@gen.coroutine
def restart_kernel(self, now=False, **kwargs):
"""Restarts a kernel with the arguments that were used to launch it.
@@ -319,7 +321,7 @@ def restart_kernel(self, now=False, **kwargs):
# Use the parent mapping kernel manager so activity monitoring and culling is also shutdown
self.parent.shutdown_kernel(kernel_id, now=now)
return
super(RemoteKernelManager, self).restart_kernel(now, **kwargs)
yield gen.maybe_future(super(RemoteKernelManager, self).restart_kernel(now, **kwargs))
if isinstance(self.process_proxy, RemoteProcessProxy): # for remote kernels...
# Re-establish activity watching...
if self._activity_stream:
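
In `restart_kernel()` above, the call into the parent class is wrapped in `gen.maybe_future(...)` before being yielded. On the Tornado 4/5-era API this code targets, that wrapper makes the call safe whether the parent implementation is still a plain synchronous method or has itself become a coroutine. A hedged sketch of the idea (the class names are placeholders, not the real Jupyter classes):

```python
from tornado import gen


class Parent(object):
    def restart(self):
        # A plain synchronous implementation: returns a value, not a Future.
        return "restarted"


class Child(Parent):
    @gen.coroutine
    def restart(self):
        # maybe_future() wraps a plain value in a resolved Future and passes
        # a real Future through untouched, so the yield works either way.
        result = yield gen.maybe_future(super(Child, self).restart())
        raise gen.Return(result)
```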
19 changes: 12 additions & 7 deletions enterprise_gateway/services/processproxies/conductor.py
@@ -10,6 +10,8 @@
import socket
import re

from tornado import gen

from jupyter_client import launch_kernel, localinterfaces

from .processproxy import RemoteProcessProxy
@@ -34,6 +36,7 @@ def __init__(self, kernel_manager, proxy_config):
self.conductor_endpoint = proxy_config.get('conductor_endpoint',
kernel_manager.parent.parent.conductor_endpoint)

@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
"""Launches the specified process within a Conductor cluster environment."""
super(ConductorClusterProcessProxy, self).launch_process(kernel_cmd, **kwargs)
@@ -55,9 +58,8 @@ def launch_process(self, kernel_cmd, **kwargs):
self.env = kwargs.get('env')
self.log.debug("Conductor cluster kernel launched using Conductor endpoint: {}, pid: {}, Kernel ID: {}, "
"cmd: '{}'".format(self.conductor_endpoint, self.local_proc.pid, self.kernel_id, kernel_cmd))
self.confirm_remote_startup()

return self
yield self.confirm_remote_startup()
raise gen.Return(self)

def _update_launch_info(self, kernel_cmd, **kwargs):
""" Dynamically assemble the spark-submit configuration passed from NB2KG."""
@@ -114,6 +116,7 @@ def send_signal(self, signum):
else:
return super(ConductorClusterProcessProxy, self).send_signal(signum)

@gen.coroutine
def kill(self):
"""Kill a kernel.
:return: None if the application existed and is not in RUNNING state, False otherwise.
@@ -127,7 +130,7 @@ def kill(self):
i = 1
state = self._query_app_state_by_driver_id(self.driver_id)
while state not in ConductorClusterProcessProxy.final_states and i <= max_poll_attempts:
time.sleep(poll_interval)
yield gen.sleep(poll_interval)
state = self._query_app_state_by_driver_id(self.driver_id)
i = i + 1

@@ -138,7 +141,7 @@ def kill(self):

self.log.debug("ConductorClusterProcessProxy.kill, application ID: {}, kernel ID: {}, state: {}"
.format(self.application_id, self.kernel_id, state))
return result
raise gen.Return(result)

def cleanup(self):
# we might have a defunct process (if using waitAppCompletion = false) - so poll, kill, wait when we have
@@ -173,6 +176,7 @@ def _parse_driver_submission_id(self, submission_response):
self.driver_id = driver_id[0]
self.log.debug("Driver ID: {}".format(driver_id[0]))

@gen.coroutine
def confirm_remote_startup(self):
""" Confirms the application is in a started state before returning. Should post-RUNNING states be
unexpectedly encountered ('FINISHED', 'KILLED', 'RECLAIMED') then we must throw, otherwise the rest
@@ -187,7 +191,7 @@ def confirm_remote_startup(self):
output = self.local_proc.stderr.read().decode("utf-8")
self._parse_driver_submission_id(output)
i += 1
self.handle_timeout()
yield self.handle_timeout()

if self._get_application_id(True):
# Once we have an application ID, start monitoring state, obtain assigned host and get connection info
@@ -223,9 +227,10 @@ def _get_application_state(self):
self.assigned_ip = socket.gethostbyname(self.assigned_host)
return app_state

@gen.coroutine
def handle_timeout(self):
"""Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
time.sleep(poll_interval)
yield gen.sleep(poll_interval)
time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())

if time_interval > self.kernel_launch_timeout:
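
Several of the converted methods above now end with `raise gen.Return(result)` where they previously used `return result`. Under Python 2, which this code still supports, a generator cannot return a value, so Tornado coroutines deliver their result through this exception instead. A small illustrative example (the function and values are made up; only the mechanism matches the diff):

```python
from tornado import gen


@gen.coroutine
def query_app_state(driver_id):
    # Stand-in for a real status poll against the cluster.
    yield gen.sleep(0.1)
    # Python 2 generators cannot `return` a value, so Tornado coroutines
    # raise gen.Return(...) instead; the value becomes the Future's result.
    raise gen.Return("RUNNING")


@gen.coroutine
def wait_for_running(driver_id):
    state = yield query_app_state(driver_id)
    raise gen.Return(state == "RUNNING")
```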
11 changes: 7 additions & 4 deletions enterprise_gateway/services/processproxies/container.py
@@ -8,6 +8,8 @@

import urllib3 # docker ends up using this and it causes lots of noise, so turn off warnings

from tornado import gen

from jupyter_client import launch_kernel, localinterfaces

from .processproxy import RemoteProcessProxy
@@ -51,6 +53,7 @@ def _determine_kernel_images(self, proxy_config):
self.kernel_executor_image = proxy_config.get('executor_image_name')
self.kernel_executor_image = os.environ.get('KERNEL_EXECUTOR_IMAGE', self.kernel_executor_image)

@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
"""Launches the specified process within the container environment."""
# Set env before superclass call so we see these in the debug output
@@ -73,9 +76,8 @@ def launch_process(self, kernel_cmd, **kwargs):
self.log.info("{}: kernel launched. Kernel image: {}, KernelID: {}, cmd: '{}'"
.format(self.__class__.__name__, self.kernel_image, self.kernel_id, kernel_cmd))

self.confirm_remote_startup()

return self
yield self.confirm_remote_startup()
raise gen.Return(self)

def _enforce_uid_gid_blacklists(self, **kwargs):
"""Determine UID and GID with which to launch container and ensure they do not appear in blacklist."""
@@ -153,14 +155,15 @@ def cleanup(self):
self.kill()
super(ContainerProcessProxy, self).cleanup()

@gen.coroutine
def confirm_remote_startup(self):
"""Confirms the container has started and returned necessary connection information."""
self.start_time = RemoteProcessProxy.get_current_time()
i = 0
ready_to_connect = False # we're ready to connect when we have a connection file to use
while not ready_to_connect:
i += 1
self.handle_timeout()
yield self.handle_timeout()

container_status = self.get_container_status(str(i))
if container_status:
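
Note that `launch_process()` now calls `confirm_remote_startup()` with `yield`. Under `gen.coroutine`, calling a coroutine without yielding the Future it returns means the caller does not wait for it to finish and any exception it raises is not propagated to the caller. A simplified sketch of the call chain (the class name is a placeholder; only the yield-chaining pattern matches the diff):

```python
from tornado import gen


class ProxySketch(object):
    """Placeholder class illustrating one coroutine yielding another."""

    @gen.coroutine
    def launch_process(self):
        # Yielding the nested coroutine makes launch_process wait for it and
        # lets any exception it raises propagate to this caller.
        yield self.confirm_remote_startup()
        raise gen.Return(self)

    @gen.coroutine
    def confirm_remote_startup(self):
        yield gen.sleep(0.1)  # stand-in for polling the container status
```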
24 changes: 6 additions & 18 deletions enterprise_gateway/services/processproxies/distributed.py
@@ -4,8 +4,8 @@

import os
import json
import time

from tornado import gen
from subprocess import STDOUT
from socket import gethostbyname

@@ -29,6 +29,7 @@ def __init__(self, kernel_manager, proxy_config):
else:
self.hosts = kernel_manager.parent.parent.remote_hosts # from command line or env

@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
"""Launches a kernel process on a selected host."""
super(DistributedProcessProxy, self).launch_process(kernel_cmd, **kwargs)
@@ -48,9 +49,8 @@ def launch_process(self, kernel_cmd, **kwargs):
self.log.info("Kernel launched on '{}', pid: {}, ID: {}, Log file: {}:{}, Command: '{}'. ".
format(self.assigned_host, self.pid, self.kernel_id, self.assigned_host,
self.kernel_log, kernel_cmd))
self.confirm_remote_startup()

return self
yield self.confirm_remote_startup()
raise gen.Return(self)

def _launch_remote_process(self, kernel_cmd, **kwargs):
"""
@@ -121,30 +121,18 @@ def _determine_next_host(self):
DistributedProcessProxy.host_index += 1
return next_host

@gen.coroutine
def confirm_remote_startup(self):
""" Confirms the remote kernel has started by obtaining connection information from the remote host."""
self.start_time = RemoteProcessProxy.get_current_time()
i = 0
ready_to_connect = False # we're ready to connect when we have a connection file to use
while not ready_to_connect:
i += 1
self.handle_timeout()
yield self.handle_timeout()

self.log.debug("{}: Waiting to connect. Host: '{}', KernelID: '{}'".
format(i, self.assigned_host, self.kernel_id))

if self.assigned_host != '':
ready_to_connect = self.receive_connection_info()

def handle_timeout(self):
"""Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
time.sleep(poll_interval)
time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())

if time_interval > self.kernel_launch_timeout:
reason = "Waited too long ({}s) to get connection file. Check Enterprise Gateway log and kernel " \
"log ({}:{}) for more information.".\
format(self.kernel_launch_timeout, self.assigned_host, self.kernel_log)
timeout_message = "KernelID: '{}' launch timeout due to: {}".format(self.kernel_id, reason)
self.kill()
self.log_and_raise(http_status_code=500, reason=timeout_message)
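
The distributed proxy's local copy of `handle_timeout()` is deleted above; the class now relies on the coroutine version inherited from `RemoteProcessProxy` and simply yields it from its polling loop. A simplified sketch of the resulting structure (stand-in class names and trivial bodies, not the actual implementation):

```python
from tornado import gen


class RemoteProxyBase(object):
    """Stand-in for RemoteProcessProxy, which now owns handle_timeout()."""

    @gen.coroutine
    def handle_timeout(self):
        # Non-blocking wait between polls; the real method also checks
        # whether the kernel launch timeout has been exceeded.
        yield gen.sleep(0.5)


class DistributedProxy(RemoteProxyBase):
    """Stand-in for DistributedProcessProxy after its duplicate was removed."""

    @gen.coroutine
    def confirm_remote_startup(self):
        ready_to_connect = False
        while not ready_to_connect:
            yield self.handle_timeout()  # inherited coroutine, yielded
            ready_to_connect = self.receive_connection_info()

    def receive_connection_info(self):
        return True  # placeholder for reading the kernel's connection file
```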
22 changes: 14 additions & 8 deletions enterprise_gateway/services/processproxies/processproxy.py
@@ -19,7 +19,7 @@
import random

from socket import timeout, socket, gethostbyname, gethostname, AF_INET, SOCK_STREAM, SHUT_RDWR, SHUT_WR
from tornado import web
from tornado import web, gen
from calendar import timegm
from ipython_genutils.py3compat import with_metaclass
from jupyter_client import launch_kernel, localinterfaces
@@ -150,6 +150,7 @@ def __init__(self, kernel_manager, proxy_config):
self.pgid = 0

@abc.abstractmethod
@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
"""Provides basic implementation for launching the process corresponding to the process proxy.
@@ -208,17 +209,18 @@ def poll(self):

return self.send_signal(0)

@gen.coroutine
def wait(self):
"""Wait for the process to become inactive."""
# If we have a local_proc, call its wait method. This will cleanup any defunct processes when the kernel
# is shutdown (when using waitAppCompletion = false). Otherwise (if no local_proc) we'll use polling to
# determine if a (remote or revived) process is still active.
if self.local_proc:
return self.local_proc.wait()
raise gen.Return(self.local_proc.wait())

for i in range(max_poll_attempts):
if self.poll():
time.sleep(poll_interval)
yield gen.sleep(poll_interval)
else:
break
else:
@@ -252,6 +254,7 @@ def send_signal(self, signum):
result = self.remote_signal(signum)
return result

@gen.coroutine
def kill(self):
"""Terminate the process proxy process.
@@ -263,7 +266,7 @@ def kill(self):
result = self.terminate() # Send -15 signal first
i = 1
while self.poll() is None and i <= max_poll_attempts:
time.sleep(poll_interval)
yield gen.sleep(poll_interval)
i = i + 1
if i > max_poll_attempts: # Send -9 signal if process is still alive
if self.local_proc:
@@ -276,7 +279,7 @@ def kill(self):
else:
result = self.remote_signal(signal.SIGKILL)
self.log.debug("SIGKILL signal sent to pid: {}".format(self.pid))
return result
raise gen.Return(result)

def terminate(self):
"""Gracefully terminate the process proxy process.
@@ -632,11 +635,12 @@ def __init__(self, kernel_manager, proxy_config):
super(LocalProcessProxy, self).__init__(kernel_manager, proxy_config)
kernel_manager.ip = localinterfaces.LOCALHOST

@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
super(LocalProcessProxy, self).launch_process(kernel_cmd, **kwargs)

# launch the local run.sh
self.local_proc = launch_kernel(kernel_cmd, **kwargs)
self.local_proc = yield gen.maybe_future(launch_kernel(kernel_cmd, **kwargs))
self.pid = self.local_proc.pid
if hasattr(os, "getpgid"):
try:
@@ -646,7 +650,7 @@ def launch_process(self, kernel_cmd, **kwargs):
self.ip = local_ip
self.log.info("Local kernel launched on '{}', pid: {}, pgid: {}, KernelID: {}, cmd: '{}'"
.format(self.ip, self.pid, self.pgid, self.kernel_id, kernel_cmd))
return self
raise gen.Return(self)


class RemoteProcessProxy(with_metaclass(abc.ABCMeta, BaseProcessProxyABC)):
@@ -664,6 +668,7 @@ def __init__(self, kernel_manager, proxy_config):
self.tunnel_processes = {}
self._prepare_response_socket()

@gen.coroutine
def launch_process(self, kernel_cmd, **kwargs):
# Pass along port-range info to kernels...
kwargs['env']['EG_MIN_PORT_RANGE_SIZE'] = str(min_port_range_size)
@@ -968,9 +973,10 @@ def _extract_pid_info(self, connect_info):
self.ip = self.assigned_ip
self.local_proc = None

@gen.coroutine
def handle_timeout(self):
"""Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
time.sleep(poll_interval)
yield gen.sleep(poll_interval)
time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())

if time_interval > self.kernel_launch_timeout:
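
With `launch_process()`, `confirm_remote_startup()`, and `handle_timeout()` all coroutines, multiple kernel startups can be in flight on the same IO loop: each `yield gen.sleep()` suspends one startup and lets another proceed. The toy example below (not part of the commit) shows how two such coroutines overlap their waits:

```python
from tornado import gen, ioloop


@gen.coroutine
def start_kernel(name):
    # Stand-in for a real startup: three polling waits of 0.2s each.
    for _ in range(3):
        yield gen.sleep(0.2)
    raise gen.Return(name)


@gen.coroutine
def main():
    # Yielding a list of futures runs both startups concurrently on one
    # IO loop; total time is roughly 0.6s rather than 1.2s back to back.
    results = yield [start_kernel("kernel-a"), start_kernel("kernel-b")]
    print(results)


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(main)
```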