diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 6887b049..e8a90d2c 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -21,6 +21,7 @@ jobs:
           sudo apt-get install -y libkrb5-dev
       - name: Install package dependencies
         run: |
+          python -m pip install --upgrade pip
           pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils
           pip install -r autovizwidget/requirements.txt -e autovizwidget
           pip install -r sparkmagic/requirements.txt -e sparkmagic
diff --git a/hdijupyterutils/requirements.txt b/hdijupyterutils/requirements.txt
index 279f124b..6554662c 100644
--- a/hdijupyterutils/requirements.txt
+++ b/hdijupyterutils/requirements.txt
@@ -2,7 +2,7 @@ ipython>=4.0.2
 nose
 mock
 ipywidgets>5.0.0
-ipykernel>=4.2.2, <6.0.0
+ipykernel>=4.2.2
 jupyter>=1
 pandas>=0.17.1
 numpy>=1.16.5
diff --git a/hdijupyterutils/setup.py b/hdijupyterutils/setup.py
index 51d1b30d..807afcc5 100644
--- a/hdijupyterutils/setup.py
+++ b/hdijupyterutils/setup.py
@@ -60,7 +60,7 @@ def version(path):
         "nose",
         "mock",
         "ipywidgets>5.0.0",
-        "ipykernel>=4.2.2,<6.0.0",
+        "ipykernel>=4.2.2",
         "jupyter>=1",
         "pandas>=0.17.1",
         "numpy",
diff --git a/sparkmagic/requirements.txt b/sparkmagic/requirements.txt
index 654d5b24..426d7b2b 100644
--- a/sparkmagic/requirements.txt
+++ b/sparkmagic/requirements.txt
@@ -6,8 +6,9 @@ mock
 pandas>=0.17.1
 numpy
 requests
-ipykernel>=4.2.2, <6.0.0
+ipykernel>=4.2.2
 ipywidgets>5.0.0
 notebook>=4.2
 tornado>=4
 requests_kerberos>=0.8.0
+nest_asyncio==1.5.5
diff --git a/sparkmagic/setup.py b/sparkmagic/setup.py
index cef28b86..06166615 100644
--- a/sparkmagic/setup.py
+++ b/sparkmagic/setup.py
@@ -88,10 +88,11 @@ def version(path):
         "pandas>=0.17.1",
         "numpy",
         "requests",
-        "ipykernel<6.0.0",
+        "ipykernel>=4.2.2",
         "ipywidgets>5.0.0",
         "notebook>=4.2",
         "tornado>=4",
         "requests_kerberos>=0.8.0",
+        "nest_asyncio==1.5.5",
     ],
 )
diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py
index bd45b2ef..8bdb3f59 100644
--- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py
+++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py
@@ -1,14 +1,10 @@
 # Copyright (c) 2015 aggftw@gmail.com
 # Distributed under the terms of the Modified BSD License.
 
-try:
-    from asyncio import Future
-except ImportError:
-
-    class Future(object):
-        """A class nothing will use."""
+import asyncio
+import inspect
+import requests
 
-import requests
 from ipykernel.ipkernel import IPythonKernel
 
 from hdijupyterutils.ipythondisplay import IpythonDisplay
@@ -17,6 +13,55 @@ class Future(object):
 from sparkmagic.livyclientlib.exceptions import wrap_unexpected_exceptions
 from sparkmagic.kernels.wrapperkernel.usercodeparser import UserCodeParser
+# NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+import nest_asyncio
+
+
+# NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+# Taken from: https://github.com/jupyter/notebook/blob/eb3a1c24839205afcef0ba65ace2309d38300a2b/notebook/utils.py#L332
+def run_sync(maybe_async):
+    """If async, runs maybe_async and blocks until it has executed,
+    possibly creating an event loop.
+    If not async, just returns maybe_async as it is the result of something
+    that has already executed.
+    Parameters
+    ----------
+    maybe_async : async or non-async object
+        The object to be executed, if it is async.
+    Returns
+    -------
+    result :
+        Whatever the async object returns, or the object itself.
+    """
+    if not inspect.isawaitable(maybe_async):
+        # that was not something async, just return it
+        return maybe_async
+    # it is async, we need to run it in an event loop
+
+    def wrapped():
+        create_new_event_loop = False
+        result = None
+        loop = None
+        try:
+            loop = asyncio.get_event_loop()
+        except RuntimeError:
+            create_new_event_loop = True
+        else:
+            if loop.is_closed():
+                create_new_event_loop = True
+        if create_new_event_loop:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+        try:
+            result = loop.run_until_complete(maybe_async)
+        except RuntimeError as e:
+            if str(e) == "This event loop is already running":
+                # just return a Future, hoping that it will be awaited
+                result = asyncio.ensure_future(maybe_async)
+        return result
+
+    return wrapped()
+
 
 
 class SparkKernelBase(IPythonKernel):
     def __init__(
@@ -40,7 +85,16 @@ def __init__(
         # Override
         self.session_language = session_language
 
-        super(SparkKernelBase, self).__init__(**kwargs)
+        # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6
+        # Patch loop.run_until_complete as early as possible
+        try:
+            nest_asyncio.apply()
+        except RuntimeError:
+            # nest_asyncio requires a running loop in order to patch.
+            # In tests the loop may not have been created yet.
+            pass
+
+        super().__init__(**kwargs)
         self.logger = SparkLog("{}_jupyter_kernel".format(self.session_language))
 
         self._fatal_error = None
@@ -54,11 +108,15 @@ def __init__(
         # Disable warnings for test env in HDI
         requests.packages.urllib3.disable_warnings()
 
-        if not kwargs.get("testing", False):
-            self._load_magics_extension()
-            self._change_language()
-            if conf.use_auto_viz():
-                self._register_auto_viz()
+        # Do not load magics in testing
+        if kwargs.get("testing", False):
+            return
+
+        # Load magics on init
+        self._load_magics_extension()
+        self._change_language()
+        if conf.use_auto_viz():
+            self._register_auto_viz()
 
     def do_execute(
         self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
@@ -71,7 +129,9 @@ def f(self):
                 code, silent, store_history, user_expressions, allow_stdin
             )
 
-        return wrap_unexpected_exceptions(f, self._complete_cell)(self)
+        # Execute the code and handle exceptions
+        wrapped = wrap_unexpected_exceptions(f, self._complete_cell)
+        return wrapped(self)
 
     def do_shutdown(self, restart):
         # Cleanup
@@ -164,20 +224,37 @@ def _execute_cell(
     def _execute_cell_for_user(
         self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
     ):
-        result = super(SparkKernelBase, self).do_execute(
+        result = super().do_execute(
             code, silent, store_history, user_expressions, allow_stdin
         )
-        if isinstance(result, Future):
-            result = result.result()
+
+        # In ipykernel 6, this returns a native asyncio coroutine
+        if asyncio.iscoroutine(result):
+            return run_sync(result)
+
+        # In ipykernel 5, this returns a Future (from a gen.coroutine)
+        if asyncio.isfuture(result):
+            return result.result()
+
+        # In ipykernel 4, this function is synchronous
         return result
 
     def _do_shutdown_ipykernel(self, restart):
-        return super(SparkKernelBase, self).do_shutdown(restart)
+        result = super().do_shutdown(restart)
+
+        # In tests, super() calls this SparkKernelBase.do_shutdown, which is async
+        if asyncio.iscoroutine(result):
+            return run_sync(result)
+
+        return result
 
     def _complete_cell(self):
-        """A method that runs a cell with no effect. Call this and return the value it
-        returns when there's some sort of error preventing the user's cell from executing; this
-        will register the cell from the Jupyter UI as being completed."""
+        """A method that runs a cell with no effect.
+
+        Call this and return the value it returns when there's some sort
+        of error preventing the user's cell from executing; this will
+        register the cell from the Jupyter UI as being completed.
+        """
         return self._execute_cell("None", False, True, None, False)
 
     def _show_user_error(self, message):
@@ -185,9 +262,12 @@ def _show_user_error(self, message):
         self.ipython_display.send_error(message)
 
     def _queue_fatal_error(self, message):
-        """Queues up a fatal error to be thrown when the next cell is executed; does not
-        raise an error immediately. We use this for errors that happen on kernel startup,
-        since IPython crashes if we throw an exception in the __init__ method."""
+        """Queues up a fatal error to be thrown when the next cell is executed;
+        does not raise an error immediately.
+
+        We use this for errors that happen on kernel startup, since
+        IPython crashes if we throw an exception in the __init__ method.
+        """
         self._fatal_error = message
 
     def _abort_with_fatal_error(self, message):
diff --git a/sparkmagic/sparkmagic/livyclientlib/exceptions.py b/sparkmagic/sparkmagic/livyclientlib/exceptions.py
index b048aa49..404269b8 100644
--- a/sparkmagic/sparkmagic/livyclientlib/exceptions.py
+++ b/sparkmagic/sparkmagic/livyclientlib/exceptions.py
@@ -1,4 +1,4 @@
-from __future__ import print_function
+import asyncio
 import sys
 import traceback
 from sparkmagic.utils.constants import EXPECTED_ERROR_MSG, INTERNAL_ERROR_MSG
@@ -6,19 +6,23 @@
 
 # == EXCEPTIONS ==
 class LivyClientLibException(Exception):
-    """Base class for all LivyClientLib exceptions. All exceptions that are explicitly raised by
-    code in this package should be a subclass of LivyClientLibException. If you need to account for a
-    new error condition, either use one of the existing LivyClientLibException subclasses,
-    or create a new subclass with a descriptive name and add it to this file.
+    """Base class for all LivyClientLib exceptions. All exceptions that are
+    explicitly raised by code in this package should be a subclass of
+    LivyClientLibException. If you need to account for a new error condition,
+    either use one of the existing LivyClientLibException subclasses, or create
+    a new subclass with a descriptive name and add it to this file.
 
-    We distinguish between "expected" errors, which represent errors that a user is likely
-    to encounter in normal use, and "internal" errors, which represents exceptions that happen
-    due to a bug in the library. Check EXPECTED_EXCEPTIONS to see which exceptions
-    are considered "expected"."""
+    We distinguish between "expected" errors, which represent errors
+    that a user is likely to encounter in normal use, and "internal"
+    errors, which represent exceptions that happen due to a bug in the
+    library. Check EXPECTED_EXCEPTIONS to see which exceptions are
+    considered "expected".
+    """
 
 
 class HttpClientException(LivyClientLibException):
-    """An exception thrown by the HTTP client when it fails to make a request."""
+    """An exception thrown by the HTTP client when it fails to make a
+    request."""
 
 
 class LivyClientTimeoutException(LivyClientLibException):
@@ -26,28 +30,29 @@ class LivyClientTimeoutException(LivyClientLibException):
 
 class DataFrameParseException(LivyClientLibException):
-    """An internal error which suggests a bad implementation of dataframe parsing from JSON --
-    if we get a JSON parsing error when parsing the results from the Livy server, this exception
-    is thrown."""
+    """An internal error which suggests a bad implementation of dataframe
+    parsing from JSON -- if we get a JSON parsing error when parsing the
+    results from the Livy server, this exception is thrown."""
 
 
 class LivyUnexpectedStatusException(LivyClientLibException):
-    """An exception that will be shown if some unexpected error happens on the Livy side."""
+    """An exception that will be shown if some unexpected error happens on the
+    Livy side."""
 
 
 class SessionManagementException(LivyClientLibException):
-    """An exception that is thrown by the Session Manager when it is a
-    given session name is invalid in some way."""
+    """An exception that is thrown by the Session Manager when a given
+    session name is invalid in some way."""
 
 
 class BadUserConfigurationException(LivyClientLibException):
-    """An exception that is thrown when configuration provided by the user is invalid
-    in some way."""
+    """An exception that is thrown when configuration provided by the user is
+    invalid in some way."""
 
 
 class BadUserDataException(LivyClientLibException):
-    """An exception that is thrown when data provided by the user is invalid
-    in some way."""
+    """An exception that is thrown when data provided by the user is invalid in
+    some way."""
 
 
 class SqlContextNotFoundException(LivyClientLibException):
@@ -55,7 +60,8 @@ class SqlContextNotFoundException(LivyClientLibException):
 
 class SparkStatementException(LivyClientLibException):
-    """Exception that is thrown when an error occurs while parsing or executing Spark statements."""
+    """Exception that is thrown when an error occurs while parsing or executing
+    Spark statements."""
 
 
 # It has to be a KeyboardInterrupt to interrupt the notebook
@@ -85,7 +91,8 @@ def _show_tb(exc_type, exc_val, tb):
 
 class SparkStatementCancellationFailedException(KeyboardInterrupt):
-    """Exception that is thrown when a Spark statement is interrupted but fails to be cancelled in Livy."""
+    """Exception that is thrown when a Spark statement is interrupted but fails
+    to be cancelled in Livy."""
 
 
 # == DECORATORS FOR EXCEPTION HANDLING ==
@@ -166,3 +173,50 @@ def wrapped(self, *args, **kwargs):
     wrapped.__name__ = f.__name__
     wrapped.__doc__ = f.__doc__
     return wrapped
+
+
+# async_wrap_unexpected_exceptions was created to handle async behavior in ipykernel>=6.
+# It was safer to create a separate async wrapper than to modify the original to accommodate both use-cases.
+def async_wrap_unexpected_exceptions(f, execute_if_error=None):
+    """A decorator that catches all exceptions from the async function f and alerts the user about them.
+    Self can be any object with a "logger" attribute and an "ipython_display" attribute.
+    All exceptions are logged as "unexpected" exceptions, and a request is made to the user to file an issue
+    at the Github repository. If there is an error, returns None if execute_if_error is None, or else
+    returns the output of the function execute_if_error.
+    Usage:
+    @async_wrap_unexpected_exceptions
+    async def fn(self, ...):
+        ..etc"""
+    from sparkmagic.utils import configuration as conf
+
+    async def handle_exception(self, e):
+        self.logger.error(
+            "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format(
+                e, traceback.format_exc()
+            )
+        )
+        self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e))
+        if execute_if_error is None:
+            return None
+
+        result = execute_if_error()
+        if asyncio.iscoroutine(result):
+            return await result
+
+        return result
+
+    async def wrapped(self, *args, **kwargs):
+        try:
+            out = f(self, *args, **kwargs)
+            if asyncio.iscoroutine(out):
+                out = await out
+        except Exception as err:
+            if conf.all_errors_are_fatal():
+                raise err
+            return await handle_exception(self, err)
+        else:
+            return out
+
+    wrapped.__name__ = f.__name__
+    wrapped.__doc__ = f.__doc__
+    return wrapped
diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py
index 6d383b64..7a265974 100644
--- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py
+++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py
@@ -1,5 +1,7 @@
-import ipykernel
-from mock import MagicMock, call, patch
+import asyncio
+
+
+from unittest.mock import MagicMock, call, patch
 from nose.tools import with_setup
 
 from sparkmagic.kernels.wrapperkernel.sparkkernelbase import SparkKernelBase
@@ -10,13 +12,15 @@
 do_shutdown_mock = None
 ipython_display = None
 code = "some spark code"
-user_code_parser = MagicMock(return_value=code)
 
 
 class TestSparkKernel(SparkKernelBase):
-    def __init__(self):
+    def __init__(self, user_code_parser=None):
         kwargs = {"testing": True}
-        super(TestSparkKernel, self).__init__(
+        if user_code_parser is None:
+            user_code_parser = MagicMock(return_value=code)
+
+        super().__init__(
             None, None, None, None, None, LANG_PYTHON, user_code_parser, **kwargs
         )
 
@@ -24,7 +28,8 @@ def __init__(self):
 def _setup():
     global kernel, execute_cell_mock, do_shutdown_mock, ipython_display
 
-    kernel = TestSparkKernel()
+    user_code_parser = MagicMock(return_value=code)
+    kernel = TestSparkKernel(user_code_parser)
 
     kernel._execute_cell_for_user = execute_cell_mock = MagicMock(
         return_value={"test": "ing", "a": "b", "status": "ok"}
@@ -42,9 +47,11 @@ def test_execute_valid_code():
     # Verify that the execution flows through.
     ret = kernel.do_execute(code, False)
 
-    user_code_parser.get_code_to_run.assert_called_once_with(code)
-    assert ret is execute_cell_mock.return_value
+    kernel.user_code_parser.get_code_to_run.assert_called_once_with(code)
+    execute_cell_mock.assert_called_once()
+    assert execute_cell_mock.return_value is ret
     assert kernel._fatal_error is None
+    assert ret["status"] == "ok"
     assert ipython_display.send_error.call_count == 0
 
 
@@ -57,7 +64,7 @@ def test_execute_throws_if_fatal_error_happened():
 
     ret = kernel.do_execute(code, False)
 
-    assert ret is execute_cell_mock.return_value
+    assert execute_cell_mock.return_value is ret
     assert kernel._fatal_error == fatal_error
     assert execute_cell_mock.called_once_with("None", True)
     assert ipython_display.send_error.call_count == 1
@@ -71,8 +78,7 @@ def test_execute_alerts_user_if_an_unexpected_error_happens():
     kernel._repeat_fatal_error = MagicMock(side_effect=ValueError)
 
     ret = kernel.do_execute(code, False)
-
-    assert ret is execute_cell_mock.return_value
+    assert execute_cell_mock.return_value is ret
     assert execute_cell_mock.called_once_with("None", True)
     assert ipython_display.send_error.call_count == 1
 
@@ -85,13 +91,13 @@ def test_execute_throws_if_fatal_error_happens_for_execution():
     reply_content = dict()
     reply_content["status"] = "error"
     reply_content["evalue"] = fatal_error
+    execute_cell_mock.return_value = reply_content
 
     ret = kernel._execute_cell(
         code, False, shutdown_if_error=True, log_if_error=fatal_error
     )
-
-    assert ret is execute_cell_mock.return_value
+    assert execute_cell_mock.return_value is ret
     assert kernel._fatal_error == message
     assert execute_cell_mock.called_once_with("None", True)
     assert ipython_display.send_error.call_count == 1
 
@@ -171,31 +177,48 @@ def test_delete_session():
     )
 
 
-@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute")
 @with_setup(_teardown)
-def test_execute_cell_for_user_ipykernel5(mock_ipy_execute):
-    import sys
+def test_execute_cell_for_user_ipykernel4():
+    want = {"status": "OK"}
+    # Can't use the patch decorator because it fails
+    # to patch async functions in Python < 3.8
+    with patch(
+        "ipykernel.ipkernel.IPythonKernel.do_execute",
+        new_callable=MagicMock,
+        return_value=want,
+    ) as mock_ipy_execute:
+        got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True)
 
-    if sys.version_info.major == 2:
-        from unittest import SkipTest
+    assert mock_ipy_execute.called
+    assert want == got
 
-        raise SkipTest("Python 3 only")
-    else:
-        import asyncio
 
-        mock_ipy_execute_result = asyncio.Future()
-        mock_ipy_execute_result.set_result({"status": "OK"})
-        mock_ipy_execute.return_value = mock_ipy_execute_result
+@with_setup(_teardown)
+def test_execute_cell_for_user_ipykernel5():
+    want = {"status": "OK"}
+    # Can't use the patch decorator because it fails
+    # to patch async functions in Python < 3.8
+    with patch(
+        "ipykernel.ipkernel.IPythonKernel.do_execute",
+        new_callable=MagicMock,
+    ) as mock_ipy_execute:
+        mock_ipy_execute.return_value = asyncio.Future()
+        mock_ipy_execute.return_value.set_result(want)
 
-    actual_result = TestSparkKernel()._execute_cell_for_user(code="Foo", silent=True)
+        got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True)
 
-    assert {"status": "OK"} == actual_result
+    assert mock_ipy_execute.called
+    assert want == got
 
 
-@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute")
-@with_setup(_teardown)
-def test_execute_cell_for_user_ipykernel4(mock_ipy_execute):
-    mock_ipy_execute.return_value = {"status": "OK"}
-    actual_result = TestSparkKernel()._execute_cell_for_user(code="Foo", silent=True)
-
-    assert {"status": "OK"} == actual_result
+@with_setup(_teardown)
+def test_execute_cell_for_user_ipykernel6():
+    want = {"status": "OK"}
+    # Can't use the patch decorator because it fails
+    # to patch async functions in Python < 3.8
+    with patch(
+        "ipykernel.ipkernel.IPythonKernel.do_execute", return_value=want
+    ) as mock_ipy_execute:
+        got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True)
+    assert mock_ipy_execute.called
+    assert want == got
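
Reviewer note: the standalone sketch below is not part of the patch. It illustrates, under stated assumptions, the behavior the run_sync helper is relied on for: non-awaitable results (ipykernel 4 style) pass straight through, while coroutines (ipykernel 6 style) are driven to completion on an event loop. The run_sync here is a simplified stand-in for the helper added in sparkkernelbase.py, and the fake_do_execute_* functions are hypothetical stand-ins for IPythonKernel.do_execute under the different ipykernel generations.

# Sketch only -- simplified run_sync, not the exact helper added in this patch.
import asyncio
import inspect


def run_sync(maybe_async):
    # Non-awaitables (ipykernel 4 style replies) are returned unchanged.
    if not inspect.isawaitable(maybe_async):
        return maybe_async
    # Coroutines (ipykernel 6 style replies) are run to completion.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(maybe_async)
    finally:
        loop.close()


def fake_do_execute_ipykernel4():
    # Hypothetical stand-in: ipykernel 4 returns a plain reply dict.
    return {"status": "ok"}


async def fake_do_execute_ipykernel6():
    # Hypothetical stand-in: ipykernel 6 returns a coroutine.
    return {"status": "ok"}


print(run_sync(fake_do_execute_ipykernel4()))  # {'status': 'ok'}
print(run_sync(fake_do_execute_ipykernel6()))  # {'status': 'ok'}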