From 9cc74829b554d43650bb9aca5d8ce4d43ffa923f Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 11:43:55 -0700 Subject: [PATCH 01/16] Support ipykernel v6 async results and add test coverage run black formatter --- .github/workflows/tests.yml | 1 + .../kernels/wrapperkernel/sparkkernelbase.py | 130 +++++++++++++----- .../sparkmagic/tests/test_sparkkernelbase.py | 115 +++++++++++----- 3 files changed, 180 insertions(+), 66 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f56676b8d..16eb55fe9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,6 +24,7 @@ jobs: pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils pip install -r autovizwidget/requirements.txt -e autovizwidget pip install -r sparkmagic/requirements.txt -e sparkmagic + pip install aiounittest - name: Run hdijupyterutils tests run: | nosetests hdijupyterutils diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 55c80e475..3ea82e3af 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -1,12 +1,9 @@ # Copyright (c) 2015 aggftw@gmail.com # Distributed under the terms of the Modified BSD License. -try: - from asyncio import Future -except ImportError: - class Future(object): - """A class nothing will use.""" - +import asyncio import requests + + from ipykernel.ipkernel import IPythonKernel from hdijupyterutils.ipythondisplay import IpythonDisplay @@ -17,8 +14,17 @@ class Future(object): class SparkKernelBase(IPythonKernel): - def __init__(self, implementation, implementation_version, language, language_version, language_info, - session_language, user_code_parser=None, **kwargs): + def __init__( + self, + implementation, + implementation_version, + language, + language_version, + language_info, + session_language, + user_code_parser=None, + **kwargs + ): # Required by Jupyter - Override self.implementation = implementation self.implementation_version = implementation_version @@ -49,12 +55,17 @@ def __init__(self, implementation, implementation_version, language, language_ve if conf.use_auto_viz(): self._register_auto_viz() - def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): + def do_execute( + self, code, silent, store_history=True, user_expressions=None, allow_stdin=False + ): def f(self): if self._fatal_error is not None: return self._repeat_fatal_error() - return self._do_execute(code, silent, store_history, user_expressions, allow_stdin) + return self._do_execute( + code, silent, store_history, user_expressions, allow_stdin + ) + return wrap_unexpected_exceptions(f, self._complete_cell)(self) def do_shutdown(self, restart): @@ -66,65 +77,113 @@ def do_shutdown(self, restart): def _do_execute(self, code, silent, store_history, user_expressions, allow_stdin): code_to_run = self.user_code_parser.get_code_to_run(code) - res = self._execute_cell(code_to_run, silent, store_history, user_expressions, allow_stdin) + res = self._execute_cell( + code_to_run, silent, store_history, user_expressions, allow_stdin + ) return res def _load_magics_extension(self): register_magics_code = "%load_ext sparkmagic.kernels" - self._execute_cell(register_magics_code, True, False, shutdown_if_error=True, - log_if_error="Failed to load the Spark kernels magics library.") + self._execute_cell( + register_magics_code, + True, + False, + shutdown_if_error=True, + log_if_error="Failed to load the Spark kernels magics library.", + ) self.logger.debug("Loaded magics.") def _change_language(self): - register_magics_code = "%%_do_not_call_change_language -l {}\n ".format(self.session_language) - self._execute_cell(register_magics_code, True, False, shutdown_if_error=True, - log_if_error="Failed to change language to {}.".format(self.session_language)) + register_magics_code = "%%_do_not_call_change_language -l {}\n ".format( + self.session_language + ) + self._execute_cell( + register_magics_code, + True, + False, + shutdown_if_error=True, + log_if_error="Failed to change language to {}.".format( + self.session_language + ), + ) self.logger.debug("Changed language.") def _register_auto_viz(self): from sparkmagic.utils.sparkevents import get_spark_events_handler import autovizwidget.utils.configuration as c - + handler = get_spark_events_handler() c.override("events_handler", handler) - + register_auto_viz_code = """from autovizwidget.widget.utils import display_dataframe ip = get_ipython() ip.display_formatter.ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)""" - self._execute_cell(register_auto_viz_code, True, False, shutdown_if_error=True, - log_if_error="Failed to register auto viz for notebook.") + self._execute_cell( + register_auto_viz_code, + True, + False, + shutdown_if_error=True, + log_if_error="Failed to register auto viz for notebook.", + ) self.logger.debug("Registered auto viz.") def _delete_session(self): code = "%%_do_not_call_delete_session\n " self._execute_cell_for_user(code, True, False) - def _execute_cell(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False, - shutdown_if_error=False, log_if_error=None): - reply_content = self._execute_cell_for_user(code, silent, store_history, user_expressions, allow_stdin) + def _execute_cell( + self, + code, + silent, + store_history=True, + user_expressions=None, + allow_stdin=False, + shutdown_if_error=False, + log_if_error=None, + ): + reply_content = self._execute_cell_for_user( + code, silent, store_history, user_expressions, allow_stdin + ) if shutdown_if_error and reply_content[u"status"] == u"error": error_from_reply = reply_content[u"evalue"] if log_if_error is not None: - message = "{}\nException details:\n\t\"{}\"".format(log_if_error, error_from_reply) + message = '{}\nException details:\n\t"{}"'.format( + log_if_error, error_from_reply + ) return self._abort_with_fatal_error(message) return reply_content - def _execute_cell_for_user(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): - result = super(SparkKernelBase, self).do_execute(code, silent, store_history, user_expressions, allow_stdin) - if isinstance(result, Future): - result = result.result() + async def _execute_cell_for_user( + self, code, silent, store_history=True, user_expressions=None, allow_stdin=False + ): + result = super().do_execute( + code, silent, store_history, user_expressions, allow_stdin + ) + + # In ipykernel 6, this returns native asyncio coroutine + if asyncio.iscoroutine(result): + return await result + + # In ipykernel 5, this returns gen.coroutine + if isinstance(result, asyncio.Future): + return result.result() + + # In ipykernel 4, this func is synchronous return result def _do_shutdown_ipykernel(self, restart): return super(SparkKernelBase, self).do_shutdown(restart) def _complete_cell(self): - """A method that runs a cell with no effect. Call this and return the value it - returns when there's some sort of error preventing the user's cell from executing; this - will register the cell from the Jupyter UI as being completed.""" + """A method that runs a cell with no effect. + + Call this and return the value it returns when there's some sort + of error preventing the user's cell from executing; this will + register the cell from the Jupyter UI as being completed. + """ return self._execute_cell("None", False, True, None, False) def _show_user_error(self, message): @@ -132,9 +191,12 @@ def _show_user_error(self, message): self.ipython_display.send_error(message) def _queue_fatal_error(self, message): - """Queues up a fatal error to be thrown when the next cell is executed; does not - raise an error immediately. We use this for errors that happen on kernel startup, - since IPython crashes if we throw an exception in the __init__ method.""" + """Queues up a fatal error to be thrown when the next cell is executed; + does not raise an error immediately. + + We use this for errors that happen on kernel startup, since + IPython crashes if we throw an exception in the __init__ method. + """ self._fatal_error = message def _abort_with_fatal_error(self, message): diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index f901c8e48..017b5bd81 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -1,6 +1,9 @@ +import asyncio import ipykernel -from mock import MagicMock, call, patch + +from unittest.mock import MagicMock, AsyncMock, call, patch from nose.tools import with_setup +from aiounittest import async_test from sparkmagic.kernels.wrapperkernel.sparkkernelbase import SparkKernelBase from sparkmagic.utils.constants import LANG_PYTHON @@ -16,8 +19,9 @@ class TestSparkKernel(SparkKernelBase): def __init__(self): kwargs = {"testing": True} - super(TestSparkKernel, self).__init__(None, None, None, None, None, LANG_PYTHON, user_code_parser, - **kwargs) + super(TestSparkKernel, self).__init__( + None, None, None, None, None, LANG_PYTHON, user_code_parser, **kwargs + ) def _setup(): @@ -25,8 +29,9 @@ def _setup(): kernel = TestSparkKernel() - kernel._execute_cell_for_user = execute_cell_mock = MagicMock(return_value={'test': 'ing', 'a': 'b', - 'status': 'ok'}) + kernel._execute_cell_for_user = execute_cell_mock = MagicMock( + return_value={"test": "ing", "a": "b", "status": "ok"} + ) kernel._do_shutdown_ipykernel = do_shutdown_mock = MagicMock() kernel.ipython_display = ipython_display = MagicMock() @@ -79,13 +84,15 @@ def test_execute_alerts_user_if_an_unexpected_error_happens(): def test_execute_throws_if_fatal_error_happens_for_execution(): # Verify that the kernel sends the error from Python execution's context to the user fatal_error = u"Error." - message = "{}\nException details:\n\t\"{}\"".format(fatal_error, fatal_error) + message = '{}\nException details:\n\t"{}"'.format(fatal_error, fatal_error) reply_content = dict() reply_content[u"status"] = u"error" reply_content[u"evalue"] = fatal_error execute_cell_mock.return_value = reply_content - ret = kernel._execute_cell(code, False, shutdown_if_error=True, log_if_error=fatal_error) + ret = kernel._execute_cell( + code, False, shutdown_if_error=True, log_if_error=fatal_error + ) assert ret is execute_cell_mock.return_value assert kernel._fatal_error == message @@ -118,55 +125,99 @@ def test_shutdown_cleans_up(): def test_register_auto_viz(): kernel._register_auto_viz() - assert call("from autovizwidget.widget.utils import display_dataframe\nip = get_ipython()\nip.display_formatter" - ".ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)", - True, False, None, False) in execute_cell_mock.mock_calls + assert ( + call( + "from autovizwidget.widget.utils import display_dataframe\nip = get_ipython()\nip.display_formatter" + ".ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)", + True, + False, + None, + False, + ) + in execute_cell_mock.mock_calls + ) @with_setup(_setup, _teardown) def test_change_language(): kernel._change_language() - assert call("%%_do_not_call_change_language -l {}\n ".format(LANG_PYTHON), - True, False, None, False) in execute_cell_mock.mock_calls + assert ( + call( + "%%_do_not_call_change_language -l {}\n ".format(LANG_PYTHON), + True, + False, + None, + False, + ) + in execute_cell_mock.mock_calls + ) @with_setup(_setup, _teardown) def test_load_magics(): kernel._load_magics_extension() - assert call("%load_ext sparkmagic.kernels", True, False, None, False) in execute_cell_mock.mock_calls + assert ( + call("%load_ext sparkmagic.kernels", True, False, None, False) + in execute_cell_mock.mock_calls + ) @with_setup(_setup, _teardown) def test_delete_session(): kernel._delete_session() - assert call("%%_do_not_call_delete_session\n ", True, False) in execute_cell_mock.mock_calls + assert ( + call("%%_do_not_call_delete_session\n ", True, False) + in execute_cell_mock.mock_calls + ) + -@patch.object(ipykernel.ipkernel.IPythonKernel, 'do_execute') +@async_test +@patch.object( + ipykernel.ipkernel.IPythonKernel, + "do_execute", +) @with_setup(_teardown) -def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): - import sys - if sys.version_info.major == 2: - from unittest import SkipTest - raise SkipTest("Python 3 only") - else: - import asyncio - mock_ipy_execute_result = asyncio.Future() - mock_ipy_execute_result.set_result({'status': 'OK'}) - mock_ipy_execute.return_value = mock_ipy_execute_result +async def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): + mock_ipy_execute.return_value = {"status": "OK"} - actual_result = TestSparkKernel()._execute_cell_for_user(code='Foo', silent=True) + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="Foo", silent=True + ) - assert {'status': 'OK'} == actual_result + print(actual_result) + assert {"status": "OK"} == actual_result -@patch.object(ipykernel.ipkernel.IPythonKernel, 'do_execute') +@async_test +@patch.object( + ipykernel.ipkernel.IPythonKernel, + "do_execute", +) @with_setup(_teardown) -def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): - mock_ipy_execute.return_value = {'status': 'OK'} +async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): + mock_ipy_execute.return_value = asyncio.Future() + mock_ipy_execute.return_value.set_result({"status": "OK"}) + + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="Foo", silent=True + ) - actual_result = TestSparkKernel()._execute_cell_for_user(code='Foo', silent=True) + print(actual_result) + assert {"status": "OK"} == actual_result - assert {'status': 'OK'} == actual_result + +@async_test +@patch.object( + ipykernel.ipkernel.IPythonKernel, + "do_execute", + new=AsyncMock(return_value={"status": "OK"}), +) +@with_setup(_teardown) +async def test_execute_cell_for_user_ipykernel6(): + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="Foo", silent=True + ) + assert {"status": "OK"} == actual_result From 56ff7fc076ad2eaf8c6da4bb0208b366204484bb Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 12:03:45 -0700 Subject: [PATCH 02/16] Tests passing in all ipython versions --- .../sparkmagic/kernels/wrapperkernel/sparkkernelbase.py | 4 ++-- sparkmagic/sparkmagic/tests/test_sparkkernelbase.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 3ea82e3af..0f640a144 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -165,11 +165,11 @@ async def _execute_cell_for_user( # In ipykernel 6, this returns native asyncio coroutine if asyncio.iscoroutine(result): - return await result + result = await result # In ipykernel 5, this returns gen.coroutine if isinstance(result, asyncio.Future): - return result.result() + result = result.result() # In ipykernel 4, this func is synchronous return result diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 017b5bd81..4d964dbb7 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -1,7 +1,7 @@ import asyncio import ipykernel -from unittest.mock import MagicMock, AsyncMock, call, patch +from unittest.mock import MagicMock, call, patch from nose.tools import with_setup from aiounittest import async_test @@ -213,10 +213,10 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): @patch.object( ipykernel.ipkernel.IPythonKernel, "do_execute", - new=AsyncMock(return_value={"status": "OK"}), ) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel6(): +async def test_execute_cell_for_user_ipykernel6(mock_ipy_execute): + mock_ipy_execute.return_value = {"status": "OK"} actual_result = await TestSparkKernel()._execute_cell_for_user( code="Foo", silent=True ) From 3bad81a394b3c072edcd8d52179d345838f0f4a6 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 12:28:12 -0700 Subject: [PATCH 03/16] Debugging Python 3.6 --- sparkmagic/sparkmagic/tests/test_sparkkernelbase.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 4d964dbb7..3fd0bbc2a 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -1,7 +1,7 @@ import asyncio import ipykernel -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, AsyncMock, call, patch from nose.tools import with_setup from aiounittest import async_test @@ -184,7 +184,7 @@ async def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): mock_ipy_execute.return_value = {"status": "OK"} actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) print(actual_result) @@ -202,7 +202,7 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): mock_ipy_execute.return_value.set_result({"status": "OK"}) actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) print(actual_result) @@ -213,11 +213,11 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): @patch.object( ipykernel.ipkernel.IPythonKernel, "do_execute", + new=AsyncMock(return_value={"status": "OK"}), ) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel6(mock_ipy_execute): - mock_ipy_execute.return_value = {"status": "OK"} +async def test_execute_cell_for_user_ipykernel6(): actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) assert {"status": "OK"} == actual_result From c5a980a40bdb33c1dc1405dbede88df7bbf2b29d Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 12:34:47 -0700 Subject: [PATCH 04/16] Revert "Debugging Python 3.6" This reverts commit 3bad81a394b3c072edcd8d52179d345838f0f4a6. --- sparkmagic/sparkmagic/tests/test_sparkkernelbase.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 3fd0bbc2a..4d964dbb7 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -1,7 +1,7 @@ import asyncio import ipykernel -from unittest.mock import MagicMock, AsyncMock, call, patch +from unittest.mock import MagicMock, call, patch from nose.tools import with_setup from aiounittest import async_test @@ -184,7 +184,7 @@ async def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): mock_ipy_execute.return_value = {"status": "OK"} actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True + code="Foo", silent=True ) print(actual_result) @@ -202,7 +202,7 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): mock_ipy_execute.return_value.set_result({"status": "OK"}) actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True + code="Foo", silent=True ) print(actual_result) @@ -213,11 +213,11 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): @patch.object( ipykernel.ipkernel.IPythonKernel, "do_execute", - new=AsyncMock(return_value={"status": "OK"}), ) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel6(): +async def test_execute_cell_for_user_ipykernel6(mock_ipy_execute): + mock_ipy_execute.return_value = {"status": "OK"} actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True + code="Foo", silent=True ) assert {"status": "OK"} == actual_result From 406f32fc3d1a121ac96c542e2e607d772536b806 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 13:12:30 -0700 Subject: [PATCH 05/16] Early returns --- .../kernels/wrapperkernel/sparkkernelbase.py | 8 +++---- .../sparkmagic/tests/test_sparkkernelbase.py | 21 +++++-------------- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 0f640a144..7aedd3f94 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -35,7 +35,7 @@ def __init__( # Override self.session_language = session_language - super(SparkKernelBase, self).__init__(**kwargs) + super().__init__(**kwargs) self.logger = SparkLog(u"{}_jupyter_kernel".format(self.session_language)) self._fatal_error = None @@ -165,17 +165,17 @@ async def _execute_cell_for_user( # In ipykernel 6, this returns native asyncio coroutine if asyncio.iscoroutine(result): - result = await result + return await result # In ipykernel 5, this returns gen.coroutine if isinstance(result, asyncio.Future): - result = result.result() + return result.result() # In ipykernel 4, this func is synchronous return result def _do_shutdown_ipykernel(self, restart): - return super(SparkKernelBase, self).do_shutdown(restart) + return super().do_shutdown(restart) def _complete_cell(self): """A method that runs a cell with no effect. diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 4d964dbb7..ee0157ab0 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -175,37 +175,26 @@ def test_delete_session(): @async_test -@patch.object( - ipykernel.ipkernel.IPythonKernel, - "do_execute", -) +@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute", new_callable=MagicMock) @with_setup(_teardown) async def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): mock_ipy_execute.return_value = {"status": "OK"} - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) - - print(actual_result) assert {"status": "OK"} == actual_result @async_test -@patch.object( - ipykernel.ipkernel.IPythonKernel, - "do_execute", -) +@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute", new_callable=MagicMock) @with_setup(_teardown) async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): mock_ipy_execute.return_value = asyncio.Future() mock_ipy_execute.return_value.set_result({"status": "OK"}) actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) - - print(actual_result) assert {"status": "OK"} == actual_result @@ -218,6 +207,6 @@ async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): async def test_execute_cell_for_user_ipykernel6(mock_ipy_execute): mock_ipy_execute.return_value = {"status": "OK"} actual_result = await TestSparkKernel()._execute_cell_for_user( - code="Foo", silent=True + code="1", silent=True ) assert {"status": "OK"} == actual_result From ce658395e2668e9f35f9166cb4466df5adecfb87 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 14:48:25 -0700 Subject: [PATCH 06/16] patch decorator doesn't work on async functions in python <3.8 --- .github/workflows/tests.yml | 1 + .../sparkmagic/tests/test_sparkkernelbase.py | 66 +++++++++++-------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 16eb55fe9..0edc8f31e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,6 +21,7 @@ jobs: sudo apt-get install -y libkrb5-dev - name: Install package dependencies run: | + python -m pip install --upgrade pip pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils pip install -r autovizwidget/requirements.txt -e autovizwidget pip install -r sparkmagic/requirements.txt -e sparkmagic diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index ee0157ab0..5db2f5158 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -175,38 +175,52 @@ def test_delete_session(): @async_test -@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute", new_callable=MagicMock) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel4(mock_ipy_execute): - mock_ipy_execute.return_value = {"status": "OK"} - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) - assert {"status": "OK"} == actual_result +async def test_execute_cell_for_user_ipykernel4(): + with patch( + "ipykernel.ipkernel.IPythonKernel.do_execute", + new_callable=MagicMock, + return_value={"status": "OK"}, + ) as mock_ipy_execute: + print(mock_ipy_execute) + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="1", silent=True + ) + print(actual_result) + assert mock_ipy_execute.called + assert mock_ipy_execute.called + assert {"status": "OK"} == actual_result @async_test -@patch.object(ipykernel.ipkernel.IPythonKernel, "do_execute", new_callable=MagicMock) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel5(mock_ipy_execute): - mock_ipy_execute.return_value = asyncio.Future() - mock_ipy_execute.return_value.set_result({"status": "OK"}) - - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) - assert {"status": "OK"} == actual_result +async def test_execute_cell_for_user_ipykernel5(): + with patch( + "ipykernel.ipkernel.IPythonKernel.do_execute", + new_callable=MagicMock, + ) as mock_ipy_execute: + mock_ipy_execute.return_value = asyncio.Future() + mock_ipy_execute.return_value.set_result({"status": "OK"}) + print(mock_ipy_execute) + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="1", silent=True + ) + print(actual_result) + assert mock_ipy_execute.called + assert {"status": "OK"} == actual_result @async_test -@patch.object( - ipykernel.ipkernel.IPythonKernel, - "do_execute", -) @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel6(mock_ipy_execute): - mock_ipy_execute.return_value = {"status": "OK"} - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) - assert {"status": "OK"} == actual_result +async def test_execute_cell_for_user_ipykernel6(): + with patch( + "ipykernel.ipkernel.IPythonKernel.do_execute", + return_value={"status": "OK"}, + ) as mock_ipy_execute: + mock_ipy_execute.return_value = {"status": "OK"} + print(mock_ipy_execute) + actual_result = await TestSparkKernel()._execute_cell_for_user( + code="1", silent=True + ) + assert mock_ipy_execute.called + assert {"status": "OK"} == actual_result From 11ed740aa761e95738d9ee17723415aabc0183aa Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 14:51:46 -0700 Subject: [PATCH 07/16] Test cleanup --- .../sparkmagic/tests/test_sparkkernelbase.py | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 5db2f5158..c74af30eb 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -177,50 +177,49 @@ def test_delete_session(): @async_test @with_setup(_teardown) async def test_execute_cell_for_user_ipykernel4(): + want = {"status": "OK"} + # Can't use patch decorator because + # it fails to patch async functions in Python < 3.8 with patch( "ipykernel.ipkernel.IPythonKernel.do_execute", new_callable=MagicMock, - return_value={"status": "OK"}, + return_value=want, ) as mock_ipy_execute: - print(mock_ipy_execute) - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) - print(actual_result) - assert mock_ipy_execute.called + got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + assert mock_ipy_execute.called - assert {"status": "OK"} == actual_result + assert want == got @async_test @with_setup(_teardown) async def test_execute_cell_for_user_ipykernel5(): + want = {"status": "OK"} + # Can't use patch decorator because + # it fails to patch async functions in Python < 3.8 with patch( "ipykernel.ipkernel.IPythonKernel.do_execute", new_callable=MagicMock, ) as mock_ipy_execute: mock_ipy_execute.return_value = asyncio.Future() - mock_ipy_execute.return_value.set_result({"status": "OK"}) - print(mock_ipy_execute) - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) - print(actual_result) + mock_ipy_execute.return_value.set_result(want) + + got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + assert mock_ipy_execute.called - assert {"status": "OK"} == actual_result + assert want == got @async_test @with_setup(_teardown) async def test_execute_cell_for_user_ipykernel6(): + want = {"status": "OK"} + # Can't use patch decorator because + # it fails to patch async functions in Python < 3.8 with patch( - "ipykernel.ipkernel.IPythonKernel.do_execute", - return_value={"status": "OK"}, + "ipykernel.ipkernel.IPythonKernel.do_execute", return_value=want ) as mock_ipy_execute: - mock_ipy_execute.return_value = {"status": "OK"} - print(mock_ipy_execute) - actual_result = await TestSparkKernel()._execute_cell_for_user( - code="1", silent=True - ) + got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + assert mock_ipy_execute.called - assert {"status": "OK"} == actual_result + assert want == got From ac13287d2371a3ed7fcd65ac398bab0d5d398d35 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Thu, 16 Sep 2021 14:54:08 -0700 Subject: [PATCH 08/16] Remove ipykernel<6 constraint --- hdijupyterutils/requirements.txt | 2 +- hdijupyterutils/setup.py | 2 +- sparkmagic/requirements.txt | 2 +- sparkmagic/setup.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hdijupyterutils/requirements.txt b/hdijupyterutils/requirements.txt index f2156b88d..2d3f7cc43 100644 --- a/hdijupyterutils/requirements.txt +++ b/hdijupyterutils/requirements.txt @@ -2,7 +2,7 @@ ipython>=4.0.2 nose mock ipywidgets>5.0.0 -ipykernel>=4.2.2<6.0.0 +ipykernel>=4.2.2 jupyter>=1 pandas>=0.17.1 numpy>=1.16.5 diff --git a/hdijupyterutils/setup.py b/hdijupyterutils/setup.py index 9501e31be..f143deb72 100644 --- a/hdijupyterutils/setup.py +++ b/hdijupyterutils/setup.py @@ -61,7 +61,7 @@ def version(path): "nose", "mock", "ipywidgets>5.0.0", - "ipykernel>=4.2.2<6.0.0", + "ipykernel>=4.2.2", "jupyter>=1", "pandas>=0.17.1", "numpy", diff --git a/sparkmagic/requirements.txt b/sparkmagic/requirements.txt index 6b3d175c2..0230efdb0 100644 --- a/sparkmagic/requirements.txt +++ b/sparkmagic/requirements.txt @@ -6,7 +6,7 @@ mock pandas>=0.17.1 numpy requests -ipykernel>=4.2.2<6.0.0 +ipykernel>=4.2.2 ipywidgets>5.0.0 notebook>=4.2 tornado>=4 diff --git a/sparkmagic/setup.py b/sparkmagic/setup.py index 312390d35..5f6d61cf9 100644 --- a/sparkmagic/setup.py +++ b/sparkmagic/setup.py @@ -34,7 +34,7 @@ def read(path, encoding="utf-8"): def version(path): - """Obtain the package version from a python file e.g. pkg/__init__.py + """Obtain the package version from a python file e.g. pkg/__init__.py. See . """ @@ -89,7 +89,7 @@ def version(path): "pandas>=0.17.1", "numpy", "requests", - "ipykernel<6.0.0", + "ipykernel>=4.2.2", "ipywidgets>5.0.0", "notebook>=4.2", "tornado>=4", From 6fc77e840fef713c7eaf65be048ca6702013b793 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Mon, 25 Oct 2021 13:18:33 -0700 Subject: [PATCH 09/16] Upddate kernel base w/ async behavior and tests --- .../kernels/wrapperkernel/sparkkernelbase.py | 78 +++++----- .../sparkmagic/livyclientlib/exceptions.py | 123 +++++++++++---- .../sparkmagic/tests/test_sparkkernelbase.py | 141 ++++++++++++------ 3 files changed, 237 insertions(+), 105 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 7aedd3f94..558e14f94 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -9,7 +9,7 @@ import sparkmagic.utils.configuration as conf from sparkmagic.utils.sparklogger import SparkLog -from sparkmagic.livyclientlib.exceptions import wrap_unexpected_exceptions +from sparkmagic.livyclientlib.exceptions import async_wrap_unexpected_exceptions from sparkmagic.kernels.wrapperkernel.usercodeparser import UserCodeParser @@ -49,43 +49,40 @@ def __init__( # Disable warnings for test env in HDI requests.packages.urllib3.disable_warnings() - if not kwargs.get("testing", False): - self._load_magics_extension() - self._change_language() - if conf.use_auto_viz(): - self._register_auto_viz() - - def do_execute( + async def do_execute( self, code, silent, store_history=True, user_expressions=None, allow_stdin=False ): - def f(self): + async def f(self): if self._fatal_error is not None: - return self._repeat_fatal_error() + return await self._repeat_fatal_error() - return self._do_execute( + return await self._do_execute( code, silent, store_history, user_expressions, allow_stdin ) - return wrap_unexpected_exceptions(f, self._complete_cell)(self) + wrapped = async_wrap_unexpected_exceptions(f, self._complete_cell) + return await wrapped(self) - def do_shutdown(self, restart): + async def do_shutdown(self, restart): # Cleanup - self._delete_session() + await self._delete_session() - return self._do_shutdown_ipykernel(restart) + return await self._do_shutdown_ipykernel(restart) - def _do_execute(self, code, silent, store_history, user_expressions, allow_stdin): + async def _do_execute( + self, code, silent, store_history, user_expressions, allow_stdin + ): code_to_run = self.user_code_parser.get_code_to_run(code) - res = self._execute_cell( + res = await self._execute_cell( code_to_run, silent, store_history, user_expressions, allow_stdin ) return res - def _load_magics_extension(self): + async def _load_magics_extension(self): register_magics_code = "%load_ext sparkmagic.kernels" - self._execute_cell( + await self._execute_cell( register_magics_code, True, False, @@ -94,11 +91,11 @@ def _load_magics_extension(self): ) self.logger.debug("Loaded magics.") - def _change_language(self): + async def _change_language(self): register_magics_code = "%%_do_not_call_change_language -l {}\n ".format( self.session_language ) - self._execute_cell( + await self._execute_cell( register_magics_code, True, False, @@ -109,7 +106,7 @@ def _change_language(self): ) self.logger.debug("Changed language.") - def _register_auto_viz(self): + async def _register_auto_viz(self): from sparkmagic.utils.sparkevents import get_spark_events_handler import autovizwidget.utils.configuration as c @@ -119,7 +116,7 @@ def _register_auto_viz(self): register_auto_viz_code = """from autovizwidget.widget.utils import display_dataframe ip = get_ipython() ip.display_formatter.ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)""" - self._execute_cell( + await self._execute_cell( register_auto_viz_code, True, False, @@ -128,11 +125,11 @@ def _register_auto_viz(self): ) self.logger.debug("Registered auto viz.") - def _delete_session(self): + async def _delete_session(self): code = "%%_do_not_call_delete_session\n " - self._execute_cell_for_user(code, True, False) + await self._execute_cell_for_user(code, True, False) - def _execute_cell( + async def _execute_cell( self, code, silent, @@ -142,7 +139,7 @@ def _execute_cell( shutdown_if_error=False, log_if_error=None, ): - reply_content = self._execute_cell_for_user( + reply_content = await self._execute_cell_for_user( code, silent, store_history, user_expressions, allow_stdin ) @@ -152,7 +149,7 @@ def _execute_cell( message = '{}\nException details:\n\t"{}"'.format( log_if_error, error_from_reply ) - return self._abort_with_fatal_error(message) + return await self._abort_with_fatal_error(message) return reply_content @@ -168,23 +165,30 @@ async def _execute_cell_for_user( return await result # In ipykernel 5, this returns gen.coroutine - if isinstance(result, asyncio.Future): + if asyncio.isfuture(result): return result.result() # In ipykernel 4, this func is synchronous return result - def _do_shutdown_ipykernel(self, restart): - return super().do_shutdown(restart) + async def _do_shutdown_ipykernel(self, restart): + # INVESTIGATE: Inspect if should await + result = super().do_shutdown(restart) + + # In tests, super() calls this SparkKernelBase.do_shutdown, which is async + if asyncio.iscoroutine(result): + return await result + + return result - def _complete_cell(self): + async def _complete_cell(self): """A method that runs a cell with no effect. Call this and return the value it returns when there's some sort of error preventing the user's cell from executing; this will register the cell from the Jupyter UI as being completed. """ - return self._execute_cell("None", False, True, None, False) + return await self._execute_cell("None", False, True, None, False) def _show_user_error(self, message): self.logger.error(message) @@ -199,14 +203,14 @@ def _queue_fatal_error(self, message): """ self._fatal_error = message - def _abort_with_fatal_error(self, message): + async def _abort_with_fatal_error(self, message): """Queues up a fatal error and throws it immediately.""" self._queue_fatal_error(message) - return self._repeat_fatal_error() + return await self._repeat_fatal_error() - def _repeat_fatal_error(self): + async def _repeat_fatal_error(self): """Throws an error that has already been queued.""" error = conf.fatal_error_suggestion().format(self._fatal_error) self.logger.error(error) self.ipython_display.send_error(error) - return self._complete_cell() + return await self._complete_cell() diff --git a/sparkmagic/sparkmagic/livyclientlib/exceptions.py b/sparkmagic/sparkmagic/livyclientlib/exceptions.py index 5c3bb0e5f..c34f1b089 100644 --- a/sparkmagic/sparkmagic/livyclientlib/exceptions.py +++ b/sparkmagic/sparkmagic/livyclientlib/exceptions.py @@ -1,4 +1,4 @@ -from __future__ import print_function +import asyncio import sys import traceback from sparkmagic.utils.constants import EXPECTED_ERROR_MSG, INTERNAL_ERROR_MSG @@ -6,47 +6,53 @@ # == EXCEPTIONS == class LivyClientLibException(Exception): - """Base class for all LivyClientLib exceptions. All exceptions that are explicitly raised by - code in this package should be a subclass of LivyClientLibException. If you need to account for a - new error condition, either use one of the existing LivyClientLibException subclasses, - or create a new subclass with a descriptive name and add it to this file. + """Base class for all LivyClientLib exceptions. All exceptions that are + explicitly raised by code in this package should be a subclass of + LivyClientLibException. If you need to account for a new error condition, + either use one of the existing LivyClientLibException subclasses, or create + a new subclass with a descriptive name and add it to this file. - We distinguish between "expected" errors, which represent errors that a user is likely - to encounter in normal use, and "internal" errors, which represents exceptions that happen - due to a bug in the library. Check EXPECTED_EXCEPTIONS to see which exceptions - are considered "expected".""" + We distinguish between "expected" errors, which represent errors + that a user is likely to encounter in normal use, and "internal" + errors, which represents exceptions that happen due to a bug in the + library. Check EXPECTED_EXCEPTIONS to see which exceptions are + considered "expected". + """ class HttpClientException(LivyClientLibException): - """An exception thrown by the HTTP client when it fails to make a request.""" + """An exception thrown by the HTTP client when it fails to make a + request.""" class LivyClientTimeoutException(LivyClientLibException): """An exception for timeouts while interacting with Livy.""" + class DataFrameParseException(LivyClientLibException): - """An internal error which suggests a bad implementation of dataframe parsing from JSON -- - if we get a JSON parsing error when parsing the results from the Livy server, this exception - is thrown.""" + """An internal error which suggests a bad implementation of dataframe + parsing from JSON -- if we get a JSON parsing error when parsing the + results from the Livy server, this exception is thrown.""" class LivyUnexpectedStatusException(LivyClientLibException): - """An exception that will be shown if some unexpected error happens on the Livy side.""" + """An exception that will be shown if some unexpected error happens on the + Livy side.""" class SessionManagementException(LivyClientLibException): - """An exception that is thrown by the Session Manager when it is a - given session name is invalid in some way.""" + """An exception that is thrown by the Session Manager when it is a given + session name is invalid in some way.""" class BadUserConfigurationException(LivyClientLibException): - """An exception that is thrown when configuration provided by the user is invalid - in some way.""" + """An exception that is thrown when configuration provided by the user is + invalid in some way.""" class BadUserDataException(LivyClientLibException): - """An exception that is thrown when data provided by the user is invalid - in some way.""" + """An exception that is thrown when data provided by the user is invalid in + some way.""" class SqlContextNotFoundException(LivyClientLibException): @@ -54,7 +60,8 @@ class SqlContextNotFoundException(LivyClientLibException): class SparkStatementException(LivyClientLibException): - """Exception that is thrown when an error occurs while parsing or executing Spark statements.""" + """Exception that is thrown when an error occurs while parsing or executing + Spark statements.""" # It has to be a KeyboardInterrupt to interrupt the notebook @@ -84,11 +91,22 @@ def _show_tb(exc_type, exc_val, tb): class SparkStatementCancellationFailedException(KeyboardInterrupt): - """Exception that is thrown when a Spark statement is interrupted but fails to be cancelled in Livy.""" + """Exception that is thrown when a Spark statement is interrupted but fails + to be cancelled in Livy.""" # == DECORATORS FOR EXCEPTION HANDLING == -EXPECTED_EXCEPTIONS = [BadUserConfigurationException, BadUserDataException, LivyUnexpectedStatusException, SqlContextNotFoundException, HttpClientException, LivyClientTimeoutException, SessionManagementException, SparkStatementException] +EXPECTED_EXCEPTIONS = [ + BadUserConfigurationException, + BadUserDataException, + LivyUnexpectedStatusException, + SqlContextNotFoundException, + HttpClientException, + LivyClientTimeoutException, + SessionManagementException, + SparkStatementException, +] + def handle_expected_exceptions(f): """A decorator that handles expected exceptions. Self can be any object with @@ -98,6 +116,7 @@ def handle_expected_exceptions(f): def fn(self, ...): etc...""" from sparkmagic.utils import configuration as conf + exceptions_to_handle = tuple(EXPECTED_EXCEPTIONS) # Notice that we're NOT handling e.DataFrameParseException here. That's because DataFrameParseException @@ -114,6 +133,7 @@ def wrapped(self, *args, **kwargs): return None else: return out + wrapped.__name__ = f.__name__ wrapped.__doc__ = f.__doc__ return wrapped @@ -128,10 +148,15 @@ def wrap_unexpected_exceptions(f, execute_if_error=None): Usage: @wrap_unexpected_exceptions def fn(self, ...): - ..etc """ + ..etc""" from sparkmagic.utils import configuration as conf + def handle_exception(self, e): - self.logger.error(u"ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format(e, traceback.format_exc())) + self.logger.error( + u"ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( + e, traceback.format_exc() + ) + ) self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e)) return None if execute_if_error is None else execute_if_error() @@ -144,6 +169,54 @@ def wrapped(self, *args, **kwargs): return handle_exception(self, err) else: return out + + wrapped.__name__ = f.__name__ + wrapped.__doc__ = f.__doc__ + return wrapped + + +# async_wrap_unexpected_exceptions was created to handle async behavior ipykernel >=6 +# It was safer to create a separate async wrapper than modify the original to accommodate both use-cases +def async_wrap_unexpected_exceptions(f, execute_if_error=None): + """A decorator that catches all exceptions from the async function f and alerts the user about them. + Self can be any object with a "logger" attribute and a "ipython_display" attribute. + All exceptions are logged as "unexpected" exceptions, and a request is made to the user to file an issue + at the Github repository. If there is an error, returns None if execute_if_error is None, or else + returns the output of the function execute_if_error. + Usage: + @async_wrap_unexpected_exceptions + async def fn(self, ...): + ..etc""" + from sparkmagic.utils import configuration as conf + + async def handle_exception(self, e): + self.logger.error( + u"ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( + e, traceback.format_exc() + ) + ) + self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e)) + if execute_if_error is None: + return None + + result = execute_if_error() + if asyncio.iscoroutine(result): + return await result + + return result + + async def wrapped(self, *args, **kwargs): + try: + out = f(self, *args, **kwargs) + if asyncio.iscoroutine(out): + out = await out + except Exception as err: + if conf.all_errors_are_fatal(): + raise err + return await handle_exception(self, err) + else: + return out + wrapped.__name__ = f.__name__ wrapped.__doc__ = f.__doc__ return wrapped diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index c74af30eb..dd6e7f0f3 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -1,13 +1,28 @@ import asyncio -import ipykernel + from unittest.mock import MagicMock, call, patch from nose.tools import with_setup -from aiounittest import async_test +from aiounittest import async_test, futurized from sparkmagic.kernels.wrapperkernel.sparkkernelbase import SparkKernelBase from sparkmagic.utils.constants import LANG_PYTHON +# AsyncMock is only available in Python >= 3.8 +# This helps modify tests to accommodate older Python version +is_async_mock_available = True +try: + # Python >= 3.8 + from unittest.mock import AsyncMock +except Exception: + # Patch + is_async_mock_available = False + + class AsyncMock(MagicMock): + async def __call__(self, *args, **kwargs): + return super(AsyncMock, self).__call__(*args, **kwargs) + + kernel = None execute_cell_mock = None do_shutdown_mock = None @@ -19,7 +34,7 @@ class TestSparkKernel(SparkKernelBase): def __init__(self): kwargs = {"testing": True} - super(TestSparkKernel, self).__init__( + super().__init__( None, None, None, None, None, LANG_PYTHON, user_code_parser, **kwargs ) @@ -29,10 +44,20 @@ def _setup(): kernel = TestSparkKernel() - kernel._execute_cell_for_user = execute_cell_mock = MagicMock( - return_value={"test": "ing", "a": "b", "status": "ok"} - ) - kernel._do_shutdown_ipykernel = do_shutdown_mock = MagicMock() + if is_async_mock_available: + kernel._execute_cell_for_user = execute_cell_mock = AsyncMock( + return_value={"test": "ing", "a": "b", "status": "ok"} + ) + kernel._do_shutdown_ipykernel = do_shutdown_mock = AsyncMock() + else: + # Use futurized return values to keep async func behavior + kernel._do_shutdown_ipykernel = do_shutdown_mock = MagicMock( + return_value=futurized(None) + ) + kernel._execute_cell_for_user = execute_cell_mock = MagicMock( + return_value=futurized({"test": "ing", "a": "b", "status": "ok"}) + ) + kernel.ipython_display = ipython_display = MagicMock() @@ -40,90 +65,118 @@ def _teardown(): pass +# Helper for calling assert_called_once_with on native AsyncMock vs Mock w/ futurized return values +async def async_assert_called_once_with(mock, *values): + result = mock.called_once_with(*values) + if asyncio.iscoroutine(result): + result = await result + + assert result + + +# Helper for asserting return values on native AsyncMock vs Mock w/ futurized return values +async def async_assert_return_value(mock, value): + return_value = mock.return_value + if asyncio.isfuture(return_value): + return_value = await return_value + + assert return_value is value + + +@async_test @with_setup(_setup, _teardown) -def test_execute_valid_code(): +async def test_execute_valid_code(): # Verify that the execution flows through. - ret = kernel.do_execute(code, False) + ret = await kernel.do_execute(code, False) user_code_parser.get_code_to_run.assert_called_once_with(code) - assert ret is execute_cell_mock.return_value + await async_assert_return_value(execute_cell_mock, ret) assert kernel._fatal_error is None - assert execute_cell_mock.called_once_with(code, True) + # assert execute_cell_mock.called_once_with(code, True) + await async_assert_called_once_with(execute_cell_mock, code, True) assert ipython_display.send_error.call_count == 0 +@async_test @with_setup(_setup, _teardown) -def test_execute_throws_if_fatal_error_happened(): +async def test_execute_throws_if_fatal_error_happened(): # Verify that if a fatal error already happened, we don't run the code and show the fatal error instead. fatal_error = "Error." kernel._fatal_error = fatal_error - ret = kernel.do_execute(code, False) + ret = await kernel.do_execute(code, False) - assert ret is execute_cell_mock.return_value + await async_assert_return_value(execute_cell_mock, ret) assert kernel._fatal_error == fatal_error - assert execute_cell_mock.called_once_with("None", True) + # assert execute_cell_mock.called_once_with("None", True) + await async_assert_called_once_with(execute_cell_mock, "None", True) assert ipython_display.send_error.call_count == 1 +@async_test @with_setup(_setup, _teardown) -def test_execute_alerts_user_if_an_unexpected_error_happens(): +async def test_execute_alerts_user_if_an_unexpected_error_happens(): # Verify that developer error shows developer error (the Github link). # Because do_execute is so minimal, we'll assume we have a bug in the _repeat_fatal_error method kernel._fatal_error = "Something bad happened before" - kernel._repeat_fatal_error = MagicMock(side_effect=ValueError) + kernel._repeat_fatal_error = AsyncMock(side_effect=ValueError) - ret = kernel.do_execute(code, False) - - assert ret is execute_cell_mock.return_value - assert execute_cell_mock.called_once_with("None", True) + ret = await kernel.do_execute(code, False) + await async_assert_return_value(execute_cell_mock, ret) + # assert execute_cell_mock.called_once_with("None", True) + await async_assert_called_once_with(execute_cell_mock, "None", True) assert ipython_display.send_error.call_count == 1 +@async_test @with_setup(_setup, _teardown) -def test_execute_throws_if_fatal_error_happens_for_execution(): +async def test_execute_throws_if_fatal_error_happens_for_execution(): # Verify that the kernel sends the error from Python execution's context to the user fatal_error = u"Error." message = '{}\nException details:\n\t"{}"'.format(fatal_error, fatal_error) reply_content = dict() reply_content[u"status"] = u"error" reply_content[u"evalue"] = fatal_error - execute_cell_mock.return_value = reply_content + execute_cell_mock.return_value = ( + reply_content if is_async_mock_available else futurized(reply_content) + ) - ret = kernel._execute_cell( + ret = await kernel._execute_cell( code, False, shutdown_if_error=True, log_if_error=fatal_error ) - - assert ret is execute_cell_mock.return_value + await async_assert_return_value(execute_cell_mock, ret) assert kernel._fatal_error == message - assert execute_cell_mock.called_once_with("None", True) + # assert execute_cell_mock.called_once_with("None", True) + await async_assert_called_once_with(execute_cell_mock, "None", True) assert ipython_display.send_error.call_count == 1 +@async_test @with_setup(_setup, _teardown) -def test_shutdown_cleans_up(): +async def test_shutdown_cleans_up(): # No restart - kernel._execute_cell_for_user = ecfu_m = MagicMock() - kernel._do_shutdown_ipykernel = dsi_m = MagicMock() + kernel._execute_cell_for_user = ecfu_m = AsyncMock() + kernel._do_shutdown_ipykernel = dsi_m = AsyncMock() - kernel.do_shutdown(False) + await kernel.do_shutdown(False) ecfu_m.assert_called_once_with("%%_do_not_call_delete_session\n ", True, False) dsi_m.assert_called_once_with(False) # On restart - kernel._execute_cell_for_user = ecfu_m = MagicMock() - kernel._do_shutdown_ipykernel = dsi_m = MagicMock() + kernel._execute_cell_for_user = ecfu_m = AsyncMock() + kernel._do_shutdown_ipykernel = dsi_m = AsyncMock() - kernel.do_shutdown(True) + await kernel.do_shutdown(True) ecfu_m.assert_called_once_with("%%_do_not_call_delete_session\n ", True, False) dsi_m.assert_called_once_with(True) +@async_test @with_setup(_setup, _teardown) -def test_register_auto_viz(): - kernel._register_auto_viz() +async def test_register_auto_viz(): + await kernel._register_auto_viz() assert ( call( @@ -138,9 +191,10 @@ def test_register_auto_viz(): ) +@async_test @with_setup(_setup, _teardown) -def test_change_language(): - kernel._change_language() +async def test_change_language(): + await kernel._change_language() assert ( call( @@ -154,9 +208,10 @@ def test_change_language(): ) +@async_test @with_setup(_setup, _teardown) -def test_load_magics(): - kernel._load_magics_extension() +async def test_load_magics(): + await kernel._load_magics_extension() assert ( call("%load_ext sparkmagic.kernels", True, False, None, False) @@ -164,9 +219,10 @@ def test_load_magics(): ) +@async_test @with_setup(_setup, _teardown) -def test_delete_session(): - kernel._delete_session() +async def test_delete_session(): + await kernel._delete_session() assert ( call("%%_do_not_call_delete_session\n ", True, False) @@ -220,6 +276,5 @@ async def test_execute_cell_for_user_ipykernel6(): "ipykernel.ipkernel.IPythonKernel.do_execute", return_value=want ) as mock_ipy_execute: got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) - assert mock_ipy_execute.called assert want == got From 743bd0a82a6851d9cdf2a51b8eb74d3dab526f3d Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Mon, 8 Nov 2021 19:16:01 -0800 Subject: [PATCH 10/16] Lazily load magics on first cell execution --- .../kernels/wrapperkernel/sparkkernelbase.py | 19 +++++++++ .../sparkmagic/tests/test_sparkkernelbase.py | 39 ++++++++++++++++--- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 558e14f94..7ee41b32d 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -32,6 +32,10 @@ def __init__( self.language_version = language_version self.language_info = language_info + # Load magics lazily in do_execute because they are async + # because IPythonKernel doesn't support async "on start" or init functionality natively + self._has_lazily_loaded_magics = False + # Override self.session_language = session_language @@ -49,6 +53,10 @@ def __init__( # Disable warnings for test env in HDI requests.packages.urllib3.disable_warnings() + # Do not load magics in testing + if kwargs.get("testing", False): + self._has_lazily_loaded_magics = True + async def do_execute( self, code, silent, store_history=True, user_expressions=None, allow_stdin=False ): @@ -60,6 +68,17 @@ async def f(self): code, silent, store_history, user_expressions, allow_stdin ) + # On first cell, lazily initialize magics before executing + if not self._has_lazily_loaded_magics: + await self._load_magics_extension() + await self._change_language() + + if conf.use_auto_viz(): + await self._register_auto_viz() + + self._has_lazily_loaded_magics = True + + # Execute the code and handle exceptions wrapped = async_wrap_unexpected_exceptions(f, self._complete_cell) return await wrapped(self) diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index dd6e7f0f3..04560474a 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock, call, patch from nose.tools import with_setup -from aiounittest import async_test, futurized +from aiounittest.helpers import async_test, futurized from sparkmagic.kernels.wrapperkernel.sparkkernelbase import SparkKernelBase from sparkmagic.utils.constants import LANG_PYTHON @@ -28,12 +28,14 @@ async def __call__(self, *args, **kwargs): do_shutdown_mock = None ipython_display = None code = "some spark code" -user_code_parser = MagicMock(return_value=code) class TestSparkKernel(SparkKernelBase): - def __init__(self): + def __init__(self, user_code_parser=None): kwargs = {"testing": True} + if user_code_parser is None: + user_code_parser = MagicMock(return_value=code) + super().__init__( None, None, None, None, None, LANG_PYTHON, user_code_parser, **kwargs ) @@ -42,7 +44,8 @@ def __init__(self): def _setup(): global kernel, execute_cell_mock, do_shutdown_mock, ipython_display - kernel = TestSparkKernel() + user_code_parser = MagicMock(return_value=code) + kernel = TestSparkKernel(user_code_parser) if is_async_mock_available: kernel._execute_cell_for_user = execute_cell_mock = AsyncMock( @@ -89,7 +92,7 @@ async def test_execute_valid_code(): # Verify that the execution flows through. ret = await kernel.do_execute(code, False) - user_code_parser.get_code_to_run.assert_called_once_with(code) + kernel.user_code_parser.get_code_to_run.assert_called_once_with(code) await async_assert_return_value(execute_cell_mock, ret) assert kernel._fatal_error is None # assert execute_cell_mock.called_once_with(code, True) @@ -278,3 +281,29 @@ async def test_execute_cell_for_user_ipykernel6(): got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) assert mock_ipy_execute.called assert want == got + + +@async_test +@with_setup(_setup, _teardown) +async def test_lazily_loads_magics(): + # Verify that the execution flows through. + kernel._has_lazily_loaded_magics = False + + # Set mocks + kernel._load_magics_extension = AsyncMock() + kernel._change_language = AsyncMock() + + ret = await kernel.do_execute(code, False) + + # Assert magics are loaded + assert kernel._has_lazily_loaded_magics + assert kernel._load_magics_extension.called + assert kernel._change_language.called + + # Assert code is executed + kernel.user_code_parser.get_code_to_run.assert_called_once_with(code) + await async_assert_return_value(execute_cell_mock, ret) + assert kernel._fatal_error is None + # assert execute_cell_mock.called_once_with(code, True) + await async_assert_called_once_with(execute_cell_mock, code, True) + assert ipython_display.send_error.call_count == 0 From bfd3a2fa108dfce25b12b7e23341d4f42df111ab Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Tue, 9 Nov 2021 08:46:28 -0800 Subject: [PATCH 11/16] Use asyncio event loop to load magics synchronously in init --- .../kernels/wrapperkernel/sparkkernelbase.py | 23 ++++++---------- .../sparkmagic/tests/test_sparkkernelbase.py | 26 ------------------- 2 files changed, 8 insertions(+), 41 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 7ee41b32d..f3e96a39d 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -32,10 +32,6 @@ def __init__( self.language_version = language_version self.language_info = language_info - # Load magics lazily in do_execute because they are async - # because IPythonKernel doesn't support async "on start" or init functionality natively - self._has_lazily_loaded_magics = False - # Override self.session_language = session_language @@ -55,7 +51,14 @@ def __init__( # Do not load magics in testing if kwargs.get("testing", False): - self._has_lazily_loaded_magics = True + # Get and use asyncio event loop to run async functions + # from a synchronous init function + # Python 3.6 compatibility + loop = asyncio.get_running_loop() + loop.run_until_complete(self._load_magics_extension()) + loop.run_until_complete(self._change_language()) + if conf.use_auto_viz(): + loop.run_until_complete(self._register_auto_viz()) async def do_execute( self, code, silent, store_history=True, user_expressions=None, allow_stdin=False @@ -68,16 +71,6 @@ async def f(self): code, silent, store_history, user_expressions, allow_stdin ) - # On first cell, lazily initialize magics before executing - if not self._has_lazily_loaded_magics: - await self._load_magics_extension() - await self._change_language() - - if conf.use_auto_viz(): - await self._register_auto_viz() - - self._has_lazily_loaded_magics = True - # Execute the code and handle exceptions wrapped = async_wrap_unexpected_exceptions(f, self._complete_cell) return await wrapped(self) diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index 04560474a..b087eb985 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -281,29 +281,3 @@ async def test_execute_cell_for_user_ipykernel6(): got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) assert mock_ipy_execute.called assert want == got - - -@async_test -@with_setup(_setup, _teardown) -async def test_lazily_loads_magics(): - # Verify that the execution flows through. - kernel._has_lazily_loaded_magics = False - - # Set mocks - kernel._load_magics_extension = AsyncMock() - kernel._change_language = AsyncMock() - - ret = await kernel.do_execute(code, False) - - # Assert magics are loaded - assert kernel._has_lazily_loaded_magics - assert kernel._load_magics_extension.called - assert kernel._change_language.called - - # Assert code is executed - kernel.user_code_parser.get_code_to_run.assert_called_once_with(code) - await async_assert_return_value(execute_cell_mock, ret) - assert kernel._fatal_error is None - # assert execute_cell_mock.called_once_with(code, True) - await async_assert_called_once_with(execute_cell_mock, code, True) - assert ipython_display.send_error.call_count == 0 From a49e8c22abcfbea173e938c4d1f940db30669cd3 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Tue, 9 Nov 2021 09:40:46 -0800 Subject: [PATCH 12/16] create an event loop. do not run for tests --- .../sparkmagic/kernels/wrapperkernel/sparkkernelbase.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index f3e96a39d..20fdf2cf1 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -50,11 +50,12 @@ def __init__( requests.packages.urllib3.disable_warnings() # Do not load magics in testing - if kwargs.get("testing", False): + if not kwargs.get("testing", False): # Get and use asyncio event loop to run async functions # from a synchronous init function # Python 3.6 compatibility - loop = asyncio.get_running_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) loop.run_until_complete(self._load_magics_extension()) loop.run_until_complete(self._change_language()) if conf.use_auto_viz(): From 46b23fbb6025038c547bbf20c1a7f680cf25b75f Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Wed, 27 Apr 2022 11:06:53 -0700 Subject: [PATCH 13/16] Use nest-asyncio to keep do_execute synchronous --- sparkmagic/requirements.txt | 1 + sparkmagic/setup.py | 1 + .../kernels/wrapperkernel/sparkkernelbase.py | 109 +++++++------- .../sparkmagic/tests/test_sparkkernelbase.py | 136 ++++++------------ 4 files changed, 108 insertions(+), 139 deletions(-) diff --git a/sparkmagic/requirements.txt b/sparkmagic/requirements.txt index 0230efdb0..426d7b2b5 100644 --- a/sparkmagic/requirements.txt +++ b/sparkmagic/requirements.txt @@ -11,3 +11,4 @@ ipywidgets>5.0.0 notebook>=4.2 tornado>=4 requests_kerberos>=0.8.0 +nest_asyncio==1.5.5 diff --git a/sparkmagic/setup.py b/sparkmagic/setup.py index 5f6d61cf9..93eff939f 100644 --- a/sparkmagic/setup.py +++ b/sparkmagic/setup.py @@ -94,5 +94,6 @@ def version(path): "notebook>=4.2", "tornado>=4", "requests_kerberos>=0.8.0", + "nest_asyncio==1.5.5", ], ) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 20fdf2cf1..1727e1e95 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -9,9 +9,19 @@ import sparkmagic.utils.configuration as conf from sparkmagic.utils.sparklogger import SparkLog -from sparkmagic.livyclientlib.exceptions import async_wrap_unexpected_exceptions +from sparkmagic.livyclientlib.exceptions import wrap_unexpected_exceptions from sparkmagic.kernels.wrapperkernel.usercodeparser import UserCodeParser +# NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 +import nest_asyncio + + +# NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 +def run_sync(task): + loop = asyncio.get_event_loop() + result = loop.run_until_complete(task) + return result + class SparkKernelBase(IPythonKernel): def __init__( @@ -37,7 +47,7 @@ def __init__( super().__init__(**kwargs) - self.logger = SparkLog(u"{}_jupyter_kernel".format(self.session_language)) + self.logger = SparkLog("{}_jupyter_kernel".format(self.session_language)) self._fatal_error = None self.ipython_display = IpythonDisplay() @@ -49,53 +59,53 @@ def __init__( # Disable warnings for test env in HDI requests.packages.urllib3.disable_warnings() + # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 + # Patch loop.run_until_complete + nest_asyncio.apply() + # Do not load magics in testing - if not kwargs.get("testing", False): - # Get and use asyncio event loop to run async functions - # from a synchronous init function - # Python 3.6 compatibility - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(self._load_magics_extension()) - loop.run_until_complete(self._change_language()) - if conf.use_auto_viz(): - loop.run_until_complete(self._register_auto_viz()) - - async def do_execute( + if kwargs.get("testing", False): + return + + # Load magics on init + self._load_magics_extension() + self._change_language() + if conf.use_auto_viz(): + self._register_auto_viz() + + def do_execute( self, code, silent, store_history=True, user_expressions=None, allow_stdin=False ): - async def f(self): + def f(self): if self._fatal_error is not None: - return await self._repeat_fatal_error() + return self._repeat_fatal_error() - return await self._do_execute( + return self._do_execute( code, silent, store_history, user_expressions, allow_stdin ) # Execute the code and handle exceptions - wrapped = async_wrap_unexpected_exceptions(f, self._complete_cell) - return await wrapped(self) + wrapped = wrap_unexpected_exceptions(f, self._complete_cell) + return wrapped(self) - async def do_shutdown(self, restart): + def do_shutdown(self, restart): # Cleanup - await self._delete_session() + self._delete_session() - return await self._do_shutdown_ipykernel(restart) + return self._do_shutdown_ipykernel(restart) - async def _do_execute( - self, code, silent, store_history, user_expressions, allow_stdin - ): + def _do_execute(self, code, silent, store_history, user_expressions, allow_stdin): code_to_run = self.user_code_parser.get_code_to_run(code) - res = await self._execute_cell( + res = self._execute_cell( code_to_run, silent, store_history, user_expressions, allow_stdin ) return res - async def _load_magics_extension(self): + def _load_magics_extension(self): register_magics_code = "%load_ext sparkmagic.kernels" - await self._execute_cell( + self._execute_cell( register_magics_code, True, False, @@ -104,11 +114,11 @@ async def _load_magics_extension(self): ) self.logger.debug("Loaded magics.") - async def _change_language(self): + def _change_language(self): register_magics_code = "%%_do_not_call_change_language -l {}\n ".format( self.session_language ) - await self._execute_cell( + self._execute_cell( register_magics_code, True, False, @@ -119,7 +129,7 @@ async def _change_language(self): ) self.logger.debug("Changed language.") - async def _register_auto_viz(self): + def _register_auto_viz(self): from sparkmagic.utils.sparkevents import get_spark_events_handler import autovizwidget.utils.configuration as c @@ -129,7 +139,7 @@ async def _register_auto_viz(self): register_auto_viz_code = """from autovizwidget.widget.utils import display_dataframe ip = get_ipython() ip.display_formatter.ipython_display_formatter.for_type_by_name('pandas.core.frame', 'DataFrame', display_dataframe)""" - await self._execute_cell( + self._execute_cell( register_auto_viz_code, True, False, @@ -138,11 +148,11 @@ async def _register_auto_viz(self): ) self.logger.debug("Registered auto viz.") - async def _delete_session(self): + def _delete_session(self): code = "%%_do_not_call_delete_session\n " - await self._execute_cell_for_user(code, True, False) + self._execute_cell_for_user(code, True, False) - async def _execute_cell( + def _execute_cell( self, code, silent, @@ -152,21 +162,21 @@ async def _execute_cell( shutdown_if_error=False, log_if_error=None, ): - reply_content = await self._execute_cell_for_user( + reply_content = self._execute_cell_for_user( code, silent, store_history, user_expressions, allow_stdin ) - if shutdown_if_error and reply_content[u"status"] == u"error": - error_from_reply = reply_content[u"evalue"] + if shutdown_if_error and reply_content["status"] == "error": + error_from_reply = reply_content["evalue"] if log_if_error is not None: message = '{}\nException details:\n\t"{}"'.format( log_if_error, error_from_reply ) - return await self._abort_with_fatal_error(message) + return self._abort_with_fatal_error(message) return reply_content - async def _execute_cell_for_user( + def _execute_cell_for_user( self, code, silent, store_history=True, user_expressions=None, allow_stdin=False ): result = super().do_execute( @@ -175,7 +185,7 @@ async def _execute_cell_for_user( # In ipykernel 6, this returns native asyncio coroutine if asyncio.iscoroutine(result): - return await result + return run_sync(result) # In ipykernel 5, this returns gen.coroutine if asyncio.isfuture(result): @@ -184,24 +194,23 @@ async def _execute_cell_for_user( # In ipykernel 4, this func is synchronous return result - async def _do_shutdown_ipykernel(self, restart): - # INVESTIGATE: Inspect if should await + def _do_shutdown_ipykernel(self, restart): result = super().do_shutdown(restart) # In tests, super() calls this SparkKernelBase.do_shutdown, which is async if asyncio.iscoroutine(result): - return await result + return run_sync(result) return result - async def _complete_cell(self): + def _complete_cell(self): """A method that runs a cell with no effect. Call this and return the value it returns when there's some sort of error preventing the user's cell from executing; this will register the cell from the Jupyter UI as being completed. """ - return await self._execute_cell("None", False, True, None, False) + return self._execute_cell("None", False, True, None, False) def _show_user_error(self, message): self.logger.error(message) @@ -216,14 +225,14 @@ def _queue_fatal_error(self, message): """ self._fatal_error = message - async def _abort_with_fatal_error(self, message): + def _abort_with_fatal_error(self, message): """Queues up a fatal error and throws it immediately.""" self._queue_fatal_error(message) - return await self._repeat_fatal_error() + return self._repeat_fatal_error() - async def _repeat_fatal_error(self): + def _repeat_fatal_error(self): """Throws an error that has already been queued.""" error = conf.fatal_error_suggestion().format(self._fatal_error) self.logger.error(error) self.ipython_display.send_error(error) - return await self._complete_cell() + return self._complete_cell() diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index b087eb985..e01f1c05b 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -47,20 +47,10 @@ def _setup(): user_code_parser = MagicMock(return_value=code) kernel = TestSparkKernel(user_code_parser) - if is_async_mock_available: - kernel._execute_cell_for_user = execute_cell_mock = AsyncMock( - return_value={"test": "ing", "a": "b", "status": "ok"} - ) - kernel._do_shutdown_ipykernel = do_shutdown_mock = AsyncMock() - else: - # Use futurized return values to keep async func behavior - kernel._do_shutdown_ipykernel = do_shutdown_mock = MagicMock( - return_value=futurized(None) - ) - kernel._execute_cell_for_user = execute_cell_mock = MagicMock( - return_value=futurized({"test": "ing", "a": "b", "status": "ok"}) - ) - + kernel._execute_cell_for_user = execute_cell_mock = MagicMock( + return_value={"test": "ing", "a": "b", "status": "ok"} + ) + kernel._do_shutdown_ipykernel = do_shutdown_mock = MagicMock() kernel.ipython_display = ipython_display = MagicMock() @@ -68,118 +58,92 @@ def _teardown(): pass -# Helper for calling assert_called_once_with on native AsyncMock vs Mock w/ futurized return values -async def async_assert_called_once_with(mock, *values): - result = mock.called_once_with(*values) - if asyncio.iscoroutine(result): - result = await result - - assert result - - -# Helper for asserting return values on native AsyncMock vs Mock w/ futurized return values -async def async_assert_return_value(mock, value): - return_value = mock.return_value - if asyncio.isfuture(return_value): - return_value = await return_value - - assert return_value is value - - -@async_test @with_setup(_setup, _teardown) -async def test_execute_valid_code(): +def test_execute_valid_code(): # Verify that the execution flows through. - ret = await kernel.do_execute(code, False) + ret = kernel.do_execute(code, False) kernel.user_code_parser.get_code_to_run.assert_called_once_with(code) - await async_assert_return_value(execute_cell_mock, ret) + assert execute_cell_mock.called_once_with(ret, True) + assert execute_cell_mock.return_value is ret assert kernel._fatal_error is None - # assert execute_cell_mock.called_once_with(code, True) - await async_assert_called_once_with(execute_cell_mock, code, True) + + assert execute_cell_mock.called_once_with(code, True) assert ipython_display.send_error.call_count == 0 -@async_test @with_setup(_setup, _teardown) -async def test_execute_throws_if_fatal_error_happened(): +def test_execute_throws_if_fatal_error_happened(): # Verify that if a fatal error already happened, we don't run the code and show the fatal error instead. fatal_error = "Error." kernel._fatal_error = fatal_error - ret = await kernel.do_execute(code, False) + ret = kernel.do_execute(code, False) - await async_assert_return_value(execute_cell_mock, ret) + assert execute_cell_mock.return_value is ret assert kernel._fatal_error == fatal_error - # assert execute_cell_mock.called_once_with("None", True) - await async_assert_called_once_with(execute_cell_mock, "None", True) + assert execute_cell_mock.called_once_with("None", True) assert ipython_display.send_error.call_count == 1 -@async_test @with_setup(_setup, _teardown) -async def test_execute_alerts_user_if_an_unexpected_error_happens(): +def test_execute_alerts_user_if_an_unexpected_error_happens(): # Verify that developer error shows developer error (the Github link). # Because do_execute is so minimal, we'll assume we have a bug in the _repeat_fatal_error method kernel._fatal_error = "Something bad happened before" - kernel._repeat_fatal_error = AsyncMock(side_effect=ValueError) + kernel._repeat_fatal_error = MagicMock(side_effect=ValueError) - ret = await kernel.do_execute(code, False) - await async_assert_return_value(execute_cell_mock, ret) - # assert execute_cell_mock.called_once_with("None", True) - await async_assert_called_once_with(execute_cell_mock, "None", True) + ret = kernel.do_execute(code, False) + assert execute_cell_mock.return_value is ret + assert execute_cell_mock.called_once_with("None", True) assert ipython_display.send_error.call_count == 1 -@async_test @with_setup(_setup, _teardown) -async def test_execute_throws_if_fatal_error_happens_for_execution(): +def test_execute_throws_if_fatal_error_happens_for_execution(): # Verify that the kernel sends the error from Python execution's context to the user - fatal_error = u"Error." + fatal_error = "Error." message = '{}\nException details:\n\t"{}"'.format(fatal_error, fatal_error) reply_content = dict() - reply_content[u"status"] = u"error" - reply_content[u"evalue"] = fatal_error + reply_content["status"] = "error" + reply_content["evalue"] = fatal_error execute_cell_mock.return_value = ( reply_content if is_async_mock_available else futurized(reply_content) ) - ret = await kernel._execute_cell( + ret = kernel._execute_cell( code, False, shutdown_if_error=True, log_if_error=fatal_error ) - await async_assert_return_value(execute_cell_mock, ret) + assert execute_cell_mock.return_value is ret assert kernel._fatal_error == message - # assert execute_cell_mock.called_once_with("None", True) - await async_assert_called_once_with(execute_cell_mock, "None", True) + assert execute_cell_mock.called_once_with("None", True) assert ipython_display.send_error.call_count == 1 -@async_test @with_setup(_setup, _teardown) -async def test_shutdown_cleans_up(): +def test_shutdown_cleans_up(): # No restart - kernel._execute_cell_for_user = ecfu_m = AsyncMock() - kernel._do_shutdown_ipykernel = dsi_m = AsyncMock() + kernel._execute_cell_for_user = ecfu_m = MagicMock() + kernel._do_shutdown_ipykernel = dsi_m = MagicMock() - await kernel.do_shutdown(False) + kernel.do_shutdown(False) ecfu_m.assert_called_once_with("%%_do_not_call_delete_session\n ", True, False) dsi_m.assert_called_once_with(False) # On restart - kernel._execute_cell_for_user = ecfu_m = AsyncMock() - kernel._do_shutdown_ipykernel = dsi_m = AsyncMock() + kernel._execute_cell_for_user = ecfu_m = MagicMock() + kernel._do_shutdown_ipykernel = dsi_m = MagicMock() - await kernel.do_shutdown(True) + kernel.do_shutdown(True) ecfu_m.assert_called_once_with("%%_do_not_call_delete_session\n ", True, False) dsi_m.assert_called_once_with(True) -@async_test @with_setup(_setup, _teardown) -async def test_register_auto_viz(): - await kernel._register_auto_viz() +def test_register_auto_viz(): + kernel._register_auto_viz() assert ( call( @@ -194,10 +158,9 @@ async def test_register_auto_viz(): ) -@async_test @with_setup(_setup, _teardown) -async def test_change_language(): - await kernel._change_language() +def test_change_language(): + kernel._change_language() assert ( call( @@ -211,10 +174,9 @@ async def test_change_language(): ) -@async_test @with_setup(_setup, _teardown) -async def test_load_magics(): - await kernel._load_magics_extension() +def test_load_magics(): + kernel._load_magics_extension() assert ( call("%load_ext sparkmagic.kernels", True, False, None, False) @@ -222,10 +184,9 @@ async def test_load_magics(): ) -@async_test @with_setup(_setup, _teardown) -async def test_delete_session(): - await kernel._delete_session() +def test_delete_session(): + kernel._delete_session() assert ( call("%%_do_not_call_delete_session\n ", True, False) @@ -233,9 +194,8 @@ async def test_delete_session(): ) -@async_test @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel4(): +def test_execute_cell_for_user_ipykernel4(): want = {"status": "OK"} # Can't use patch decorator because # it fails to patch async functions in Python < 3.8 @@ -244,15 +204,14 @@ async def test_execute_cell_for_user_ipykernel4(): new_callable=MagicMock, return_value=want, ) as mock_ipy_execute: - got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True) assert mock_ipy_execute.called assert want == got -@async_test @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel5(): +def test_execute_cell_for_user_ipykernel5(): want = {"status": "OK"} # Can't use patch decorator because # it fails to patch async functions in Python < 3.8 @@ -263,21 +222,20 @@ async def test_execute_cell_for_user_ipykernel5(): mock_ipy_execute.return_value = asyncio.Future() mock_ipy_execute.return_value.set_result(want) - got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True) assert mock_ipy_execute.called assert want == got -@async_test @with_setup(_teardown) -async def test_execute_cell_for_user_ipykernel6(): +def test_execute_cell_for_user_ipykernel6(): want = {"status": "OK"} # Can't use patch decorator because # it fails to patch async functions in Python < 3.8 with patch( "ipykernel.ipkernel.IPythonKernel.do_execute", return_value=want ) as mock_ipy_execute: - got = await TestSparkKernel()._execute_cell_for_user(code="1", silent=True) + got = TestSparkKernel()._execute_cell_for_user(code="1", silent=True) assert mock_ipy_execute.called assert want == got From 8744e47a8f58af49b4bf0420576e1dd5bded97db Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Wed, 27 Apr 2022 13:09:34 -0700 Subject: [PATCH 14/16] Remove async do_execute code from tests --- .github/workflows/tests.yml | 1 - .../sparkmagic/tests/test_sparkkernelbase.py | 21 ++----------------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ec60ac6c9..e8a90d2c4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -25,7 +25,6 @@ jobs: pip install -r hdijupyterutils/requirements.txt -e hdijupyterutils pip install -r autovizwidget/requirements.txt -e autovizwidget pip install -r sparkmagic/requirements.txt -e sparkmagic - pip install aiounittest - name: Run hdijupyterutils tests run: | nosetests hdijupyterutils diff --git a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py index e01f1c05b..7a2659746 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkkernelbase.py @@ -3,26 +3,10 @@ from unittest.mock import MagicMock, call, patch from nose.tools import with_setup -from aiounittest.helpers import async_test, futurized from sparkmagic.kernels.wrapperkernel.sparkkernelbase import SparkKernelBase from sparkmagic.utils.constants import LANG_PYTHON -# AsyncMock is only available in Python >= 3.8 -# This helps modify tests to accommodate older Python version -is_async_mock_available = True -try: - # Python >= 3.8 - from unittest.mock import AsyncMock -except Exception: - # Patch - is_async_mock_available = False - - class AsyncMock(MagicMock): - async def __call__(self, *args, **kwargs): - return super(AsyncMock, self).__call__(*args, **kwargs) - - kernel = None execute_cell_mock = None do_shutdown_mock = None @@ -107,9 +91,8 @@ def test_execute_throws_if_fatal_error_happens_for_execution(): reply_content = dict() reply_content["status"] = "error" reply_content["evalue"] = fatal_error - execute_cell_mock.return_value = ( - reply_content if is_async_mock_available else futurized(reply_content) - ) + + execute_cell_mock.return_value = reply_content ret = kernel._execute_cell( code, False, shutdown_if_error=True, log_if_error=fatal_error From 3d4ed1a6cc2e863bd1ea73d0252bd31bafd0f12d Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Wed, 27 Apr 2022 13:24:06 -0700 Subject: [PATCH 15/16] Use run_sync from jupyter/notebook --- .../kernels/wrapperkernel/sparkkernelbase.py | 61 ++++++++++++++++--- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py index 1727e1e95..8bdb3f59b 100644 --- a/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py +++ b/sparkmagic/sparkmagic/kernels/wrapperkernel/sparkkernelbase.py @@ -1,6 +1,7 @@ # Copyright (c) 2015 aggftw@gmail.com # Distributed under the terms of the Modified BSD License. import asyncio +import inspect import requests @@ -17,10 +18,49 @@ # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 -def run_sync(task): - loop = asyncio.get_event_loop() - result = loop.run_until_complete(task) - return result +# Taken from: https://github.com/jupyter/notebook/blob/eb3a1c24839205afcef0ba65ace2309d38300a2b/notebook/utils.py#L332 +def run_sync(maybe_async): + """If async, runs maybe_async and blocks until it has executed, + possibly creating an event loop. + If not async, just returns maybe_async as it is the result of something + that has already executed. + Parameters + ---------- + maybe_async : async or non-async object + The object to be executed, if it is async. + Returns + ------- + result : + Whatever the async object returns, or the object itself. + """ + if not inspect.isawaitable(maybe_async): + # that was not something async, just return it + return maybe_async + # it is async, we need to run it in an event loop + + def wrapped(): + create_new_event_loop = False + result = None + loop = None + try: + loop = asyncio.get_event_loop() + except RuntimeError: + create_new_event_loop = True + else: + if loop.is_closed(): + create_new_event_loop = True + if create_new_event_loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(maybe_async) + except RuntimeError as e: + if str(e) == "This event loop is already running": + # just return a Future, hoping that it will be awaited + result = asyncio.ensure_future(maybe_async) + return result + + return wrapped() class SparkKernelBase(IPythonKernel): @@ -45,6 +85,15 @@ def __init__( # Override self.session_language = session_language + # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 + # Patch loop.run_until_complete as early as possible + try: + nest_asyncio.apply() + except RuntimeError: + # nest_asyncio requires a running loop in order to patch. + # In tests the loop may not have been created yet. + pass + super().__init__(**kwargs) self.logger = SparkLog("{}_jupyter_kernel".format(self.session_language)) @@ -59,10 +108,6 @@ def __init__( # Disable warnings for test env in HDI requests.packages.urllib3.disable_warnings() - # NOTE: This is a (hopefully) temporary workaround to accommodate async do_execute in ipykernel>=6 - # Patch loop.run_until_complete - nest_asyncio.apply() - # Do not load magics in testing if kwargs.get("testing", False): return From c0b8aaa0320b1d2aebc394311814913fb8cf28e1 Mon Sep 17 00:00:00 2001 From: Devin Stein Date: Fri, 29 Apr 2022 11:26:08 -0700 Subject: [PATCH 16/16] Run black --- sparkmagic/sparkmagic/livyclientlib/exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sparkmagic/sparkmagic/livyclientlib/exceptions.py b/sparkmagic/sparkmagic/livyclientlib/exceptions.py index c34f1b089..404269b80 100644 --- a/sparkmagic/sparkmagic/livyclientlib/exceptions.py +++ b/sparkmagic/sparkmagic/livyclientlib/exceptions.py @@ -153,7 +153,7 @@ def fn(self, ...): def handle_exception(self, e): self.logger.error( - u"ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( + "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( e, traceback.format_exc() ) ) @@ -191,7 +191,7 @@ async def fn(self, ...): async def handle_exception(self, e): self.logger.error( - u"ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( + "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format( e, traceback.format_exc() ) )