From 3833ceba0ae502f2e05c25f39e68b6e9b285035e Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 24 Jul 2024 15:11:58 +0100 Subject: [PATCH 1/3] Parse the correct timeouts to httpx client --- weaviate/connect/v4.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/weaviate/connect/v4.py b/weaviate/connect/v4.py index c372663b6..209b7e44b 100644 --- a/weaviate/connect/v4.py +++ b/weaviate/connect/v4.py @@ -220,7 +220,10 @@ def __make_async_client(self) -> AsyncClient: return AsyncClient( headers=self._headers, timeout=Timeout( - None, connect=self.timeout_config.query, read=self.timeout_config.insert + None, + connect=self.timeout_config.init, + read=self.timeout_config.query, + write=self.timeout_config.insert, ), mounts=self.__make_mounts(), ) From f68b5f869c340c1845d204d12c6eaac74d6fee9a Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 24 Jul 2024 16:33:34 +0100 Subject: [PATCH 2/3] Change timeout behaviour to better mimic usage as timeout of Weaviate op, not of HTTP op --- mock_tests/conftest.py | 70 ++++++++++++++++++- mock_tests/mock_data.py | 69 ++++++++++++++++++ mock_tests/test_timeouts.py | 25 +++++++ weaviate/client_base.py | 1 + .../collections/batch/grpc_batch_objects.py | 6 +- weaviate/config.py | 6 +- weaviate/connect/v4.py | 37 ++++++++-- weaviate/exceptions.py | 10 ++- 8 files changed, 210 insertions(+), 14 deletions(-) create mode 100644 mock_tests/mock_data.py create mode 100644 mock_tests/test_timeouts.py diff --git a/mock_tests/conftest.py b/mock_tests/conftest.py index 004851ac8..433b354fc 100644 --- a/mock_tests/conftest.py +++ b/mock_tests/conftest.py @@ -1,4 +1,5 @@ import json +import time from concurrent import futures from typing import Generator, Mapping @@ -8,11 +9,19 @@ from grpc_health.v1.health_pb2 import HealthCheckResponse, HealthCheckRequest from grpc_health.v1.health_pb2_grpc import HealthServicer, add_HealthServicer_to_server from pytest_httpserver import HTTPServer, HeaderValueMatcher -from werkzeug.wrappers import Response +from werkzeug.wrappers import Request, Response import weaviate from weaviate.connect.base import ConnectionParams, ProtocolParams -from weaviate.proto.v1 import properties_pb2, tenants_pb2, search_get_pb2, weaviate_pb2_grpc +from weaviate.proto.v1 import ( + batch_pb2, + properties_pb2, + tenants_pb2, + search_get_pb2, + weaviate_pb2_grpc, +) + +from mock_tests.mock_data import mock_class MOCK_IP = "127.0.0.1" MOCK_PORT = 23536 @@ -76,6 +85,26 @@ def weaviate_auth_mock(weaviate_mock: HTTPServer): yield weaviate_mock +@pytest.fixture(scope="function") +def weaviate_timeouts_mock(weaviate_no_auth_mock: HTTPServer): + def slow_get(request: Request) -> Response: + time.sleep(1) + return Response(json.dumps({"doesn't": "matter"}), content_type="application/json") + + def slow_post(request: Request) -> Response: + time.sleep(2) + return Response(json.dumps({"doesn't": "matter"}), content_type="application/json") + + weaviate_no_auth_mock.expect_request( + f"/v1/schema/{mock_class['class']}", method="GET" + ).respond_with_handler(slow_get) + weaviate_no_auth_mock.expect_request("/v1/objects", method="POST").respond_with_handler( + slow_post + ) + + yield weaviate_no_auth_mock + + @pytest.fixture(scope="function") def start_grpc_server() -> Generator[grpc.Server, None, None]: # Create a gRPC server @@ -110,6 +139,22 @@ def weaviate_client( client.close() +@pytest.fixture(scope="function") +def weaviate_timeouts_client( + weaviate_timeouts_mock: HTTPServer, start_grpc_server: grpc.Server +) -> Generator[weaviate.WeaviateClient, None, None]: + client = weaviate.connect_to_local( + host=MOCK_IP, + port=MOCK_PORT, + grpc_port=MOCK_PORT_GRPC, + additional_config=weaviate.classes.init.AdditionalConfig( + timeout=weaviate.classes.init.Timeout(query=0.5, insert=1.5) + ), + ) + yield client + client.close() + + @pytest.fixture(scope="function") def tenants_collection( weaviate_client: weaviate.WeaviateClient, start_grpc_server: grpc.Server @@ -184,3 +229,24 @@ def Search( weaviate_pb2_grpc.add_WeaviateServicer_to_server(MockWeaviateService(), start_grpc_server) return weaviate_client.collections.get("YearZeroCollection") + + +@pytest.fixture(scope="function") +def timeouts_collection( + weaviate_timeouts_client: weaviate.WeaviateClient, start_grpc_server: grpc.Server +) -> weaviate.collections.Collection: + class MockWeaviateService(weaviate_pb2_grpc.WeaviateServicer): + def Search( + self, request: search_get_pb2.SearchRequest, context: grpc.ServicerContext + ) -> search_get_pb2.SearchReply: + time.sleep(1) + return search_get_pb2.SearchReply() + + def BatchObjects( + self, request: batch_pb2.BatchObjectsRequest, context: grpc.ServicerContext + ) -> batch_pb2.BatchObjectsReply: + time.sleep(2) + return batch_pb2.BatchObjectsReply() + + weaviate_pb2_grpc.add_WeaviateServicer_to_server(MockWeaviateService(), start_grpc_server) + return weaviate_timeouts_client.collections.get(mock_class["class"]) diff --git a/mock_tests/mock_data.py b/mock_tests/mock_data.py new file mode 100644 index 000000000..b6f218406 --- /dev/null +++ b/mock_tests/mock_data.py @@ -0,0 +1,69 @@ +mock_class = { + "class": "Something", + "description": "It's something!", + "invertedIndexConfig": { + "bm25": {"b": 0.8, "k1": 1.3}, + "cleanupIntervalSeconds": 61, + "indexPropertyLength": True, + "indexTimestamps": True, + "stopwords": {"additions": None, "preset": "en", "removals": ["the"]}, + }, + "moduleConfig": { + "generative-openai": {}, + "text2vec-contextionary": {"vectorizeClassName": True}, + }, + "multiTenancyConfig": { + "autoTenantActivation": False, + "autoTenantCreation": False, + "enabled": False, + }, + "properties": [ + { + "dataType": ["text[]"], + "indexFilterable": True, + "indexRangeFilters": False, + "indexSearchable": True, + "moduleConfig": { + "text2vec-contextionary": {"skip": False, "vectorizePropertyName": False} + }, + "name": "names", + "tokenization": "word", + } + ], + "replicationConfig": {"asyncEnabled": False, "factor": 1}, + "shardingConfig": { + "virtualPerPhysical": 128, + "desiredCount": 1, + "actualCount": 1, + "desiredVirtualCount": 128, + "actualVirtualCount": 128, + "key": "_id", + "strategy": "hash", + "function": "murmur3", + }, + "vectorIndexConfig": { + "skip": True, + "cleanupIntervalSeconds": 300, + "maxConnections": 64, + "efConstruction": 128, + "ef": -2, + "dynamicEfMin": 101, + "dynamicEfMax": 501, + "dynamicEfFactor": 9, + "vectorCacheMaxObjects": 1000000000001, + "flatSearchCutoff": 40001, + "distance": "cosine", + "pq": { + "enabled": True, + "bitCompression": True, + "segments": 1, + "centroids": 257, + "trainingLimit": 100001, + "encoder": {"type": "tile", "distribution": "normal"}, + }, + "bq": {"enabled": False}, + "sq": {"enabled": False, "trainingLimit": 100000, "rescoreLimit": 20}, + }, + "vectorIndexType": "hnsw", + "vectorizer": "text2vec-contextionary", +} diff --git a/mock_tests/test_timeouts.py b/mock_tests/test_timeouts.py new file mode 100644 index 000000000..094c1094d --- /dev/null +++ b/mock_tests/test_timeouts.py @@ -0,0 +1,25 @@ +import pytest +import weaviate +from weaviate.exceptions import WeaviateTimeoutError, WeaviateQueryError + + +def test_timeout_rest_query(timeouts_collection: weaviate.collections.Collection): + with pytest.raises(WeaviateTimeoutError): + timeouts_collection.config.get() + + +def test_timeout_rest_insert(timeouts_collection: weaviate.collections.Collection): + with pytest.raises(WeaviateTimeoutError): + timeouts_collection.data.insert(properties={"what": "ever"}) + + +def test_timeout_grpc_query(timeouts_collection: weaviate.collections.Collection): + with pytest.raises(WeaviateQueryError) as recwarn: + timeouts_collection.query.fetch_objects() + assert "DEADLINE_EXCEEDED" in str(recwarn) + + +def test_timeout_grpc_insert(timeouts_collection: weaviate.collections.Collection): + with pytest.raises(WeaviateQueryError) as recwarn: + timeouts_collection.data.insert_many([{"what": "ever"}]) + assert "DEADLINE_EXCEEDED" in str(recwarn) diff --git a/weaviate/client_base.py b/weaviate/client_base.py index b833ee006..dcd2b57ca 100644 --- a/weaviate/client_base.py +++ b/weaviate/client_base.py @@ -209,6 +209,7 @@ async def graphql_raw_query(self, gql_query: str) -> _RawGQLReturn: weaviate_object=json_query, error_msg="Raw GQL query failed", status_codes=_ExpectedStatusCodes(ok_in=[200], error="GQL query"), + is_gql_query=True, ) res = _decode_json_response_dict(response, "GQL query") diff --git a/weaviate/collections/batch/grpc_batch_objects.py b/weaviate/collections/batch/grpc_batch_objects.py index 71133c915..84d7fddaa 100644 --- a/weaviate/collections/batch/grpc_batch_objects.py +++ b/weaviate/collections/batch/grpc_batch_objects.py @@ -79,7 +79,9 @@ def pack_vector(vector: Any) -> bytes: for obj in objects ] - async def objects(self, objects: List[_BatchObject], timeout: int) -> BatchObjectReturn: + async def objects( + self, objects: List[_BatchObject], timeout: Union[int, float] + ) -> BatchObjectReturn: """Insert multiple objects into Weaviate through the gRPC API. Parameters: @@ -131,7 +133,7 @@ async def objects(self, objects: List[_BatchObject], timeout: int) -> BatchObjec ) async def __send_batch( - self, batch: List[batch_pb2.BatchObject], timeout: int + self, batch: List[batch_pb2.BatchObject], timeout: Union[int, float] ) -> Dict[int, str]: metadata = self._get_metadata() try: diff --git a/weaviate/config.py b/weaviate/config.py index 8ed905138..8f9a55f95 100644 --- a/weaviate/config.py +++ b/weaviate/config.py @@ -48,9 +48,9 @@ def __post_init__(self) -> None: class Timeout(BaseModel): """Timeouts for the different operations in the client.""" - query: int = Field(default=30, ge=0) - insert: int = Field(default=90, ge=0) - init: int = Field(default=2, ge=0) + query: Union[int, float] = Field(default=30, ge=0) + insert: Union[int, float] = Field(default=90, ge=0) + init: Union[int, float] = Field(default=2, ge=0) class Proxies(BaseModel): diff --git a/weaviate/connect/v4.py b/weaviate/connect/v4.py index 209b7e44b..65ec7f920 100644 --- a/weaviate/connect/v4.py +++ b/weaviate/connect/v4.py @@ -25,6 +25,7 @@ HTTPStatusError, Limits, ReadError, + ReadTimeout, RemoteProtocolError, RequestError, Response, @@ -55,6 +56,7 @@ WeaviateConnectionError, WeaviateGRPCUnavailableError, WeaviateStartUpError, + WeaviateTimeoutError, ) from weaviate.proto.v1 import weaviate_pb2_grpc from weaviate.util import ( @@ -219,12 +221,6 @@ def __make_mounts(self) -> Dict[str, AsyncHTTPTransport]: def __make_async_client(self) -> AsyncClient: return AsyncClient( headers=self._headers, - timeout=Timeout( - None, - connect=self.timeout_config.init, - read=self.timeout_config.query, - write=self.timeout_config.insert, - ), mounts=self.__make_mounts(), ) @@ -409,12 +405,36 @@ def __get_latest_headers(self) -> Dict[str, str]: copied_headers.update({"authorization": self.get_current_bearer_token()}) return copied_headers + def __get_timeout( + self, method: Literal["DELETE", "GET", "HEAD", "PATCH", "POST", "PUT"], is_gql_query: bool + ) -> Timeout: + """ + In this way, the client waits the `httpx` default of 5s when connecting to a socket (connect), writing chunks (write), and + acquiring a connection from the pool (pool), but a custom amount as specified for reading the response (read). + + From the PoV of the user, a request is considered to be timed out if no response is received within the specified time. + They specify the times depending on how they expect Weaviate to behave. For example, a query might take longer than an insert or vice versa. + + https://www.python-httpx.org/advanced/timeouts/ + """ + timeout = None + if method == "DELETE" or method == "PATCH" or method == "PUT": + timeout = self.timeout_config.insert + elif method == "GET" or method == "HEAD": + timeout = self.timeout_config.query + elif method == "POST" and is_gql_query: + timeout = self.timeout_config.query + elif method == "POST" and not is_gql_query: + timeout = self.timeout_config.insert + return Timeout(timeout=5.0, read=timeout) + async def __send( self, method: Literal["DELETE", "GET", "HEAD", "PATCH", "POST", "PUT"], url: str, error_msg: str, status_codes: Optional[_ExpectedStatusCodes], + is_gql_query: bool = False, weaviate_object: Optional[JSONPayload] = None, params: Optional[Dict[str, Any]] = None, ) -> Response: @@ -430,6 +450,7 @@ async def __send( json=weaviate_object, params=params, headers=self.__get_latest_headers(), + timeout=self.__get_timeout(method, is_gql_query), ) res = await self._client.send(req) if status_codes is not None and res.status_code not in status_codes.ok: @@ -439,6 +460,8 @@ async def __send( raise WeaviateClosedClientError() from e except ConnectError as conn_err: raise WeaviateConnectionError(error_msg) from conn_err + except ReadTimeout as read_err: + raise WeaviateTimeoutError(error_msg) from read_err except Exception as e: raise e @@ -483,6 +506,7 @@ async def post( params: Optional[Dict[str, Any]] = None, error_msg: str = "", status_codes: Optional[_ExpectedStatusCodes] = None, + is_gql_query: bool = False, ) -> Response: return await self.__send( "POST", @@ -491,6 +515,7 @@ async def post( params=params, error_msg=error_msg, status_codes=status_codes, + is_gql_query=is_gql_query, ) async def put( diff --git a/weaviate/exceptions.py b/weaviate/exceptions.py index 0d6161e41..ac54d8199 100644 --- a/weaviate/exceptions.py +++ b/weaviate/exceptions.py @@ -317,7 +317,7 @@ class WeaviateConnectionError(WeaviateBaseError): """Is raised when the connection to Weaviate fails.""" def __init__(self, message: str = "") -> None: - msg = f"""Connection to Weaviate failed. {message}""" + msg = f"""Connection to Weaviate failed. Details: {message}""" super().__init__(msg) @@ -327,3 +327,11 @@ class WeaviateUnsupportedFeatureError(WeaviateBaseError): def __init__(self, feature: str, current: str, minimum: str) -> None: msg = f"""{feature} is not supported by your connected server's Weaviate version. The current version is {current}, but the feature requires at least version {minimum}.""" super().__init__(msg) + + +class WeaviateTimeoutError(WeaviateBaseError): + """Is raised when a request to Weaviate times out.""" + + def __init__(self, message: str = "") -> None: + msg = f"""The request to Weaviate timed out while awaiting a response. Try adjusting the timeout config for your client. Details: {message}""" + super().__init__(msg) From 8443c3df019cbda3cd662bb4aa066d89df69d932 Mon Sep 17 00:00:00 2001 From: Tommy Smith Date: Wed, 24 Jul 2024 16:37:20 +0100 Subject: [PATCH 3/3] Complete docstring --- weaviate/connect/v4.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weaviate/connect/v4.py b/weaviate/connect/v4.py index 65ec7f920..cd23d6d87 100644 --- a/weaviate/connect/v4.py +++ b/weaviate/connect/v4.py @@ -413,7 +413,8 @@ def __get_timeout( acquiring a connection from the pool (pool), but a custom amount as specified for reading the response (read). From the PoV of the user, a request is considered to be timed out if no response is received within the specified time. - They specify the times depending on how they expect Weaviate to behave. For example, a query might take longer than an insert or vice versa. + They specify the times depending on how they expect Weaviate to behave. For example, a query might take longer than an insert or vice versa + but, in either case, the user only cares about how long it takes for a response to be received. https://www.python-httpx.org/advanced/timeouts/ """