Merge pull request #737 from jupyter-incubator/devstein/ipykernel-6
Support ipykernel >= 6
devstein authored May 2, 2022
2 parents 691018f + bab2c21 commit b5246e8
Showing 8 changed files with 243 additions and 83 deletions.
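For context: the core incompatibility is that IPythonKernel.do_execute changed shape across ipykernel versions. As the comments in the kernel diff below put it, ipykernel 4 is synchronous, ipykernel 5 returns a gen.coroutine-style future, and ipykernel >= 6 returns a native asyncio coroutine. A minimal sketch of what a wrapper kernel now has to handle (illustrative only; handle_do_execute_result is not a function from this repo):

import asyncio

def handle_do_execute_result(result):
    # ipykernel >= 6: a native coroutine that still has to be driven.
    if asyncio.iscoroutine(result):
        return asyncio.get_event_loop().run_until_complete(result)
    # ipykernel 5: a future-like object; take its result.
    if asyncio.isfuture(result):
        return result.result()
    # ipykernel 4: already a plain reply dict.
    return result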
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
@@ -21,6 +21,7 @@ jobs:
          sudo apt-get install -y libkrb5-dev
      - name: Install package dependencies
        run: |
+           python -m pip install --upgrade pip
          pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils
          pip install -r autovizwidget/requirements.txt -e autovizwidget
          pip install -r sparkmagic/requirements.txt -e sparkmagic
2 changes: 1 addition & 1 deletion hdijupyterutils/requirements.txt
@@ -2,7 +2,7 @@ ipython>=4.0.2
nose
mock
ipywidgets>5.0.0
- ipykernel>=4.2.2, <6.0.0
+ ipykernel>=4.2.2
jupyter>=1
pandas>=0.17.1
numpy>=1.16.5
2 changes: 1 addition & 1 deletion hdijupyterutils/setup.py
@@ -60,7 +60,7 @@ def version(path):
"nose",
"mock",
"ipywidgets>5.0.0",
"ipykernel>=4.2.2,<6.0.0",
"ipykernel>=4.2.2",
"jupyter>=1",
"pandas>=0.17.1",
"numpy",
3 changes: 2 additions & 1 deletion sparkmagic/requirements.txt
@@ -6,8 +6,9 @@ mock
pandas>=0.17.1
numpy
requests
- ipykernel>=4.2.2, <6.0.0
+ ipykernel>=4.2.2
ipywidgets>5.0.0
notebook>=4.2
tornado>=4
requests_kerberos>=0.8.0
+ nest_asyncio==1.5.5
3 changes: 2 additions & 1 deletion sparkmagic/setup.py
@@ -88,10 +88,11 @@ def version(path):
"pandas>=0.17.1",
"numpy",
"requests",
"ipykernel<6.0.0",
"ipykernel>=4.2.2",
"ipywidgets>5.0.0",
"notebook>=4.2",
"tornado>=4",
"requests_kerberos>=0.8.0",
"nest_asyncio==1.5.5",
],
)
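The new nest_asyncio pin works together with the kernel changes below: nest_asyncio patches an event loop so that loop.run_until_complete can be re-entered while the loop is already running, which stock asyncio forbids. A small self-contained sketch of the behavior the kernel relies on (illustrative, not code from this repo):

import asyncio
import nest_asyncio

async def inner():
    return 42

async def outer():
    # Stock asyncio raises "This event loop is already running" here;
    # after nest_asyncio.apply(loop) the nested call is allowed.
    return asyncio.get_event_loop().run_until_complete(inner())

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
nest_asyncio.apply(loop)
print(loop.run_until_complete(outer()))  # 42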
128 changes: 104 additions & 24 deletions sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py
@@ -1,14 +1,10 @@
# Copyright (c) 2015 aggftw@gmail.com
# Distributed under the terms of the Modified BSD License.
- try:
-     from asyncio import Future
- except ImportError:
-
-     class Future(object):
-         """A class nothing will use."""
+ import asyncio
+ import inspect
+ import requests


- import requests
from ipykernel.ipkernel import IPythonKernel
from hdijupyterutils.ipythondisplay import IpythonDisplay
@@ -17,6 +13,55 @@ class Future(object):
from sparkmagic.livyclientlib.exceptions import wrap_unexpected_exceptions
from sparkmagic.kernels.wrapperkernel.usercodeparser import UserCodeParser

+ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+ import nest_asyncio
+
+
+ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+ # Taken from: https://github.com/jupyter/notebook/blob/eb3a1c24839205afcef0ba65ace2309d38300a2b/notebook/utils.py#L332
+ def run_sync(maybe_async):
+     """If async, runs maybe_async and blocks until it has executed,
+     possibly creating an event loop.
+     If not async, just returns maybe_async as it is the result of something
+     that has already executed.
+     Parameters
+     ----------
+     maybe_async : async or non-async object
+         The object to be executed, if it is async.
+     Returns
+     -------
+     result :
+         Whatever the async object returns, or the object itself.
+     """
+     if not inspect.isawaitable(maybe_async):
+         # that was not something async, just return it
+         return maybe_async
+     # it is async, we need to run it in an event loop
+
+     def wrapped():
+         create_new_event_loop = False
+         result = None
+         loop = None
+         try:
+             loop = asyncio.get_event_loop()
+         except RuntimeError:
+             create_new_event_loop = True
+         else:
+             if loop.is_closed():
+                 create_new_event_loop = True
+         if create_new_event_loop:
+             loop = asyncio.new_event_loop()
+             asyncio.set_event_loop(loop)
+         try:
+             result = loop.run_until_complete(maybe_async)
+         except RuntimeError as e:
+             if str(e) == "This event loop is already running":
+                 # just return a Future, hoping that it will be awaited
+                 result = asyncio.ensure_future(maybe_async)
+         return result
+
+     return wrapped()


class SparkKernelBase(IPythonKernel):
    def __init__(
@@ -40,7 +85,16 @@ def __init__(
        # Override
        self.session_language = session_language

-         super(SparkKernelBase, self).__init__(**kwargs)
+         # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+         # Patch loop.run_until_complete as early as possible
+         try:
+             nest_asyncio.apply()
+         except RuntimeError:
+             # nest_asyncio requires a running loop in order to patch.
+             # In tests the loop may not have been created yet.
+             pass
+
+         super().__init__(**kwargs)

        self.logger = SparkLog("{}_jupyter_kernel".format(self.session_language))
        self._fatal_error = None
@@ -54,11 +108,15 @@ def __init__(
        # Disable warnings for test env in HDI
        requests.packages.urllib3.disable_warnings()

-         if not kwargs.get("testing", False):
-             self._load_magics_extension()
-             self._change_language()
-             if conf.use_auto_viz():
-                 self._register_auto_viz()
+         # Do not load magics in testing
+         if kwargs.get("testing", False):
+             return
+
+         # Load magics on init
+         self._load_magics_extension()
+         self._change_language()
+         if conf.use_auto_viz():
+             self._register_auto_viz()

    def do_execute(
        self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
@@ -71,7 +129,9 @@ def f(self):
                code, silent, store_history, user_expressions, allow_stdin
            )

-         return wrap_unexpected_exceptions(f, self._complete_cell)(self)
+         # Execute the code and handle exceptions
+         wrapped = wrap_unexpected_exceptions(f, self._complete_cell)
+         return wrapped(self)

    def do_shutdown(self, restart):
        # Cleanup
@@ -164,30 +224,50 @@ def _execute_cell(
    def _execute_cell_for_user(
        self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
    ):
-         result = super(SparkKernelBase, self).do_execute(
+         result = super().do_execute(
            code, silent, store_history, user_expressions, allow_stdin
        )
-         if isinstance(result, Future):
-             result = result.result()
+
+         # In ipykernel 6, this returns native asyncio coroutine
+         if asyncio.iscoroutine(result):
+             return run_sync(result)
+
+         # In ipykernel 5, this returns gen.coroutine
+         if asyncio.isfuture(result):
+             return result.result()
+
+         # In ipykernel 4, this func is synchronous
        return result

    def _do_shutdown_ipykernel(self, restart):
-         return super(SparkKernelBase, self).do_shutdown(restart)
+         result = super().do_shutdown(restart)
+
+         # In tests, super() calls this SparkKernelBase.do_shutdown, which is async
+         if asyncio.iscoroutine(result):
+             return run_sync(result)
+
+         return result

    def _complete_cell(self):
-         """A method that runs a cell with no effect. Call this and return the value it
-         returns when there's some sort of error preventing the user's cell from executing; this
-         will register the cell from the Jupyter UI as being completed."""
+         """A method that runs a cell with no effect.
+         Call this and return the value it returns when there's some sort
+         of error preventing the user's cell from executing; this will
+         register the cell from the Jupyter UI as being completed.
+         """
        return self._execute_cell("None", False, True, None, False)

    def _show_user_error(self, message):
        self.logger.error(message)
        self.ipython_display.send_error(message)

    def _queue_fatal_error(self, message):
-         """Queues up a fatal error to be thrown when the next cell is executed; does not
-         raise an error immediately. We use this for errors that happen on kernel startup,
-         since IPython crashes if we throw an exception in the __init__ method."""
+         """Queues up a fatal error to be thrown when the next cell is executed;
+         does not raise an error immediately.
+         We use this for errors that happen on kernel startup, since
+         IPython crashes if we throw an exception in the __init__ method.
+         """
        self._fatal_error = message

    def _abort_with_fatal_error(self, message):
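Taken together with _execute_cell_for_user above, run_sync gives three behaviors: a plain value passes through unchanged, a coroutine is driven to completion on an event loop (created if necessary), and a coroutine arriving while the loop is already running is either completed via the nest_asyncio patch applied in __init__ or handed back as a Future. A hedged usage sketch (not a test from this PR; assumes run_sync is importable):

import asyncio

async def reply():
    return {"status": "ok"}

print(run_sync("not awaitable"))  # non-awaitable: returned as-is
print(run_sync(reply()))          # coroutine: run to completion -> {'status': 'ok'}
# If the current loop is running and unpatched, run_sync returns
# asyncio.ensure_future(...) instead, "hoping that it will be awaited".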
98 changes: 76 additions & 22 deletions sparkmagic/sparkmagic/livyclientlib/exceptions.py
@@ -1,61 +1,67 @@
- from __future__ import print_function
+ import asyncio
import sys
import traceback
from sparkmagic.utils.constants import EXPECTED_ERROR_MSG, INTERNAL_ERROR_MSG


# == EXCEPTIONS ==
class LivyClientLibException(Exception):
-     """Base class for all LivyClientLib exceptions. All exceptions that are explicitly raised by
-     code in this package should be a subclass of LivyClientLibException. If you need to account for a
-     new error condition, either use one of the existing LivyClientLibException subclasses,
-     or create a new subclass with a descriptive name and add it to this file.
+     """Base class for all LivyClientLib exceptions. All exceptions that are
+     explicitly raised by code in this package should be a subclass of
+     LivyClientLibException. If you need to account for a new error condition,
+     either use one of the existing LivyClientLibException subclasses, or create
+     a new subclass with a descriptive name and add it to this file.
-     We distinguish between "expected" errors, which represent errors that a user is likely
-     to encounter in normal use, and "internal" errors, which represents exceptions that happen
-     due to a bug in the library. Check EXPECTED_EXCEPTIONS to see which exceptions
-     are considered "expected"."""
+     We distinguish between "expected" errors, which represent errors
+     that a user is likely to encounter in normal use, and "internal"
+     errors, which represents exceptions that happen due to a bug in the
+     library. Check EXPECTED_EXCEPTIONS to see which exceptions are
+     considered "expected".
+     """


class HttpClientException(LivyClientLibException):
-     """An exception thrown by the HTTP client when it fails to make a request."""
+     """An exception thrown by the HTTP client when it fails to make a
+     request."""


class LivyClientTimeoutException(LivyClientLibException):
    """An exception for timeouts while interacting with Livy."""


class DataFrameParseException(LivyClientLibException):
-     """An internal error which suggests a bad implementation of dataframe parsing from JSON --
-     if we get a JSON parsing error when parsing the results from the Livy server, this exception
-     is thrown."""
+     """An internal error which suggests a bad implementation of dataframe
+     parsing from JSON -- if we get a JSON parsing error when parsing the
+     results from the Livy server, this exception is thrown."""


class LivyUnexpectedStatusException(LivyClientLibException):
-     """An exception that will be shown if some unexpected error happens on the Livy side."""
+     """An exception that will be shown if some unexpected error happens on the
+     Livy side."""


class SessionManagementException(LivyClientLibException):
-     """An exception that is thrown by the Session Manager when it is a
-     given session name is invalid in some way."""
+     """An exception that is thrown by the Session Manager when it is a given
+     session name is invalid in some way."""


class BadUserConfigurationException(LivyClientLibException):
-     """An exception that is thrown when configuration provided by the user is invalid
-     in some way."""
+     """An exception that is thrown when configuration provided by the user is
+     invalid in some way."""


class BadUserDataException(LivyClientLibException):
-     """An exception that is thrown when data provided by the user is invalid
-     in some way."""
+     """An exception that is thrown when data provided by the user is invalid in
+     some way."""


class SqlContextNotFoundException(LivyClientLibException):
    """Exception that is thrown when the SQL context is not found."""


class SparkStatementException(LivyClientLibException):
-     """Exception that is thrown when an error occurs while parsing or executing Spark statements."""
+     """Exception that is thrown when an error occurs while parsing or executing
+     Spark statements."""


# It has to be a KeyboardInterrupt to interrupt the notebook
@@ -85,7 +91,8 @@ def _show_tb(exc_type, exc_val, tb):


class SparkStatementCancellationFailedException(KeyboardInterrupt):
-     """Exception that is thrown when a Spark statement is interrupted but fails to be cancelled in Livy."""
+     """Exception that is thrown when a Spark statement is interrupted but fails
+     to be cancelled in Livy."""


# == DECORATORS FOR EXCEPTION HANDLING ==
@@ -166,3 +173,50 @@ def wrapped(self, *args, **kwargs):
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    return wrapped


+ # async_wrap_unexpected_exceptions was created to handle async behavior ipykernel >=6
+ # It was safer to create a separate async wrapper than modify the original to accommodate both use-cases
+ def async_wrap_unexpected_exceptions(f, execute_if_error=None):
+     """A decorator that catches all exceptions from the async function f and alerts the user about them.
+     Self can be any object with a "logger" attribute and a "ipython_display" attribute.
+     All exceptions are logged as "unexpected" exceptions, and a request is made to the user to file an issue
+     at the Github repository. If there is an error, returns None if execute_if_error is None, or else
+     returns the output of the function execute_if_error.
+     Usage:
+     @async_wrap_unexpected_exceptions
+     async def fn(self, ...):
+         ..etc"""
+     from sparkmagic.utils import configuration as conf
+
+     async def handle_exception(self, e):
+         self.logger.error(
+             "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format(
+                 e, traceback.format_exc()
+             )
+         )
+         self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e))
+         if execute_if_error is None:
+             return None
+
+         result = execute_if_error()
+         if asyncio.iscoroutine(result):
+             return await result
+
+         return result
+
+     async def wrapped(self, *args, **kwargs):
+         try:
+             out = f(self, *args, **kwargs)
+             if asyncio.iscoroutine(out):
+                 out = await out
+         except Exception as err:
+             if conf.all_errors_are_fatal():
+                 raise err
+             return await handle_exception(self, err)
+         else:
+             return out
+
+     wrapped.__name__ = f.__name__
+     wrapped.__doc__ = f.__doc__
+     return wrapped
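
A hedged usage sketch of async_wrap_unexpected_exceptions (KernelLike and FakeDisplay are hypothetical stand-ins for any object carrying the "logger" and "ipython_display" attributes the wrapper expects):

import asyncio
import logging

class FakeDisplay:
    def send_error(self, msg):
        print("displayed:", msg)

class KernelLike:
    logger = logging.getLogger("sketch")
    ipython_display = FakeDisplay()

async def boom(self):
    raise ValueError("something unexpected")

safe = async_wrap_unexpected_exceptions(boom)
# Logs the traceback, shows INTERNAL_ERROR_MSG to the user, and returns
# None (no execute_if_error was given) instead of raising, unless
# conf.all_errors_are_fatal() is enabled.
print(asyncio.run(safe(KernelLike())))  # -> None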
(Diff for the 1 remaining changed file was not loaded in this view.)
