From 67470842252c0a53b924d6c452a495bc9a22208d Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Sat, 24 Aug 2024 20:58:13 +0200 Subject: [PATCH 1/3] Remove _ClusterBatch --- integration/test_cluster.py | 95 +++++++++++++++++++++++++ weaviate/collections/batch/base.py | 33 ++------- weaviate/collections/cluster/cluster.py | 18 +++-- 3 files changed, 113 insertions(+), 33 deletions(-) create mode 100644 integration/test_cluster.py diff --git a/integration/test_cluster.py b/integration/test_cluster.py new file mode 100644 index 000000000..e29b50b8f --- /dev/null +++ b/integration/test_cluster.py @@ -0,0 +1,95 @@ +import pytest + +import weaviate +from weaviate.collections.classes.config import ( + Configure, + DataType, + Property, +) +from weaviate.util import parse_version_string + + +NODE_NAME = "node1" +NUM_OBJECT = 10 + + +@pytest.fixture(scope="module") +def client(): + client = weaviate.connect_to_local() + client.collections.delete_all() + yield client + client.collections.delete_all() + + +def test_rest_nodes_without_data(client: weaviate.WeaviateClient): + """get nodes status without data""" + resp = client.cluster.rest_nodes(output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert resp[0]["shards"] is None + assert resp[0]["stats"]["objectCount"] == 0 + assert resp[0]["stats"]["shardCount"] == 0 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + + +def test_rest_nodes_with_data(client: weaviate.WeaviateClient): + """get nodes status with data""" + collection_name_1 = "Collection_1" + uncap_collection_name_1 = "collection_1" + collection = client.collections.create( + name=collection_name_1, + properties=[Property(name="Name", data_type=DataType.TEXT)], + vectorizer_config=Configure.Vectorizer.none(), + ) + collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT)]) + + collection_name_2 = "Collection_2" + collection = client.collections.create( + name=collection_name_2, + properties=[Property(name="Name", data_type=DataType.TEXT)], + vectorizer_config=Configure.Vectorizer.none(), + ) + collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT * 2)]) + + # server behaviour changed by https://github.com/weaviate/weaviate/pull/4203 + server_is_at_least_124 = parse_version_string( + client.get_meta()["version"] + ) > parse_version_string("1.24") + + resp = client.cluster.rest_nodes(output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert len(resp[0]["shards"]) == 2 + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 3 + assert resp[0]["stats"]["shardCount"] == 2 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + + shards = sorted(resp[0]["shards"], key=lambda x: x["class"]) + assert shards[0]["class"] == collection_name_1 + assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT + assert shards[1]["class"] == collection_name_2 + assert shards[1]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 2 + + resp = client.cluster.rest_nodes(collection=collection_name_1, output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert len(resp[0]["shards"]) == 1 + assert resp[0]["stats"]["shardCount"] == 1 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT + + resp = client.cluster.rest_nodes(uncap_collection_name_1, output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert len(resp[0]["shards"]) == 1 + assert resp[0]["stats"]["shardCount"] == 1 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT diff --git a/weaviate/collections/batch/base.py b/weaviate/collections/batch/base.py index 207c8926a..45599c0a2 100644 --- a/weaviate/collections/batch/base.py +++ b/weaviate/collections/batch/base.py @@ -6,14 +6,11 @@ from collections import deque from copy import copy from dataclasses import dataclass, field -from typing import Any, Dict, Generic, List, Optional, Set, TypeVar, Union, cast +from typing import Any, Dict, Generic, List, Optional, Set, TypeVar, Union from pydantic import ValidationError from typing_extensions import TypeAlias -from httpx import ConnectError - -from weaviate.cluster.types import Node from weaviate.collections.batch.grpc_batch_objects import _BatchGRPC from weaviate.collections.batch.rest import _BatchREST from weaviate.collections.classes.batch import ( @@ -35,12 +32,12 @@ ReferenceInputs, ) from weaviate.collections.classes.types import WeaviateProperties +from weaviate.collections.cluster import _ClusterAsync from weaviate.connect import ConnectionV4 from weaviate.event_loop import _EventLoop -from weaviate.exceptions import WeaviateBatchValidationError, EmptyResponseException +from weaviate.exceptions import WeaviateBatchValidationError from weaviate.logger import logger from weaviate.types import UUID, VECTORS -from weaviate.util import _decode_json_response_dict from weaviate.warnings import _Warnings BatchResponse = List[Dict[str, Any]] @@ -183,7 +180,7 @@ def __init__( self.__results_lock = threading.Lock() - self.__cluster = _ClusterBatch(self.__connection) + self.__cluster = _ClusterAsync(self.__connection) self.__batching_mode: _BatchMode = batch_mode self.__max_batch_size: int = 1000 @@ -360,7 +357,7 @@ def batch_send_wrapper() -> None: return demonBatchSend def __dynamic_batching(self) -> None: - status = self.__loop.run_until_complete(self.__cluster.get_nodes_status) + status = self.__loop.run_until_complete(self.__cluster.rest_nodes) if "batchStats" not in status[0] or "queueLength" not in status[0]["batchStats"]: # async indexing - just send a lot self.__batching_mode = _FixedSizeBatching(1000, 10) @@ -700,23 +697,3 @@ def __check_bg_thread_alive(self) -> None: return raise self.__bg_thread_exception or Exception("Batch thread died unexpectedly") - - -class _ClusterBatch: - def __init__(self, connection: ConnectionV4): - self._connection = connection - - async def get_nodes_status( - self, - ) -> List[Node]: - try: - response = await self._connection.get(path="/nodes") - except ConnectError as conn_err: - raise ConnectError("Get nodes status failed due to connection error") from conn_err - - response_typed = _decode_json_response_dict(response, "Nodes status") - assert response_typed is not None - nodes = response_typed.get("nodes") - if nodes is None or nodes == []: - raise EmptyResponseException("Nodes status response returned empty") - return cast(List[Node], nodes) diff --git a/weaviate/collections/cluster/cluster.py b/weaviate/collections/cluster/cluster.py index 0a6696872..1b900fbe4 100644 --- a/weaviate/collections/cluster/cluster.py +++ b/weaviate/collections/cluster/cluster.py @@ -3,6 +3,7 @@ from typing import List, Literal, Optional, Union, overload +from weaviate.cluster.types import Node as NodeREST from weaviate.collections.classes.cluster import Node, Shards, _ConvertFromREST, Stats from weaviate.exceptions import ( EmptyResponseError, @@ -73,6 +74,17 @@ async def nodes( `weaviate.EmptyResponseError` If the response is empty. """ + nodes = await self.rest_nodes(collection, output) + if output == "verbose": + return _ConvertFromREST.nodes_verbose(nodes) + else: + return _ConvertFromREST.nodes_minimal(nodes) + + async def rest_nodes( + self, + collection: Optional[str] = None, + output: Optional[Literal["minimal", "verbose"]] = None, + ) -> List[NodeREST]: path = "/nodes" if collection is not None: path += "/" + _capitalize_first_letter(collection) @@ -86,8 +98,4 @@ async def nodes( nodes = response_typed.get("nodes") if nodes is None or nodes == []: raise EmptyResponseError("Nodes status response returned empty") - - if output == "verbose": - return _ConvertFromREST.nodes_verbose(nodes) - else: - return _ConvertFromREST.nodes_minimal(nodes) + return nodes From 784d9dfa1daf48e905bdbd79a8ff7f67d4c23269 Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Tue, 27 Aug 2024 20:27:00 +0200 Subject: [PATCH 2/3] Fix mypy issue --- weaviate/collections/cluster/cluster.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/weaviate/collections/cluster/cluster.py b/weaviate/collections/cluster/cluster.py index 1b900fbe4..1662bdf99 100644 --- a/weaviate/collections/cluster/cluster.py +++ b/weaviate/collections/cluster/cluster.py @@ -1,14 +1,11 @@ -from weaviate.connect import ConnectionV4 - - -from typing import List, Literal, Optional, Union, overload +from typing import List, Literal, Optional, Union, cast, overload from weaviate.cluster.types import Node as NodeREST from weaviate.collections.classes.cluster import Node, Shards, _ConvertFromREST, Stats +from weaviate.connect import ConnectionV4 from weaviate.exceptions import ( EmptyResponseError, ) - from weaviate.util import _capitalize_first_letter, _decode_json_response_dict @@ -98,4 +95,4 @@ async def rest_nodes( nodes = response_typed.get("nodes") if nodes is None or nodes == []: raise EmptyResponseError("Nodes status response returned empty") - return nodes + return cast(List[NodeREST], nodes) From 7bb00eb27e8e477752720416614c4aea27cb8637 Mon Sep 17 00:00:00 2001 From: Tibor Reiss Date: Thu, 29 Aug 2024 20:29:11 +0200 Subject: [PATCH 3/3] Fix tests, annotations, signatures based on code review --- integration/test_cluster.py | 159 ++++++++++++++------------ weaviate/collections/cluster/sync.pyi | 6 + 2 files changed, 89 insertions(+), 76 deletions(-) diff --git a/integration/test_cluster.py b/integration/test_cluster.py index e29b50b8f..87a313502 100644 --- a/integration/test_cluster.py +++ b/integration/test_cluster.py @@ -1,4 +1,5 @@ -import pytest +from contextlib import contextmanager +from typing import Generator, List import weaviate from weaviate.collections.classes.config import ( @@ -6,90 +7,96 @@ DataType, Property, ) -from weaviate.util import parse_version_string +COLLECTION_NAME_PREFIX = "Collection_test_cluster" NODE_NAME = "node1" NUM_OBJECT = 10 -@pytest.fixture(scope="module") -def client(): +@contextmanager +def get_weaviate_client( + collection_names: List[str], +) -> Generator[weaviate.WeaviateClient, None, None]: client = weaviate.connect_to_local() - client.collections.delete_all() + for collection_name in collection_names: + client.collections.delete(collection_name) yield client - client.collections.delete_all() + for collection_name in collection_names: + client.collections.delete(collection_name) + client.close() -def test_rest_nodes_without_data(client: weaviate.WeaviateClient): +def test_rest_nodes_without_data() -> None: """get nodes status without data""" - resp = client.cluster.rest_nodes(output="verbose") - assert len(resp) == 1 - assert "gitHash" in resp[0] - assert resp[0]["name"] == NODE_NAME - assert resp[0]["shards"] is None - assert resp[0]["stats"]["objectCount"] == 0 - assert resp[0]["stats"]["shardCount"] == 0 - assert resp[0]["status"] == "HEALTHY" - assert "version" in resp[0] - - -def test_rest_nodes_with_data(client: weaviate.WeaviateClient): + with get_weaviate_client([]) as client: + resp = client.cluster.rest_nodes(output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert resp[0]["shards"] is None + assert resp[0]["stats"]["objectCount"] == 0 + assert resp[0]["stats"]["shardCount"] == 0 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + + +def test_rest_nodes_with_data() -> None: """get nodes status with data""" - collection_name_1 = "Collection_1" - uncap_collection_name_1 = "collection_1" - collection = client.collections.create( - name=collection_name_1, - properties=[Property(name="Name", data_type=DataType.TEXT)], - vectorizer_config=Configure.Vectorizer.none(), - ) - collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT)]) - - collection_name_2 = "Collection_2" - collection = client.collections.create( - name=collection_name_2, - properties=[Property(name="Name", data_type=DataType.TEXT)], - vectorizer_config=Configure.Vectorizer.none(), - ) - collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT * 2)]) - - # server behaviour changed by https://github.com/weaviate/weaviate/pull/4203 - server_is_at_least_124 = parse_version_string( - client.get_meta()["version"] - ) > parse_version_string("1.24") - - resp = client.cluster.rest_nodes(output="verbose") - assert len(resp) == 1 - assert "gitHash" in resp[0] - assert resp[0]["name"] == NODE_NAME - assert len(resp[0]["shards"]) == 2 - assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 3 - assert resp[0]["stats"]["shardCount"] == 2 - assert resp[0]["status"] == "HEALTHY" - assert "version" in resp[0] - - shards = sorted(resp[0]["shards"], key=lambda x: x["class"]) - assert shards[0]["class"] == collection_name_1 - assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT - assert shards[1]["class"] == collection_name_2 - assert shards[1]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 2 - - resp = client.cluster.rest_nodes(collection=collection_name_1, output="verbose") - assert len(resp) == 1 - assert "gitHash" in resp[0] - assert resp[0]["name"] == NODE_NAME - assert len(resp[0]["shards"]) == 1 - assert resp[0]["stats"]["shardCount"] == 1 - assert resp[0]["status"] == "HEALTHY" - assert "version" in resp[0] - assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT - - resp = client.cluster.rest_nodes(uncap_collection_name_1, output="verbose") - assert len(resp) == 1 - assert "gitHash" in resp[0] - assert resp[0]["name"] == NODE_NAME - assert len(resp[0]["shards"]) == 1 - assert resp[0]["stats"]["shardCount"] == 1 - assert resp[0]["status"] == "HEALTHY" - assert "version" in resp[0] - assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT + collection_name_1 = f"{COLLECTION_NAME_PREFIX}_rest_nodes_with_data_1" + collection_name_2 = f"{COLLECTION_NAME_PREFIX}_rest_nodes_with_data_2" + uncap_collection_name_1 = collection_name_1[0].lower() + collection_name_1[1:] + + with get_weaviate_client([collection_name_1, collection_name_2]) as client: + collection = client.collections.create( + name=collection_name_1, + properties=[Property(name="Name", data_type=DataType.TEXT)], + vectorizer_config=Configure.Vectorizer.none(), + ) + collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT)]) + + collection = client.collections.create( + name=collection_name_2, + properties=[Property(name="Name", data_type=DataType.TEXT)], + vectorizer_config=Configure.Vectorizer.none(), + ) + collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT * 2)]) + + # server behaviour changed by https://github.com/weaviate/weaviate/pull/4203 + server_is_at_least_124 = client._connection._weaviate_version.is_at_least(1, 24, 0) + + resp = client.cluster.rest_nodes(output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert resp[0]["shards"] is not None and len(resp[0]["shards"]) == 2 + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 3 + assert resp[0]["stats"]["shardCount"] == 2 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + + shards = sorted(resp[0]["shards"], key=lambda x: x["class"]) + assert shards[0]["class"] == collection_name_1 + assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT + assert shards[1]["class"] == collection_name_2 + assert shards[1]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 2 + + resp = client.cluster.rest_nodes(collection=collection_name_1, output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert resp[0]["shards"] is not None and len(resp[0]["shards"]) == 1 + assert resp[0]["stats"]["shardCount"] == 1 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT + + resp = client.cluster.rest_nodes(uncap_collection_name_1, output="verbose") + assert len(resp) == 1 + assert "gitHash" in resp[0] + assert resp[0]["name"] == NODE_NAME + assert resp[0]["shards"] is not None and len(resp[0]["shards"]) == 1 + assert resp[0]["stats"]["shardCount"] == 1 + assert resp[0]["status"] == "HEALTHY" + assert "version" in resp[0] + assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT diff --git a/weaviate/collections/cluster/sync.pyi b/weaviate/collections/cluster/sync.pyi index a32e8ff62..76da2e310 100644 --- a/weaviate/collections/cluster/sync.pyi +++ b/weaviate/collections/cluster/sync.pyi @@ -1,5 +1,6 @@ from typing import List, Literal, Optional, overload +from weaviate.cluster.types import Node as NodeREST from weaviate.collections.classes.cluster import Node, Shards, Stats from weaviate.collections.cluster.cluster import _ClusterBase @@ -25,3 +26,8 @@ class _Cluster(_ClusterBase): *, output: Literal["verbose"], ) -> List[Node[Shards, Stats]]: ... + def rest_nodes( + self, + collection: Optional[str] = None, + output: Optional[Literal["minimal", "verbose"]] = None, + ) -> List[NodeREST]: ...