Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ipykernel >= 6 #737

Merged
merged 21 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ jobs:
sudo apt-get install -y libkrb5-dev
- name: Install package dependencies
run: |
python -m pip install --upgrade pip
pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils
pip install -r autovizwidget/requirements.txt -e autovizwidget
pip install -r sparkmagic/requirements.txt -e sparkmagic
pip install aiounittest
- name: Run hdijupyterutils tests
run: |
nosetests hdijupyterutils
Expand Down
2 changes: 1 addition & 1 deletion hdijupyterutils/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ipython>=4.0.2
nose
mock
ipywidgets>5.0.0
ipykernel>=4.2.2, <6.0.0
ipykernel>=4.2.2
jupyter>=1
pandas>=0.17.1
numpy>=1.16.5
Expand Down
2 changes: 1 addition & 1 deletion hdijupyterutils/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def version(path):
"nose",
"mock",
"ipywidgets>5.0.0",
"ipykernel>=4.2.2,<6.0.0",
"ipykernel>=4.2.2",
"jupyter>=1",
"pandas>=0.17.1",
"numpy",
Expand Down
2 changes: 1 addition & 1 deletion sparkmagic/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mock
pandas>=0.17.1
numpy
requests
ipykernel>=4.2.2, <6.0.0
ipykernel>=4.2.2
ipywidgets>5.0.0
notebook>=4.2
tornado>=4
Expand Down
4 changes: 2 additions & 2 deletions sparkmagic/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def read(path, encoding="utf-8"):


def version(path):
"""Obtain the package version from a python file e.g. pkg/__init__.py
"""Obtain the package version from a python file e.g. pkg/__init__.py.

See <https://packaging.python.org/en/latest/single_source_version.html>.
"""
Expand Down Expand Up @@ -89,7 +89,7 @@ def version(path):
"pandas>=0.17.1",
"numpy",
"requests",
"ipykernel<6.0.0",
"ipykernel>=4.2.2",
"ipywidgets>5.0.0",
"notebook>=4.2",
"tornado>=4",
Expand Down
201 changes: 140 additions & 61 deletions sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
# Copyright (c) 2015 aggftw@gmail.com
# Distributed under the terms of the Modified BSD License.
try:
from asyncio import Future
except ImportError:
class Future(object):
"""A class nothing will use."""

import asyncio
import requests


from ipykernel.ipkernel import IPythonKernel
from hdijupyterutils.ipythondisplay import IpythonDisplay

import sparkmagic.utils.configuration as conf
from sparkmagic.utils.sparklogger import SparkLog
from sparkmagic.livyclientlib.exceptions import wrap_unexpected_exceptions
from sparkmagic.livyclientlib.exceptions import async_wrap_unexpected_exceptions
from sparkmagic.kernels.wrapperkernel.usercodeparser import UserCodeParser


class SparkKernelBase(IPythonKernel):
def __init__(self, implementation, implementation_version, language, language_version, language_info,
session_language, user_code_parser=None, **kwargs):
def __init__(
self,
implementation,
implementation_version,
language,
language_version,
language_info,
session_language,
user_code_parser=None,
**kwargs
):
# Required by Jupyter - Override
self.implementation = implementation
self.implementation_version = implementation_version
Expand All @@ -29,7 +35,7 @@ def __init__(self, implementation, implementation_version, language, language_ve
# Override
self.session_language = session_language

super(SparkKernelBase, self).__init__(**kwargs)
super().__init__(**kwargs)

self.logger = SparkLog(u"{}_jupyter_kernel".format(self.session_language))
self._fatal_error = None
Expand All @@ -43,108 +49,181 @@ def __init__(self, implementation, implementation_version, language, language_ve
# Disable warnings for test env in HDI
requests.packages.urllib3.disable_warnings()

# Do not load magics in testing
if not kwargs.get("testing", False):
self._load_magics_extension()
self._change_language()
# Get and use asyncio event loop to run async functions
# from a synchronous init function
# Python 3.6 compatibility
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._load_magics_extension())
loop.run_until_complete(self._change_language())
if conf.use_auto_viz():
self._register_auto_viz()
loop.run_until_complete(self._register_auto_viz())

async def do_execute(
    self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
):
    """Jupyter entry point for executing a cell.

    If a fatal startup error has been queued, it is replayed instead of
    running the user's code.  All unexpected exceptions are routed
    through ``async_wrap_unexpected_exceptions`` so the cell is still
    marked as completed in the UI.
    """

    async def run_cell(kernel):
        # A fatal error recorded during kernel startup takes precedence.
        if kernel._fatal_error is not None:
            return await kernel._repeat_fatal_error()
        return await kernel._do_execute(
            code, silent, store_history, user_expressions, allow_stdin
        )

    guarded = async_wrap_unexpected_exceptions(run_cell, self._complete_cell)
    return await guarded(self)

async def do_shutdown(self, restart):
    """Shut down the kernel: delete the Livy session, then stop ipykernel."""
    # Release the remote session before tearing down the local kernel.
    await self._delete_session()
    return await self._do_shutdown_ipykernel(restart)

async def _do_execute(
    self, code, silent, store_history, user_expressions, allow_stdin
):
    """Parse the user's cell through the code parser, then execute it."""
    parsed_code = self.user_code_parser.get_code_to_run(code)
    return await self._execute_cell(
        parsed_code, silent, store_history, user_expressions, allow_stdin
    )

def _load_magics_extension(self):
async def _load_magics_extension(self):
register_magics_code = "%load_ext sparkmagic.kernels"
self._execute_cell(register_magics_code, True, False, shutdown_if_error=True,
log_if_error="Failed to load the Spark kernels magics library.")
await self._execute_cell(
register_magics_code,
True,
False,
shutdown_if_error=True,
log_if_error="Failed to load the Spark kernels magics library.",
)
self.logger.debug("Loaded magics.")

def _change_language(self):
register_magics_code = "%%_do_not_call_change_language -l {}\n ".format(self.session_language)
self._execute_cell(register_magics_code, True, False, shutdown_if_error=True,
log_if_error="Failed to change language to {}.".format(self.session_language))
async def _change_language(self):
register_magics_code = "%%_do_not_call_change_language -l {}\n ".format(
self.session_language
)
await self._execute_cell(
register_magics_code,
True,
False,
shutdown_if_error=True,
log_if_error="Failed to change language to {}.".format(
self.session_language
),
)
self.logger.debug("Changed language.")

def _register_auto_viz(self):
async def _register_auto_viz(self):
from sparkmagic.utils.sparkevents import get_spark_events_handler
import autovizwidget.utils.configuration as c

handler = get_spark_events_handler()
c.override("events_handler", handler)

register_auto_viz_code = """from autovizwidget.widget.utils import display_dataframe
ip = get_ipython()
ip.display_formatter.ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)"""
self._execute_cell(register_auto_viz_code, True, False, shutdown_if_error=True,
log_if_error="Failed to register auto viz for notebook.")
await self._execute_cell(
register_auto_viz_code,
True,
False,
shutdown_if_error=True,
log_if_error="Failed to register auto viz for notebook.",
)
self.logger.debug("Registered auto viz.")

def _delete_session(self):
async def _delete_session(self):
code = "%%_do_not_call_delete_session\n "
self._execute_cell_for_user(code, True, False)

def _execute_cell(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False,
shutdown_if_error=False, log_if_error=None):
reply_content = self._execute_cell_for_user(code, silent, store_history, user_expressions, allow_stdin)
await self._execute_cell_for_user(code, True, False)

async def _execute_cell(
    self,
    code,
    silent,
    store_history=True,
    user_expressions=None,
    allow_stdin=False,
    shutdown_if_error=False,
    log_if_error=None,
):
    """Execute *code* in the wrapped kernel and return the reply content.

    When ``shutdown_if_error`` is set and the reply reports an error,
    the error is treated as fatal: it is logged (with ``log_if_error``
    as the headline, when provided) and the kernel aborts.
    """
    reply_content = await self._execute_cell_for_user(
        code, silent, store_history, user_expressions, allow_stdin
    )

    if shutdown_if_error and reply_content[u"status"] == u"error":
        error_from_reply = reply_content[u"evalue"]
        if log_if_error is not None:
            message = '{}\nException details:\n\t"{}"'.format(
                log_if_error, error_from_reply
            )
            return await self._abort_with_fatal_error(message)

    return reply_content

async def _execute_cell_for_user(
    self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
):
    """Run *code* through the parent IPython kernel and return its reply.

    Normalizes the three return conventions seen across ipykernel
    releases: a native coroutine (>= 6), a future (5.x), or a plain
    reply dict (4.x).
    """
    reply = super().do_execute(
        code, silent, store_history, user_expressions, allow_stdin
    )

    # ipykernel >= 6: do_execute is a native coroutine.
    if asyncio.iscoroutine(reply):
        return await reply

    # ipykernel 5: a gen.coroutine-style future wrapping the reply.
    if asyncio.isfuture(reply):
        return reply.result()

    # ipykernel 4: synchronous call, reply returned directly.
    return reply

async def _do_shutdown_ipykernel(self, restart):
    """Invoke the parent kernel's shutdown, awaiting it when it is async."""
    outcome = super().do_shutdown(restart)

    # NOTE(review): under test, super() resolves to this class's own async
    # do_shutdown; newer ipykernel releases may also return a coroutine here.
    if asyncio.iscoroutine(outcome):
        return await outcome

    return outcome

async def _complete_cell(self):
    """Run a cell with no effect and return its reply.

    Call this — and return its value — when an error prevents the
    user's cell from executing, so the Jupyter UI still registers the
    cell as completed.
    """
    return await self._execute_cell("None", False, True, None, False)

def _show_user_error(self, message):
    """Log *message* as an error and surface it to the notebook user."""
    self.logger.error(message)
    self.ipython_display.send_error(message)

def _queue_fatal_error(self, message):
    """Queue a fatal error to be raised when the next cell executes.

    Does not raise immediately: used for errors during kernel startup,
    since IPython crashes if an exception escapes ``__init__``.
    """
    self._fatal_error = message

def _abort_with_fatal_error(self, message):
async def _abort_with_fatal_error(self, message):
"""Queues up a fatal error and throws it immediately."""
self._queue_fatal_error(message)
return self._repeat_fatal_error()
return await self._repeat_fatal_error()

def _repeat_fatal_error(self):
async def _repeat_fatal_error(self):
"""Throws an error that has already been queued."""
error = conf.fatal_error_suggestion().format(self._fatal_error)
self.logger.error(error)
self.ipython_display.send_error(error)
return self._complete_cell()
return await self._complete_cell()
Loading