Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore BlockingIOError error logs emitted by aio.grpc in event loops #1180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,6 @@ async def test_async_client_is_ready() -> None:
assert await client.is_ready()


@pytest.mark.skip("gRPC proxying is not supported by grpclib at this time")
def test_local_proxies() -> None:
with weaviate.connect_to_local(
additional_config=wvc.init.AdditionalConfig(
Expand All @@ -604,3 +603,21 @@ def test_local_proxies() -> None:
)
collection.data.insert({"name": "Test"})
assert collection.query.fetch_objects().objects[0].properties["name"] == "Test"


@pytest.mark.asyncio
async def test_async_client_inside_sync_client(caplog: pytest.LogCaptureFixture) -> None:
with weaviate.connect_to_local() as client:
async with weaviate.use_async_with_local() as aclient:
assert client.is_ready()
assert await aclient.is_ready()
assert "BlockingIOError: [Errno 35] Resource temporarily unavailable" not in caplog.text


@pytest.mark.asyncio
async def test_sync_client_inside_async_client(caplog: pytest.LogCaptureFixture) -> None:
async with weaviate.use_async_with_local() as aclient:
with weaviate.connect_to_local() as client:
assert client.is_ready()
assert await aclient.is_ready()
assert "BlockingIOError: [Errno 35] Resource temporarily unavailable" not in caplog.text
5 changes: 4 additions & 1 deletion weaviate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
)
from .gql import Query
from .schema import Schema
from weaviate.event_loop import _EventLoopSingleton
from weaviate.event_loop import _EventLoopSingleton, _EventLoop
from .types import NUMBER
from .util import _get_valid_timeout_config, _type_request_response
from .warnings import _Warnings
Expand Down Expand Up @@ -83,6 +83,7 @@ def __init__(
self._event_loop = _EventLoopSingleton.get_instance()
assert self._event_loop.loop is not None
self._loop = self._event_loop.loop
_EventLoop.patch_exception_handler(self._loop)

super().__init__(
connection_params=connection_params,
Expand Down Expand Up @@ -144,6 +145,8 @@ def __init__(
skip_init_checks: bool = False,
) -> None:
self._loop = asyncio.get_event_loop()
_EventLoop.patch_exception_handler(self._loop)

super().__init__(
connection_params=connection_params,
embedded_options=embedded_options,
Expand Down
20 changes: 0 additions & 20 deletions weaviate/connect/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,6 @@ def _grpc_address(self) -> Tuple[str, int]:
def _grpc_target(self) -> str:
return f"{self.grpc.host}:{self.grpc.port}"

# @overload
# def _grpc_channel(self, async_channel: Literal[False], proxies: Dict[str, str]) -> Channel:
# ...

# @overload
# def _grpc_channel(self, async_channel: Literal[True], proxies: Dict[str, str]) -> AsyncChannel:
# ...

def _grpc_channel(self, proxies: Dict[str, str]) -> Channel:
if (p := proxies.get("grpc")) is not None:
options: list = [*GRPC_DEFAULT_OPTIONS, ("grpc.http_proxy", p)]
Expand All @@ -136,18 +128,6 @@ def _grpc_channel(self, proxies: Dict[str, str]) -> Channel:
options=options,
)

# def _grpc_channel(self, proxies: Dict[str, str]) -> Channel:
# # if (p := proxies.get("grpc")) is not None:
# # options: list = [*GRPC_DEFAULT_OPTIONS, ("grpc.http_proxy", p)]
# # else:
# # options = GRPC_DEFAULT_OPTIONS
# return Channel(
# host=self._grpc_address[0],
# port=self._grpc_address[1],
# # options=options,
# ssl=self.grpc.secure,
# )

@property
def _http_scheme(self) -> str:
return "https" if self.http.secure else "http"
Expand Down
65 changes: 18 additions & 47 deletions weaviate/connect/v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass, field
from ssl import SSLZeroReturnError
from threading import Event, Thread
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast, overload
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast

from authlib.integrations.httpx_client import ( # type: ignore
AsyncOAuth2Client,
Expand All @@ -28,7 +28,6 @@
RemoteProtocolError,
RequestError,
Response,
HTTPTransport,
Timeout,
)

Expand Down Expand Up @@ -199,58 +198,30 @@ def set_integrations(self, integrations_config: List[_IntegrationConfig]) -> Non
self._headers.update(integration._to_header())
self.__additional_headers.update(integration._to_header())

@overload
def __make_mounts(self, type_: Literal["sync"]) -> Dict[str, HTTPTransport]:
...

@overload
def __make_mounts(self, type_: Literal["async"]) -> Dict[str, AsyncHTTPTransport]:
...

def __make_mounts(
self, type_: Literal["sync", "async"]
) -> Union[Dict[str, HTTPTransport], Dict[str, AsyncHTTPTransport]]:
if type_ == "async":
return {
f"{key}://"
if key == "http" or key == "https"
else key: AsyncHTTPTransport(
limits=Limits(
max_connections=self.__connection_config.session_pool_maxsize,
max_keepalive_connections=self.__connection_config.session_pool_connections,
),
proxy=proxy,
retries=self.__connection_config.session_pool_max_retries,
trust_env=self.__trust_env,
)
for key, proxy in self._proxies.items()
if key != "grpc"
}
else:
assert type_ == "sync"
return {
f"{key}://"
if key == "http" or key == "https"
else key: HTTPTransport(
limits=Limits(
max_connections=self.__connection_config.session_pool_maxsize,
max_keepalive_connections=self.__connection_config.session_pool_connections,
),
proxy=proxy,
retries=self.__connection_config.session_pool_max_retries,
trust_env=self.__trust_env,
)
for key, proxy in self._proxies.items()
if key != "grpc"
}
def __make_mounts(self) -> Dict[str, AsyncHTTPTransport]:
return {
f"{key}://"
if key == "http" or key == "https"
else key: AsyncHTTPTransport(
limits=Limits(
max_connections=self.__connection_config.session_pool_maxsize,
max_keepalive_connections=self.__connection_config.session_pool_connections,
),
proxy=proxy,
retries=self.__connection_config.session_pool_max_retries,
trust_env=self.__trust_env,
)
for key, proxy in self._proxies.items()
if key != "grpc"
}

def __make_async_client(self) -> AsyncClient:
return AsyncClient(
headers=self._headers,
timeout=Timeout(
None, connect=self.timeout_config.query, read=self.timeout_config.insert
),
mounts=self.__make_mounts("async"),
mounts=self.__make_mounts(),
)

def __make_clients(self) -> None:
Expand Down
31 changes: 30 additions & 1 deletion weaviate/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading
import time
from concurrent.futures import Future
from typing import Any, Callable, Coroutine, Generic, Optional, TypeVar, cast
from typing import Any, Callable, Coroutine, Dict, Generic, Optional, TypeVar, cast

from typing_extensions import ParamSpec

Expand Down Expand Up @@ -81,6 +81,30 @@ def __start_new_event_loop() -> asyncio.AbstractEventLoop:

return loop

@staticmethod
def patch_exception_handler(loop: asyncio.AbstractEventLoop) -> None:
"""
This patches the asyncio exception handler to ignore the `BlockingIOError: [Errno 35] Resource temporarily unavailable` error
that is emitted by `aio.grpc` when multiple event loops are used in separate threads. This error is not actually an implementation/call error,
it's just a problem with grpc's cython implementation of `aio.Channel.__init__` whereby a `socket.recv(1)` call only works on the first call with
all subsequent calls to `aio.Channel.__init__` throwing the above error.

This call within the `aio.Channel.__init__` method does not affect the functionality of the library and can be safely ignored.

Context:
- https://github.com/grpc/grpc/issues/25364
- https://github.com/grpc/grpc/pull/36096
"""

def exception_handler(loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None:
if "exception" in context:
err = f"{type(context['exception']).__name__}: {context['exception']}"
if "BlockingIOError: [Errno 35] Resource temporarily unavailable" == err:
return
loop.default_exception_handler(context)

loop.set_exception_handler(exception_handler)

def __del__(self) -> None:
self.shutdown()

Expand All @@ -95,3 +119,8 @@ def get_instance(cls) -> _EventLoop:
cls._instance = _EventLoop()
cls._instance.start()
return cls._instance

def __del__(self) -> None:
if self._instance is not None:
self._instance.shutdown()
self._instance = None
Loading