Skip to content

Commit

Permalink
Add reconnection to Gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
sd committed Jan 8, 2021
1 parent d4438a2 commit ee2b12d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
15 changes: 13 additions & 2 deletions jupyter_server/gateway/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
import logging
import mimetypes
import random
import asyncio

from ..base.handlers import APIHandler, JupyterHandler
from ..utils import url_path_join
Expand Down Expand Up @@ -133,6 +135,7 @@ def __init__(self, **kwargs):
self.ws = None
self.ws_future = Future()
self.disconnected = False
self.retry = 0

async def _connect(self, kernel_id, message_callback):
# websocket is initialized before connection
Expand All @@ -159,6 +162,7 @@ async def _connect(self, kernel_id, message_callback):
def _connection_done(self, fut):
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
self.ws = fut.result()
self.retry = 0
self.log.debug("Connection is ready: ws: {}".format(self.ws))
else:
self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
Expand Down Expand Up @@ -192,8 +196,15 @@ async def _read_messages(self, callback):
else: # ws cancelled - stop reading
break

if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
# NOTE(esevan): if websocket is not disconnected by client, try to reconnect.
if not self.disconnected and self.retry < GatewayClient.instance().gateway_retry_max:
jitter = random.randint(10, 100) * 0.01
retry_interval = min(GatewayClient.instance().gateway_retry_interval * (2 ** self.retry),
GatewayClient.instance().gateway_retry_interval_max) + jitter
self.retry += 1
self.log.info("Attempting to re-establish the connection to Gateway in %s secs (%s/%s): %s",
retry_interval, self.retry, GatewayClient.instance().gateway_retry_max, self.kernel_id)
await asyncio.sleep(retry_interval)
loop = IOLoop.current()
loop.spawn_callback(self._connect, self.kernel_id, callback)

Expand Down
34 changes: 33 additions & 1 deletion jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from jupyter_client.kernelspec import KernelSpecManager
from ..utils import url_path_join

from traitlets import Instance, Unicode, Float, Bool, default, validate, TraitError
from traitlets import Instance, Unicode, Int, Float, Bool, default, validate, TraitError
from traitlets.config import SingletonConfigurable


Expand Down Expand Up @@ -220,6 +220,38 @@ def __init__(self, **kwargs):
def _env_whitelist_default(self):
return os.environ.get(self.env_whitelist_env, self.env_whitelist_default_value)

gateway_retry_interval_default_value = 1.0
gateway_retry_interval_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL'
gateway_retry_interval = Float(default_value=gateway_retry_interval_default_value, config=True,
help="""The time allowed for HTTP reconnection with the Gateway server for the first time.
Next will be JUPYTER_GATEWAY_RETRY_INTERVAL multiplied by two in factor of numbers of retries
but less than JUPYTER_GATEWAY_RETRY_INTERVAL_MAX.
(JUPYTER_GATEWAY_RETRY_INTERVAL env var)""")

@default('gateway_retry_interval')
def gateway_retry_interval_default(self):
return float(os.environ.get('JUPYTER_GATEWAY_RETRY_INTERVAL', self.gateway_retry_interval_default_value))

gateway_retry_interval_max_default_value = 30.0
gateway_retry_interval_max_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL_MAX'
gateway_retry_interval_max = Float(default_value=gateway_retry_interval_max_default_value, config=True,
help="""The maximum time allowed for HTTP reconnection retry with the Gateway server.
(JUPYTER_GATEWAY_RETRY_INTERVAL_MAX env var)""")

@default('gateway_retry_interval_max')
def gateway_retry_interval_max_default(self):
return float(os.environ.get('JUPYTER_GATEWAY_RETRY_INTERVAL_MAX', self.gateway_retry_interval_max_default_value))

gateway_retry_max_default_value = 5
gateway_retry_max_env = 'JUPYTER_GATEWAY_RETRY_MAX'
gateway_retry_max = Int(default_value=gateway_retry_max_default_value, config=True,
help="""The maximum retries allowed for HTTP reconnection with the Gateway server.
(JUPYTER_GATEWAY_RETRY_MAX env var)""")

@default('gateway_retry_max')
def gateway_retry_max_default(self):
return int(os.environ.get('JUPYTER_GATEWAY_RETRY_MAX', self.gateway_retry_max_default_value))

@property
def gateway_enabled(self):
return bool(self.url is not None and len(self.url) > 0)
Expand Down

0 comments on commit ee2b12d

Please sign in to comment.