From a1347eea2550aa2f24c8aac72a7c3fe92650791b Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Fri, 13 Oct 2023 15:46:44 -0400 Subject: [PATCH 1/6] refactor!: completely wipe out pyuv pynvim has been using asyncio as the only available event loop implementation, since python 3.4. Remove all the pyuv-related codes --- pynvim/msgpack_rpc/event_loop/__init__.py | 2 +- pynvim/msgpack_rpc/event_loop/asyncio.py | 13 +-- pynvim/msgpack_rpc/event_loop/base.py | 4 + pynvim/msgpack_rpc/event_loop/uv.py | 124 ---------------------- setup.py | 1 - tox.ini | 1 - 6 files changed, 8 insertions(+), 137 deletions(-) delete mode 100644 pynvim/msgpack_rpc/event_loop/uv.py diff --git a/pynvim/msgpack_rpc/event_loop/__init__.py b/pynvim/msgpack_rpc/event_loop/__init__.py index e94cdbfe..1cf40a77 100644 --- a/pynvim/msgpack_rpc/event_loop/__init__.py +++ b/pynvim/msgpack_rpc/event_loop/__init__.py @@ -1,6 +1,6 @@ """Event loop abstraction subpackage. -Tries to use pyuv as a backend, falling back to the asyncio implementation. +We use python's built-in asyncio as the backend. """ from pynvim.msgpack_rpc.event_loop.asyncio import AsyncioEventLoop as EventLoop diff --git a/pynvim/msgpack_rpc/event_loop/asyncio.py b/pynvim/msgpack_rpc/event_loop/asyncio.py index 164173b8..3a8a155d 100644 --- a/pynvim/msgpack_rpc/event_loop/asyncio.py +++ b/pynvim/msgpack_rpc/event_loop/asyncio.py @@ -1,12 +1,4 @@ -"""Event loop implementation that uses the `asyncio` standard module. - -The `asyncio` module was added to python standard library on 3.4, and it -provides a pure python implementation of an event loop library. It is used -as a fallback in case pyuv is not available(on python implementations other -than CPython). - -""" -from __future__ import absolute_import +"""Event loop implementation that uses the `asyncio` standard module.""" import asyncio import logging @@ -22,9 +14,10 @@ debug, info, warn = (logger.debug, logger.info, logger.warning,) loop_cls = asyncio.SelectorEventLoop + if os.name == 'nt': + import msvcrt # pylint: disable=import-error from asyncio.windows_utils import PipeHandle # type: ignore[attr-defined] - import msvcrt # On windows use ProactorEventLoop which support pipes and is backed by the # more powerful IOCP facility diff --git a/pynvim/msgpack_rpc/event_loop/base.py b/pynvim/msgpack_rpc/event_loop/base.py index 86fde9c2..33bbf698 100644 --- a/pynvim/msgpack_rpc/event_loop/base.py +++ b/pynvim/msgpack_rpc/event_loop/base.py @@ -28,6 +28,10 @@ Literal['child'] ] +# TODO: Since pynvim now supports python 3, the only available backend of the +# msgpack_rpc BaseEventLoop is the built-in asyncio (see #294). We will have +# to remove some unnecessary abstractions as well as greenlet. See also #489 + class BaseEventLoop(ABC): diff --git a/pynvim/msgpack_rpc/event_loop/uv.py b/pynvim/msgpack_rpc/event_loop/uv.py deleted file mode 100644 index 969187ee..00000000 --- a/pynvim/msgpack_rpc/event_loop/uv.py +++ /dev/null @@ -1,124 +0,0 @@ -"""Event loop implementation that uses pyuv(libuv-python bindings).""" -import sys -from collections import deque - -import pyuv - -from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop - - -class UvEventLoop(BaseEventLoop): - - """`BaseEventLoop` subclass that uses `pvuv` as a backend.""" - - def _init(self): - self._loop = pyuv.Loop() - self._async = pyuv.Async(self._loop, self._on_async) - self._connection_error = None - self._error_stream = None - self._callbacks = deque() - - def _on_connect(self, stream, error): - self.stop() - if error: - msg = 'Cannot connect to {}: {}'.format( - self._connect_address, pyuv.errno.strerror(error)) - self._connection_error = OSError(msg) - return - self._read_stream = self._write_stream = stream - - def _on_read(self, handle, data, error): - if error or not data: - msg = pyuv.errno.strerror(error) if error else 'EOF' - self._on_error(msg) - return - if handle == self._error_stream: - return - self._on_data(data) - - def _on_write(self, handle, error): - if error: - msg = pyuv.errno.strerror(error) - self._on_error(msg) - - def _on_exit(self, handle, exit_status, term_signal): - self._on_error('EOF') - - def _disconnected(self, *args): - raise OSError('Not connected to Nvim') - - def _connect_tcp(self, address, port): - stream = pyuv.TCP(self._loop) - self._connect_address = '{}:{}'.format(address, port) - stream.connect((address, port), self._on_connect) - - def _connect_socket(self, path): - stream = pyuv.Pipe(self._loop) - self._connect_address = path - stream.connect(path, self._on_connect) - - def _connect_stdio(self): - self._read_stream = pyuv.Pipe(self._loop) - self._read_stream.open(sys.stdin.fileno()) - self._write_stream = pyuv.Pipe(self._loop) - self._write_stream.open(sys.stdout.fileno()) - - def _connect_child(self, argv): - self._write_stream = pyuv.Pipe(self._loop) - self._read_stream = pyuv.Pipe(self._loop) - self._error_stream = pyuv.Pipe(self._loop) - stdin = pyuv.StdIO(self._write_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_READABLE_PIPE) - stdout = pyuv.StdIO(self._read_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE) - stderr = pyuv.StdIO(self._error_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE) - pyuv.Process.spawn(self._loop, - args=argv, - exit_callback=self._on_exit, - flags=pyuv.UV_PROCESS_WINDOWS_HIDE, - stdio=(stdin, stdout, stderr,)) - self._error_stream.start_read(self._on_read) - - def _start_reading(self): - if self._transport_type in ['tcp', 'socket']: - self._loop.run() - if self._connection_error: - self.run = self.send = self._disconnected - raise self._connection_error - self._read_stream.start_read(self._on_read) - - def _send(self, data): - self._write_stream.write(data, self._on_write) - - def _run(self): - self._loop.run(pyuv.UV_RUN_DEFAULT) - - def _stop(self): - self._loop.stop() - - def _close(self): - pass - - def _threadsafe_call(self, fn): - self._callbacks.append(fn) - self._async.send() - - def _on_async(self, handle): - while self._callbacks: - self._callbacks.popleft()() - - def _setup_signals(self, signals): - self._signal_handles = [] - - def handler(h, signum): - self._on_signal(signum) - - for signum in signals: - handle = pyuv.Signal(self._loop) - handle.start(handler, signum) - self._signal_handles.append(handle) - - def _teardown_signals(self): - for handle in self._signal_handles: - handle.stop() diff --git a/setup.py b/setup.py index dea5ae0f..6b3a615d 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,6 @@ ] extras_require = { - 'pyuv': ['pyuv>=1.0.0'], 'test': tests_require, } diff --git a/tox.ini b/tox.ini index a57a7909..2252504c 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,6 @@ extras = test deps = pytest-timeout # cov: pytest-cov -# pyuv: pyuv # setenv = # cov: PYTEST_ADDOPTS=--cov=. {env:PYTEST_ADDOPTS:} # passenv = PYTEST_ADDOPTS From be5810bba55794104a944a9dbcfde04aed9895ab Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Sun, 15 Oct 2023 02:53:34 -0400 Subject: [PATCH 2/6] refactor: expose event loop as a property rather than a field event loops for Session and Nvim are final and can't be changed. --- pynvim/api/nvim.py | 13 ++++++++++++- pynvim/msgpack_rpc/session.py | 9 +++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pynvim/api/nvim.py b/pynvim/api/nvim.py index b1b75eb8..b5ea84b7 100644 --- a/pynvim/api/nvim.py +++ b/pynvim/api/nvim.py @@ -1,4 +1,6 @@ """Main Nvim interface.""" + +import asyncio import os import sys import threading @@ -140,7 +142,16 @@ def __init__( self._err_cb: Callable[[str], Any] = lambda _: None else: self._err_cb = err_cb - self.loop = self._session.loop._loop + + @property + def loop(self) -> asyncio.AbstractEventLoop: + """Get the event loop (exposed to rplugins).""" # noqa + + # see #294: for python 3.4+, the only available and guaranteed + # implementation of msgpack_rpc BaseEventLoop is the AsyncioEventLoop. + # The underlying asyncio event loop is exposed to rplugins. + # pylint: disable=protected-access + return self._session.loop._loop # type: ignore def _from_nvim(self, obj: Any, decode: Optional[TDecodeMode] = None) -> Any: if decode is None: diff --git a/pynvim/msgpack_rpc/session.py b/pynvim/msgpack_rpc/session.py index 453f218b..4e8f6259 100644 --- a/pynvim/msgpack_rpc/session.py +++ b/pynvim/msgpack_rpc/session.py @@ -11,6 +11,7 @@ from pynvim.compat import check_async from pynvim.msgpack_rpc.async_session import AsyncSession +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop if sys.version_info < (3, 8): from typing_extensions import Literal @@ -42,7 +43,7 @@ class Notification(NamedTuple): Message = Union[Request, Notification] -class Session(object): +class Session: """Msgpack-rpc session layer that uses coroutines for a synchronous API. @@ -59,11 +60,15 @@ def __init__(self, async_session: AsyncSession): self._pending_messages: Deque[Message] = deque() self._is_running = False self._setup_exception: Optional[Exception] = None - self.loop = async_session.loop self._loop_thread: Optional[threading.Thread] = None self.error_wrapper: Callable[[Tuple[int, str]], Exception] = \ lambda e: Exception(e[1]) + @property + def loop(self) -> BaseEventLoop: + """Get the underlying msgpack EventLoop.""" + return self._async_session.loop + def threadsafe_call( self, fn: Callable[..., Any], *args: Any, **kwargs: Any ) -> None: From 2059684da75ef9d0910379eb12ed438a96618873 Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Fri, 13 Oct 2023 14:08:02 -0400 Subject: [PATCH 3/6] refactor: use async coroutine and add debugging statements --- pynvim/msgpack_rpc/event_loop/asyncio.py | 77 ++++++++++++++---------- 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/pynvim/msgpack_rpc/event_loop/asyncio.py b/pynvim/msgpack_rpc/event_loop/asyncio.py index 3a8a155d..24fa2df5 100644 --- a/pynvim/msgpack_rpc/event_loop/asyncio.py +++ b/pynvim/msgpack_rpc/event_loop/asyncio.py @@ -25,6 +25,8 @@ loop_cls = asyncio.ProactorEventLoop # type: ignore[attr-defined,misc] +# pylint: disable=logging-fstring-interpolation + class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol, asyncio.SubprocessProtocol): """`BaseEventLoop` subclass that uses `asyncio` as a backend.""" @@ -42,6 +44,7 @@ def connection_made(self, transport): def connection_lost(self, exc): """Used to signal `asyncio.Protocol` of a lost connection.""" + debug(f"connection_lost: exc = {exc}") self._on_error(exc.args[0] if exc else 'EOF') def data_received(self, data: bytes) -> None: @@ -71,6 +74,7 @@ def pipe_data_received(self, fd, data): def process_exited(self) -> None: """Used to signal `asyncio.SubprocessProtocol` when the child exits.""" + debug("process_exited") self._on_error('EOF') def _init(self) -> None: @@ -81,50 +85,61 @@ def _init(self) -> None: self._child_watcher = None def _connect_tcp(self, address: str, port: int) -> None: - coroutine = self._loop.create_connection(self._fact, address, port) - self._loop.run_until_complete(coroutine) + async def connect_tcp(): + await self._loop.create_connection(self._fact, address, port) + debug(f"tcp connection successful: {address}:{port}") + + self._loop.run_until_complete(connect_tcp()) def _connect_socket(self, path: str) -> None: - if os.name == 'nt': - coroutine = self._loop.create_pipe_connection( # type: ignore[attr-defined] - self._fact, path - ) - else: - coroutine = self._loop.create_unix_connection(self._fact, path) - self._loop.run_until_complete(coroutine) + async def connect_socket(): + if os.name == 'nt': + transport, _ = await self._loop.create_pipe_connection(self._fact, path) + else: + transport, _ = await self._loop.create_unix_connection(self._fact, path) + debug("socket connection successful: %s", transport) + + self._loop.run_until_complete(connect_socket()) def _connect_stdio(self) -> None: - if os.name == 'nt': - pipe: Any = PipeHandle( - msvcrt.get_osfhandle(sys.stdin.fileno()) # type: ignore[attr-defined] - ) - else: - pipe = sys.stdin - coroutine = self._loop.connect_read_pipe(self._fact, pipe) - self._loop.run_until_complete(coroutine) - debug("native stdin connection successful") + async def connect_stdin(): + if os.name == 'nt': + pipe = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno())) + else: + pipe = sys.stdin + await self._loop.connect_read_pipe(self._fact, pipe) + debug("native stdin connection successful") + self._loop.run_until_complete(connect_stdin()) # Make sure subprocesses don't clobber stdout, # send the output to stderr instead. rename_stdout = os.dup(sys.stdout.fileno()) os.dup2(sys.stderr.fileno(), sys.stdout.fileno()) - if os.name == 'nt': - pipe = PipeHandle( - msvcrt.get_osfhandle(rename_stdout) # type: ignore[attr-defined] - ) - else: - pipe = os.fdopen(rename_stdout, 'wb') - coroutine = self._loop.connect_write_pipe(self._fact, pipe) # type: ignore[assignment] - self._loop.run_until_complete(coroutine) - debug("native stdout connection successful") + async def connect_stdout(): + if os.name == 'nt': + pipe = PipeHandle(msvcrt.get_osfhandle(rename_stdout)) + else: + pipe = os.fdopen(rename_stdout, 'wb') + + await self._loop.connect_write_pipe(self._fact, pipe) + debug("native stdout connection successful") + + self._loop.run_until_complete(connect_stdout()) def _connect_child(self, argv: List[str]) -> None: if os.name != 'nt': - self._child_watcher = asyncio.get_child_watcher() - self._child_watcher.attach_loop(self._loop) - coroutine = self._loop.subprocess_exec(self._fact, *argv) - self._loop.run_until_complete(coroutine) + # see #238, #241 + _child_watcher = asyncio.get_child_watcher() + _child_watcher.attach_loop(self._loop) + + async def create_subprocess(): + transport: asyncio.SubprocessTransport + transport, protocol = await self._loop.subprocess_exec(self._fact, *argv) + pid = transport.get_pid() + debug("child subprocess_exec successful, PID = %s", pid) + + self._loop.run_until_complete(create_subprocess()) def _start_reading(self) -> None: pass From 3f5f532f75e7e00b94dddb9c58e39bfb9efdc9ab Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Sat, 14 Oct 2023 02:03:05 -0400 Subject: [PATCH 4/6] refactor: separate asyncio Protocol from AsyncioEventLoop - Separate Protocol from AsyncioEventLoop (which were too complex). This makes it much clearer to discern which methods are asyncio- specific specializations of the abstract base event loop class (used for managing the lifecycle of event loop itself) and which serves as callback functions for IPC communication. - Document the design and the lifecycle of the BaseEventLoop class. Although asyncio is the only existing implementation, the current behavior or abstraction is documented (until further refactorings) to avoid confusions and clarify how the subclass should be implemented. - Use `typing.override` in the AsyncioEventLoop subclass (requires typing-extensions >= 4.5.0). --- pynvim/msgpack_rpc/event_loop/asyncio.py | 162 +++++++++++++++++------ pynvim/msgpack_rpc/event_loop/base.py | 97 ++++++++------ pynvim/msgpack_rpc/msgpack_stream.py | 13 +- setup.cfg | 2 +- setup.py | 4 +- 5 files changed, 184 insertions(+), 94 deletions(-) diff --git a/pynvim/msgpack_rpc/event_loop/asyncio.py b/pynvim/msgpack_rpc/event_loop/asyncio.py index 24fa2df5..a801a326 100644 --- a/pynvim/msgpack_rpc/event_loop/asyncio.py +++ b/pynvim/msgpack_rpc/event_loop/asyncio.py @@ -6,9 +6,14 @@ import sys from collections import deque from signal import Signals -from typing import Any, Callable, Deque, List, Optional +from typing import Any, Callable, Deque, List, Optional, cast -from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop +if sys.version_info >= (3, 12): + from typing import Final, override +else: + from typing_extensions import Final, override + +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop, TTransportType logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) @@ -27,88 +32,136 @@ # pylint: disable=logging-fstring-interpolation -class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol, - asyncio.SubprocessProtocol): - """`BaseEventLoop` subclass that uses `asyncio` as a backend.""" +class Protocol(asyncio.Protocol, asyncio.SubprocessProtocol): + """The protocol class used for asyncio-based RPC communication.""" - _queued_data: Deque[bytes] - if os.name != 'nt': - _child_watcher: Optional['asyncio.AbstractChildWatcher'] + def __init__(self, on_data, on_error): + """Initialize the Protocol object.""" + assert on_data is not None + assert on_error is not None + self._on_data = on_data + self._on_error = on_error + @override def connection_made(self, transport): """Used to signal `asyncio.Protocol` of a successful connection.""" - self._transport = transport - self._raw_transport = transport - if isinstance(transport, asyncio.SubprocessTransport): - self._transport = transport.get_pipe_transport(0) + del transport # no-op - def connection_lost(self, exc): + @override + def connection_lost(self, exc: Optional[Exception]) -> None: """Used to signal `asyncio.Protocol` of a lost connection.""" debug(f"connection_lost: exc = {exc}") - self._on_error(exc.args[0] if exc else 'EOF') + self._on_error(exc if exc else EOFError()) + @override def data_received(self, data: bytes) -> None: """Used to signal `asyncio.Protocol` of incoming data.""" - if self._on_data: - self._on_data(data) - return - self._queued_data.append(data) + self._on_data(data) - def pipe_connection_lost(self, fd, exc): + @override + def pipe_connection_lost(self, fd: int, exc: Optional[Exception]) -> None: """Used to signal `asyncio.SubprocessProtocol` of a lost connection.""" debug("pipe_connection_lost: fd = %s, exc = %s", fd, exc) if os.name == 'nt' and fd == 2: # stderr # On windows, ignore piped stderr being closed immediately (#505) return - self._on_error(exc.args[0] if exc else 'EOF') + self._on_error(exc if exc else EOFError()) + @override def pipe_data_received(self, fd, data): """Used to signal `asyncio.SubprocessProtocol` of incoming data.""" if fd == 2: # stderr fd number # Ignore stderr message, log only for debugging debug("stderr: %s", str(data)) - elif self._on_data: - self._on_data(data) - else: - self._queued_data.append(data) + elif fd == 1: # stdout + self.data_received(data) + @override def process_exited(self) -> None: """Used to signal `asyncio.SubprocessProtocol` when the child exits.""" debug("process_exited") - self._on_error('EOF') + self._on_error(EOFError()) + + +class AsyncioEventLoop(BaseEventLoop): + """`BaseEventLoop` subclass that uses core `asyncio` as a backend.""" + + _protocol: Optional[Protocol] + _transport: Optional[asyncio.WriteTransport] + _signals: List[Signals] + _data_buffer: Deque[bytes] + if os.name != 'nt': + _child_watcher: Optional['asyncio.AbstractChildWatcher'] - def _init(self) -> None: - self._loop = loop_cls() - self._queued_data = deque() - self._fact = lambda: self + def __init__(self, + transport_type: TTransportType, + *args: Any, **kwargs: Any): + """asyncio-specific initialization. see BaseEventLoop.__init__.""" + + # The underlying asyncio event loop. + self._loop: Final[asyncio.AbstractEventLoop] = loop_cls() + + # Handle messages from nvim that may arrive before run() starts. + self._data_buffer = deque() + + def _on_data(data: bytes) -> None: + if self._on_data is None: + self._data_buffer.append(data) + return + self._on_data(data) + + # pylint: disable-next=unnecessary-lambda + self._protocol_factory = lambda: Protocol( + on_data=_on_data, + on_error=self._on_error, + ) + self._protocol = None + + # The communication channel (endpoint) created by _connect_*() method. + self._transport = None self._raw_transport = None self._child_watcher = None + super().__init__(transport_type, *args, **kwargs) + + @override def _connect_tcp(self, address: str, port: int) -> None: async def connect_tcp(): - await self._loop.create_connection(self._fact, address, port) + transport, protocol = await self._loop.create_connection( + self._protocol_factory, address, port) debug(f"tcp connection successful: {address}:{port}") + self._transport = transport + self._protocol = protocol self._loop.run_until_complete(connect_tcp()) + @override def _connect_socket(self, path: str) -> None: async def connect_socket(): if os.name == 'nt': - transport, _ = await self._loop.create_pipe_connection(self._fact, path) + _create_connection = self._loop.create_pipe_connection else: - transport, _ = await self._loop.create_unix_connection(self._fact, path) - debug("socket connection successful: %s", transport) + _create_connection = self._loop.create_unix_connection + + transport, protocol = await _create_connection( + self._protocol_factory, path) + debug("socket connection successful: %s", self._transport) + self._transport = transport + self._protocol = protocol self._loop.run_until_complete(connect_socket()) + @override def _connect_stdio(self) -> None: async def connect_stdin(): if os.name == 'nt': pipe = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno())) else: pipe = sys.stdin - await self._loop.connect_read_pipe(self._fact, pipe) + transport, protocol = await self._loop.connect_read_pipe( + self._protocol_factory, pipe) debug("native stdin connection successful") + del transport, protocol self._loop.run_until_complete(connect_stdin()) # Make sure subprocesses don't clobber stdout, @@ -122,52 +175,74 @@ async def connect_stdout(): else: pipe = os.fdopen(rename_stdout, 'wb') - await self._loop.connect_write_pipe(self._fact, pipe) + transport, protocol = await self._loop.connect_write_pipe( + self._protocol_factory, pipe) debug("native stdout connection successful") - + self._transport = transport + self._protocol = protocol self._loop.run_until_complete(connect_stdout()) + @override def _connect_child(self, argv: List[str]) -> None: if os.name != 'nt': # see #238, #241 - _child_watcher = asyncio.get_child_watcher() - _child_watcher.attach_loop(self._loop) + self._child_watcher = asyncio.get_child_watcher() + self._child_watcher.attach_loop(self._loop) async def create_subprocess(): - transport: asyncio.SubprocessTransport - transport, protocol = await self._loop.subprocess_exec(self._fact, *argv) + transport: asyncio.SubprocessTransport # type: ignore + transport, protocol = await self._loop.subprocess_exec( + self._protocol_factory, *argv) pid = transport.get_pid() debug("child subprocess_exec successful, PID = %s", pid) + self._transport = cast(asyncio.WriteTransport, + transport.get_pipe_transport(0)) # stdin + self._protocol = protocol + + # await until child process have been launched and the transport has + # been established self._loop.run_until_complete(create_subprocess()) + @override def _start_reading(self) -> None: pass + @override def _send(self, data: bytes) -> None: + assert self._transport, "connection has not been established." self._transport.write(data) + @override def _run(self) -> None: - while self._queued_data: - data = self._queued_data.popleft() + # process the early messages that arrived as soon as the transport + # channels are open and on_data is fully ready to receive messages. + while self._data_buffer: + data: bytes = self._data_buffer.popleft() if self._on_data is not None: self._on_data(data) + self._loop.run_forever() + @override def _stop(self) -> None: self._loop.stop() + @override def _close(self) -> None: + # TODO close all the transports if self._raw_transport is not None: - self._raw_transport.close() + self._raw_transport.close() # type: ignore[unreachable] self._loop.close() if self._child_watcher is not None: self._child_watcher.close() self._child_watcher = None + @override def _threadsafe_call(self, fn: Callable[[], Any]) -> None: self._loop.call_soon_threadsafe(fn) + @override def _setup_signals(self, signals: List[Signals]) -> None: if os.name == 'nt': # add_signal_handler is not supported in win32 @@ -178,6 +253,7 @@ def _setup_signals(self, signals: List[Signals]) -> None: for signum in self._signals: self._loop.add_signal_handler(signum, self._on_signal, signum) + @override def _teardown_signals(self) -> None: for signum in self._signals: self._loop.remove_signal_handler(signum) diff --git a/pynvim/msgpack_rpc/event_loop/base.py b/pynvim/msgpack_rpc/event_loop/base.py index 33bbf698..c7def3e1 100644 --- a/pynvim/msgpack_rpc/event_loop/base.py +++ b/pynvim/msgpack_rpc/event_loop/base.py @@ -4,7 +4,7 @@ import sys import threading from abc import ABC, abstractmethod -from typing import Any, Callable, List, Optional, Type, Union +from typing import Any, Callable, List, Optional, Union if sys.version_info < (3, 8): from typing_extensions import Literal @@ -34,13 +34,22 @@ class BaseEventLoop(ABC): - """Abstract base class for all event loops. Event loops act as the bottom layer for Nvim sessions created by this library. They hide system/transport details behind a simple interface for reading/writing bytes to the connected Nvim instance. + A lifecycle of event loop is as follows: (1. -> [2. -> 3.]* -> 4.) + 1. initialization (__init__): connection to Nvim is established. + 2. run(data_cb): run the event loop (blocks until the loop stops). + Requests are sent to the remote neovim by calling send(), and + responses (messages) from the remote neovim will be passed to the + given `data_cb` callback function while the event loop is running. + Note that run() may be called multiple times. + 3. stop(): stop the event loop. + 4. close(): close the event loop, destroying all the internal resources. + This class exposes public methods for interacting with the underlying event loop and delegates implementation-specific work to the following methods, which subclasses are expected to implement: @@ -54,15 +63,17 @@ class BaseEventLoop(ABC): embedded Nvim that has its stdin/stdout connected to the event loop. - `_start_reading()`: Called after any of _connect_* methods. Can be used to perform any post-connection setup or validation. - - `_send(data)`: Send `data`(byte array) to Nvim. The data is only + - `_send(data)`: Send `data` (byte array) to Nvim (usually RPC request). - `_run()`: Runs the event loop until stopped or the connection is closed. - calling the following methods when some event happens: - actually sent when the event loop is running. - - `_on_data(data)`: When Nvim sends some data. + The following methods can be called upon some events by the event loop: + - `_on_data(data)`: When Nvim sends some data (usually RPC response). - `_on_signal(signum)`: When a signal is received. - - `_on_error(message)`: When a non-recoverable error occurs(eg: - connection lost) - - `_stop()`: Stop the event loop + - `_on_error(exc)`: When a non-recoverable error occurs (e.g: + connection lost, or any other OSError) + Note that these _on_{data,signal,error} methods are not 'final', may be + changed around an execution of run(). The subclasses are expected to + handle any early messages arriving while _on_data is not yet set. + - `_stop()`: Stop the event loop. - `_interrupt(data)`: Like `stop()`, but may be called from other threads this. - `_setup_signals(signals)`: Add implementation-specific listeners for @@ -77,33 +88,20 @@ def __init__(self, transport_type: TTransportType, *args: Any, **kwargs: Any): configuration, like this: >>> BaseEventLoop('tcp', '127.0.0.1', 7450) - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' >>> BaseEventLoop('socket', '/tmp/nvim-socket') - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' >>> BaseEventLoop('stdio') - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' - >>> BaseEventLoop('child', - ['nvim', '--embed', '--headless', '-u', 'NONE']) - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' - - This calls the implementation-specific initialization - `_init`, one of the `_connect_*` methods(based on `transport_type`) - and `_start_reading()` + >>> BaseEventLoop('child', ['nvim', '--embed', '--headless', '-u', 'NONE']) + + Implementation-specific initialization should be made in the __init__ + constructor of the subclass, which must call the constructor of the + super class (BaseEventLoop), in which one of the `_connect_*` methods + (based on `transport_type`) and then `_start_reading()`. """ self._transport_type = transport_type self._signames = dict((k, v) for v, k in signal.__dict__.items() if v.startswith('SIG')) self._on_data: Optional[Callable[[bytes], None]] = None self._error: Optional[BaseException] = None - self._init() try: getattr(self, '_connect_{}'.format(transport_type))(*args, **kwargs) except Exception as e: @@ -111,10 +109,6 @@ def __init__(self, transport_type: TTransportType, *args: Any, **kwargs: Any): raise e self._start_reading() - @abstractmethod - def _init(self) -> None: - raise NotImplementedError() - @abstractmethod def _start_reading(self) -> None: raise NotImplementedError() @@ -172,17 +166,23 @@ def threadsafe_call(self, fn): """ self._threadsafe_call(fn) - def run(self, data_cb): - """Run the event loop.""" + @abstractmethod + def _threadsafe_call(self, fn: Callable[[], Any]) -> None: + raise NotImplementedError() + + def run(self, data_cb: Callable[[bytes], None]) -> None: + """Run the event loop, and receives response messages to a callback.""" if self._error: err = self._error if isinstance(self._error, KeyboardInterrupt): - # KeyboardInterrupt is not destructive(it may be used in + # KeyboardInterrupt is not destructive (it may be used in # the REPL). # After throwing KeyboardInterrupt, cleanup the _error field # so the loop may be started again self._error = None raise err + + # data_cb: e.g., MsgpackStream._on_data self._on_data = data_cb if threading.current_thread() == main_thread: self._setup_signals([signal.SIGINT, signal.SIGTERM]) @@ -194,6 +194,10 @@ def run(self, data_cb): signal.signal(signal.SIGINT, default_int_handler) self._on_data = None + @abstractmethod + def _run(self) -> None: + raise NotImplementedError() + def stop(self) -> None: """Stop the event loop.""" self._stop() @@ -213,23 +217,32 @@ def _close(self) -> None: raise NotImplementedError() def _on_signal(self, signum: signal.Signals) -> None: - msg = 'Received {}'.format(self._signames[signum]) + # pylint: disable-next=consider-using-f-string + msg = 'Received signal {}'.format(self._signames[signum]) debug(msg) + if signum == signal.SIGINT and self._transport_type == 'stdio': # When the transport is stdio, we are probably running as a Nvim # child process. In that case, we don't want to be killed by # ctrl+C return - cls: Type[BaseException] = Exception + if signum == signal.SIGINT: - cls = KeyboardInterrupt - self._error = cls(msg) + self._error = KeyboardInterrupt() + else: + self._error = Exception(msg) self.stop() - def _on_error(self, error: str) -> None: - debug(error) - self._error = OSError(error) + def _on_error(self, exc: Exception) -> None: + debug(str(exc)) + self._error = exc self.stop() def _on_interrupt(self) -> None: self.stop() + + def _setup_signals(self, signals: List[signal.Signals]) -> None: + pass # no-op by default + + def _teardown_signals(self) -> None: + pass # no-op by default diff --git a/pynvim/msgpack_rpc/msgpack_stream.py b/pynvim/msgpack_rpc/msgpack_stream.py index 49340c50..f209d849 100644 --- a/pynvim/msgpack_rpc/msgpack_stream.py +++ b/pynvim/msgpack_rpc/msgpack_stream.py @@ -4,20 +4,20 @@ from msgpack import Packer, Unpacker from pynvim.compat import unicode_errors_default +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) -class MsgpackStream(object): - +class MsgpackStream: """Two-way msgpack stream that wraps a event loop byte stream. This wraps the event loop interface for reading/writing bytes and exposes an interface for reading/writing msgpack documents. """ - def __init__(self, event_loop): + def __init__(self, event_loop: BaseEventLoop) -> None: """Wrap `event_loop` on a msgpack-aware interface.""" self.loop = event_loop self._packer = Packer(unicode_errors=unicode_errors_default) @@ -30,7 +30,7 @@ def threadsafe_call(self, fn): def send(self, msg): """Queue `msg` for sending to Nvim.""" - debug('sent %s', msg) + debug('sending %s', msg) self.loop.send(self._packer.pack(msg)) def run(self, message_cb): @@ -51,14 +51,15 @@ def close(self): """Close the event loop.""" self.loop.close() - def _on_data(self, data): + def _on_data(self, data: bytes) -> None: self._unpacker.feed(data) while True: try: debug('waiting for message...') msg = next(self._unpacker) debug('received message: %s', msg) - self._message_cb(msg) + assert self._message_cb is not None + self._message_cb(msg) # type: ignore[unreachable] except StopIteration: debug('unpacker needs more data...') break diff --git a/setup.cfg b/setup.cfg index 3b593d6b..de3fce30 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,7 +2,7 @@ test = pytest [flake8] -extend-ignore = D211,E731,D401,W503 +extend-ignore = D211,E731,D401,W503,D202 max-line-length = 100 per-file-ignores = test/*:D1 diff --git a/setup.py b/setup.py index 6b3a615d..f4ebee0c 100644 --- a/setup.py +++ b/setup.py @@ -30,8 +30,8 @@ # pypy already includes an implementation of the greenlet module install_requires.append('greenlet>=3.0') -if sys.version_info < (3, 8): - install_requires.append('typing-extensions') +if sys.version_info < (3, 12): + install_requires.append('typing-extensions>=4.5') # __version__: see pynvim/_version.py From 7f60f72def9b58a48d18abd84fc3a92b39b164bc Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Sun, 15 Oct 2023 04:35:07 -0400 Subject: [PATCH 5/6] refactor: improve typing --- pynvim/msgpack_rpc/async_session.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pynvim/msgpack_rpc/async_session.py b/pynvim/msgpack_rpc/async_session.py index 333c6cf2..e7766454 100644 --- a/pynvim/msgpack_rpc/async_session.py +++ b/pynvim/msgpack_rpc/async_session.py @@ -1,13 +1,19 @@ """Asynchronous msgpack-rpc handling in the event loop pipeline.""" import logging from traceback import format_exc +from typing import Any, AnyStr, Callable, Dict +from pynvim.msgpack_rpc.msgpack_stream import MsgpackStream logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) -class AsyncSession(object): +# response call back takes two arguments: (err, return_value) +ResponseCallback = Callable[..., None] + + +class AsyncSession: """Asynchronous msgpack-rpc layer that wraps a msgpack stream. @@ -16,11 +22,11 @@ class AsyncSession(object): requests and notifications. """ - def __init__(self, msgpack_stream): + def __init__(self, msgpack_stream: MsgpackStream): """Wrap `msgpack_stream` on a msgpack-rpc interface.""" self._msgpack_stream = msgpack_stream self._next_request_id = 1 - self._pending_requests = {} + self._pending_requests: Dict[int, ResponseCallback] = {} self._request_cb = self._notification_cb = None self._handlers = { 0: self._on_request, @@ -33,7 +39,8 @@ def threadsafe_call(self, fn): """Wrapper around `MsgpackStream.threadsafe_call`.""" self._msgpack_stream.threadsafe_call(fn) - def request(self, method, args, response_cb): + def request(self, method: AnyStr, args: Any, + response_cb: ResponseCallback) -> None: """Send a msgpack-rpc request to Nvim. A msgpack-rpc with method `method` and argument `args` is sent to @@ -89,8 +96,9 @@ def _on_request(self, msg): # - msg[2]: method name # - msg[3]: arguments debug('received request: %s, %s', msg[2], msg[3]) - self._request_cb(msg[2], msg[3], Response(self._msgpack_stream, - msg[1])) + assert self._request_cb is not None + self._request_cb(msg[2], msg[3], + Response(self._msgpack_stream, msg[1])) def _on_response(self, msg): # response to a previous request: @@ -105,6 +113,7 @@ def _on_notification(self, msg): # - msg[1]: event name # - msg[2]: arguments debug('received notification: %s, %s', msg[1], msg[2]) + assert self._notification_cb is not None self._notification_cb(msg[1], msg[2]) def _on_invalid_message(self, msg): @@ -113,15 +122,14 @@ def _on_invalid_message(self, msg): self._msgpack_stream.send([1, 0, error, None]) -class Response(object): - +class Response: """Response to a msgpack-rpc request that came from Nvim. When Nvim sends a msgpack-rpc request, an instance of this class is created for remembering state required to send a response. """ - def __init__(self, msgpack_stream, request_id): + def __init__(self, msgpack_stream: MsgpackStream, request_id: int): """Initialize the Response instance.""" self._msgpack_stream = msgpack_stream self._request_id = request_id From 17fbcbccde9978a5caa05f571174bfff4a0b7bdd Mon Sep 17 00:00:00 2001 From: Jongwook Choi Date: Mon, 16 Oct 2023 02:00:32 -0400 Subject: [PATCH 6/6] fix: prevent closed pipe errors on closing asyncio transport resources The proactor pipe transports for subprocess won't be automatically closed, so "closed pipe" errors (pytest warnings) occur during garbage collection (upon `__del__`). This results in a bunch of pytest warnings whenever closing and freeing up fixture Nvim sessions. A solution is to close all the internal `_ProactorBasePipeTransport` objects later when closing the asyncio event loop. Also, `_ProactorBasePipeTransport.close()` does not close immediately, but rather works asynchronously; therefore the `__del__` finalizer still can throw if called by GC after the event loop is closed. One solution for properly closing the pipe transports is to await the graceful shutdown of these transports. Example CI output (the pytest warnings that are going to be fixed): ``` Exception ignored in: Traceback (most recent call last): File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\proactor_events.py", line 116, in __del__ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\proactor_events.py", line 80, in __repr__ info.append(f'fd={self._sock.fileno()}') ^^^^^^^^^^^^^^^^^^^ File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\windows_utils.py", line 102, in fileno raise ValueError("I/O operation on closed pipe") ValueError: I/O operation on closed pipe Exception ignored in: Traceback (most recent call last): File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\proactor_events.py", line 116, in __del__ File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\proactor_events.py", line 80, in __repr__ File "C:\hostedtoolcache\windows\Python\3.11.5\x64\Lib\asyncio\windows_utils.py", line 102, in fileno ValueError: I/O operation on closed pipe ``` --- pynvim/msgpack_rpc/event_loop/asyncio.py | 42 ++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/pynvim/msgpack_rpc/event_loop/asyncio.py b/pynvim/msgpack_rpc/event_loop/asyncio.py index a801a326..532fbef6 100644 --- a/pynvim/msgpack_rpc/event_loop/asyncio.py +++ b/pynvim/msgpack_rpc/event_loop/asyncio.py @@ -117,9 +117,10 @@ def _on_data(data: bytes) -> None: ) self._protocol = None - # The communication channel (endpoint) created by _connect_*() method. + # The communication channel (endpoint) created by _connect_*() methods, + # where we write request messages to be sent to neovim self._transport = None - self._raw_transport = None + self._to_close: List[asyncio.BaseTransport] = [] self._child_watcher = None super().__init__(transport_type, *args, **kwargs) @@ -161,7 +162,8 @@ async def connect_stdin(): transport, protocol = await self._loop.connect_read_pipe( self._protocol_factory, pipe) debug("native stdin connection successful") - del transport, protocol + self._to_close.append(transport) + del protocol self._loop.run_until_complete(connect_stdin()) # Make sure subprocesses don't clobber stdout, @@ -200,6 +202,16 @@ async def create_subprocess(): transport.get_pipe_transport(0)) # stdin self._protocol = protocol + # proactor transport implementations do not close the pipes + # automatically, so make sure they are closed upon shutdown + def _close_later(transport): + if transport is not None: + self._to_close.append(transport) + + _close_later(transport.get_pipe_transport(1)) + _close_later(transport.get_pipe_transport(2)) + _close_later(transport) + # await until child process have been launched and the transport has # been established self._loop.run_until_complete(create_subprocess()) @@ -230,10 +242,28 @@ def _stop(self) -> None: @override def _close(self) -> None: - # TODO close all the transports - if self._raw_transport is not None: - self._raw_transport.close() # type: ignore[unreachable] + def _close_transport(transport): + transport.close() + + # Windows: for ProactorBasePipeTransport, close() doesn't take in + # effect immediately (closing happens asynchronously inside the + # event loop), need to wait a bit for completing graceful shutdown. + if os.name == 'nt' and hasattr(transport, '_sock'): + async def wait_until_closed(): + # pylint: disable-next=protected-access + while transport._sock is not None: + await asyncio.sleep(0.01) + self._loop.run_until_complete(wait_until_closed()) + + if self._transport: + _close_transport(self._transport) + self._transport = None + for transport in self._to_close: + _close_transport(transport) + self._to_close[:] = [] + self._loop.close() + if self._child_watcher is not None: self._child_watcher.close() self._child_watcher = None