From f0ea918915b79996e70f8fb88b088dcef5f51741 Mon Sep 17 00:00:00 2001 From: Sirjanpreet Singh Banga <151817113+sirjan-ws-ext@users.noreply.github.com> Date: Thu, 13 Jun 2024 02:40:14 +0530 Subject: [PATCH 01/10] fix: Multi tenancy enabled bots need tenant passed --- weaviate/batch/crud_batch.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index 93bbb9829..9357ea470 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -805,11 +805,19 @@ def _readd_objects_after_timeout( new_batch = ObjectsBatchRequest() for obj in batch_request.get_request_body()["objects"]: class_name = obj["class"] + tenant = obj.get("tenant", None) uuid = obj["id"] - response_head = self._connection.head( - path="/objects/" + class_name + "/" + uuid, - ) - + + if tenant is None: + response_head = self._connection.head( + path="/objects/" + class_name + "/" + uuid, + ) + else: + response_head = self._connection.head( + path="/objects/" + class_name + "/" + uuid, + params={"tenant": tenant}, + ) + if response_head.status_code == 404: new_batch.add( class_name=_capitalize_first_letter(class_name), @@ -819,13 +827,17 @@ def _readd_objects_after_timeout( ) continue - # object might already exist and needs to be overwritten in case of an update - response = self._connection.get( - path="/objects/" + class_name + "/" + uuid, - ) + if tenant is None: + response = self._connection.get( + path="/objects/" + class_name + "/" + uuid, + ) + else: + response = self._connection.get( + path="/objects/" + class_name + "/" + uuid, + params={"tenant": tenant}, + ) - obj_weav = _decode_json_response_dict(response, "Re-add objects") - assert obj_weav is not None + obj_weav = response.json() if obj_weav["properties"] != obj["properties"] or obj.get( "vector", None ) != obj_weav.get("vector", None): @@ -834,6 +846,7 @@ def _readd_objects_after_timeout( data_object=obj["properties"], uuid=uuid, vector=obj.get("vector", None), + tenant=tenant, ) return new_batch From 601efcfbb5c6c2d766fcb5028420609248831b98 Mon Sep 17 00:00:00 2001 From: Sirjanpreet Singh Banga <151817113+sirjan-ws-ext@users.noreply.github.com> Date: Thu, 13 Jun 2024 19:22:33 +0530 Subject: [PATCH 02/10] reuse params + get back 3.26.2 changes --- weaviate/batch/crud_batch.py | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index 9357ea470..ad89137a2 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -807,17 +807,13 @@ def _readd_objects_after_timeout( class_name = obj["class"] tenant = obj.get("tenant", None) uuid = obj["id"] - - if tenant is None: - response_head = self._connection.head( - path="/objects/" + class_name + "/" + uuid, - ) - else: - response_head = self._connection.head( - path="/objects/" + class_name + "/" + uuid, - params={"tenant": tenant}, - ) - + params = {"tenant": tenant} if tenant is not None else None + + response_head = self._connection.head( + path="/objects/" + class_name + "/" + uuid, + params=params, + ) + if response_head.status_code == 404: new_batch.add( class_name=_capitalize_first_letter(class_name), @@ -827,17 +823,13 @@ def _readd_objects_after_timeout( ) continue - if tenant is None: - response = self._connection.get( - path="/objects/" + class_name + "/" + uuid, - ) - else: - response = self._connection.get( - path="/objects/" + class_name + "/" + uuid, - params={"tenant": tenant}, - ) - - obj_weav = response.json() + + response = self._connection.get( + path="/objects/" + class_name + "/" + uuid, + params=params, + ) + obj_weav = _decode_json_response_dict(response, "Re-add objects") + assert obj_weav is not None if obj_weav["properties"] != obj["properties"] or obj.get( "vector", None ) != obj_weav.get("vector", None): From 2db5ead176145c136cfe62ae134121e2d1a0aff1 Mon Sep 17 00:00:00 2001 From: Sirjanpreet Singh Banga <151817113+sirjan-ws-ext@users.noreply.github.com> Date: Thu, 13 Jun 2024 19:23:06 +0530 Subject: [PATCH 03/10] remove extra space/line --- weaviate/batch/crud_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index ad89137a2..81762025d 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -813,7 +813,7 @@ def _readd_objects_after_timeout( path="/objects/" + class_name + "/" + uuid, params=params, ) - + if response_head.status_code == 404: new_batch.add( class_name=_capitalize_first_letter(class_name), From 3244f305fe7e279371999817ba0aa7ef33461037 Mon Sep 17 00:00:00 2001 From: Sirjanpreet Singh Banga <151817113+sirjan-ws-ext@users.noreply.github.com> Date: Thu, 13 Jun 2024 19:23:52 +0530 Subject: [PATCH 04/10] bring back the comments --- weaviate/batch/crud_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index 81762025d..0bae1129b 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -823,7 +823,7 @@ def _readd_objects_after_timeout( ) continue - + # object might already exist and needs to be overwritten in case of an update response = self._connection.get( path="/objects/" + class_name + "/" + uuid, params=params, From b307fe19276ffc40f6c676d2b5f71e0d204ce2fb Mon Sep 17 00:00:00 2001 From: Sirjanpreet Singh Banga <151817113+sirjan-ws-ext@users.noreply.github.com> Date: Thu, 13 Jun 2024 19:24:19 +0530 Subject: [PATCH 05/10] add back the new lines --- weaviate/batch/crud_batch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index 0bae1129b..81bd6db4d 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -828,6 +828,7 @@ def _readd_objects_after_timeout( path="/objects/" + class_name + "/" + uuid, params=params, ) + obj_weav = _decode_json_response_dict(response, "Re-add objects") assert obj_weav is not None if obj_weav["properties"] != obj["properties"] or obj.get( From c32c27fc24df63427c467e16f66c5586e0d17c5c Mon Sep 17 00:00:00 2001 From: Dirk Kulawiak Date: Mon, 24 Jun 2024 13:35:16 +0200 Subject: [PATCH 06/10] Add tests for automatic retry --- mock_tests/conftest.py | 8 +++---- mock_tests/test_automatic_retries.py | 32 +++++++++++++++++++++++++++- weaviate/batch/crud_batch.py | 14 ++++++------ weaviate/embedded.py | 6 +++--- weaviate/util.py | 10 +++++---- 5 files changed, 52 insertions(+), 18 deletions(-) diff --git a/mock_tests/conftest.py b/mock_tests/conftest.py index 740563846..175c7b10d 100644 --- a/mock_tests/conftest.py +++ b/mock_tests/conftest.py @@ -36,13 +36,13 @@ def weaviate_mock(ready_mock): @pytest.fixture(scope="function") -def weaviate_no_auth_mock(ready_mock): - ready_mock.expect_request("/v1/meta").respond_with_json({"version": "1.16"}) - ready_mock.expect_request("/v1/.well-known/openid-configuration").respond_with_response( +def weaviate_no_auth_mock(weaviate_mock): + weaviate_mock.expect_request("/v1/meta").respond_with_json({"version": "1.16"}) + weaviate_mock.expect_request("/v1/.well-known/openid-configuration").respond_with_response( Response(json.dumps({}), status=404) ) - yield ready_mock + yield weaviate_mock @pytest.fixture(scope="function") diff --git a/mock_tests/test_automatic_retries.py b/mock_tests/test_automatic_retries.py index 6c05c010f..44095bda5 100644 --- a/mock_tests/test_automatic_retries.py +++ b/mock_tests/test_automatic_retries.py @@ -1,8 +1,8 @@ import json +import uuid from typing import Optional import pytest -import uuid from werkzeug.wrappers import Request, Response import weaviate @@ -283,3 +283,33 @@ def callback_print_all(results: Optional[BatchResponse]): # callback output for each object print_output, err = capfd.readouterr() assert print_output.count("\n") == n + + +def test_retries_with_tenant(weaviate_no_auth_mock): + tenant = "tenant" + first_try = True + + def handler(request: Request): + nonlocal first_try + objects = request.json["objects"] + for obj in objects: + assert obj["tenant"] == tenant + obj["deprecations"] = None + if first_try == 0: + obj["result"] = {"errors": {"error": [{"message": "I'm an error message"}]}} + first_try = False + else: + obj["result"] = {} + return Response(json.dumps(objects)) + + weaviate_no_auth_mock.expect_request("/v1/batch/objects").respond_with_handler(handler) + + client = weaviate.Client(url=MOCK_SERVER_URL) + + n = 10 + with client.batch( + weaviate_error_retries=WeaviateErrorRetryConf(number_retries=1), + ) as batch: + for i in range(n): + batch.add_data_object({"name": "test" + str(i)}, "test", uuid.uuid4(), tenant=tenant) + weaviate_no_auth_mock.check_assertions() diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index 81bd6db4d..e4f41a405 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -973,7 +973,9 @@ class NonExistingClass not present" self._objects_throughput_frame ) - self._recommended_num_objects = max(round(obj_per_second * self._creation_time), 1) + self._recommended_num_objects = max( + round(obj_per_second * float(self._creation_time)), 1 + ) res = _decode_json_response_list(response, "batch add objects") assert res is not None @@ -1070,7 +1072,7 @@ def create_references(self) -> list: self._references_throughput_frame ) - self._recommended_num_references = round(ref_per_sec * self._creation_time) + self._recommended_num_references = round(ref_per_sec * float(self._creation_time)) res = _decode_json_response_list(response, "Create references") assert res is not None @@ -1177,7 +1179,7 @@ def _send_batch_requests(self, force_wait: bool) -> None: ) self._recommended_num_objects = max( min( - round(obj_per_second * self._creation_time), + round(obj_per_second * float(self._creation_time)), self._recommended_num_objects + 250, ), 1, @@ -1215,7 +1217,7 @@ def _send_batch_requests(self, force_wait: bool) -> None: self._references_throughput_frame ) self._recommended_num_references = min( - round(ref_per_sec * self._creation_time), + round(ref_per_sec * float(self._creation_time)), self._recommended_num_references * 2, ) @@ -1747,11 +1749,11 @@ def creation_time(self, value: Real) -> None: _check_positive_num(value, "creation_time", Real) if self._recommended_num_references is not None: self._recommended_num_references = round( - self._recommended_num_references * value / self._creation_time + self._recommended_num_references * float(value) / float(self._creation_time) ) if self._recommended_num_objects is not None: self._recommended_num_objects = round( - self._recommended_num_objects * value / self._creation_time + self._recommended_num_objects * float(value) / float(self._creation_time) ) self._creation_time = value if self._batching_type: diff --git a/weaviate/embedded.py b/weaviate/embedded.py index a1b500c4a..7aafccdff 100644 --- a/weaviate/embedded.py +++ b/weaviate/embedded.py @@ -15,7 +15,7 @@ from typing import Dict, Optional import requests -import validators # type: ignore +import validators from weaviate import exceptions from weaviate.exceptions import WeaviateStartUpError @@ -184,8 +184,8 @@ def wait_till_listening(self) -> None: def check_supported_platform() -> None: if platform.system() in ["Windows"]: raise WeaviateStartUpError( - f"{platform.system()} is not supported with EmbeddedDB. Please upvote this feature request if " - f"you want this: https://github.com/weaviate/weaviate/issues/3315" + f"""{platform.system()} is not supported with EmbeddedDB. Please upvote this feature request if you want + this: https://github.com/weaviate/weaviate/issues/3315""" # noqa: E231 ) def start(self) -> None: diff --git a/weaviate/util.py b/weaviate/util.py index b344acb4f..0cdde8b69 100644 --- a/weaviate/util.py +++ b/weaviate/util.py @@ -5,13 +5,13 @@ import json import os import re +import uuid as uuid_lib from enum import Enum, EnumMeta from io import BufferedReader from typing import Union, Sequence, Any, Optional, List, Dict, Tuple, cast import requests -import uuid as uuid_lib -import validators # type: ignore +import validators from requests.exceptions import JSONDecodeError from weaviate.exceptions import ( @@ -199,8 +199,10 @@ def generate_local_beacon( raise TypeError("Expected to_object_uuid of type str or uuid.UUID") if class_name is None: - return {"beacon": f"weaviate://localhost/{uuid}"} - return {"beacon": f"weaviate://localhost/{_capitalize_first_letter(class_name)}/{uuid}"} + return {"beacon": f"weaviate://localhost/{uuid}"} # noqa: E231 + return { + "beacon": f"weaviate://localhost/{_capitalize_first_letter(class_name)}/{uuid}" # noqa: E231 + } def _get_dict_from_object(object_: Union[str, dict]) -> dict: From 46494c6da586de1abf915a8236bc5c28672be045 Mon Sep 17 00:00:00 2001 From: Dirk Kulawiak Date: Mon, 24 Jun 2024 13:50:10 +0200 Subject: [PATCH 07/10] Print warnings in case of error --- integration/test_authentication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/test_authentication.py b/integration/test_authentication.py index 697973653..9a19369a6 100644 --- a/integration/test_authentication.py +++ b/integration/test_authentication.py @@ -250,7 +250,7 @@ def test_bearer_token_without_refresh(recwarn): ) client.schema.delete_all() # no exception, client works - assert len(recwarn) == 1 + assert len(recwarn) == 1, [wrn.message for wrn in recwarn] w = recwarn.pop() assert issubclass(w.category, UserWarning) assert str(w.message).startswith("Auth002") From f9d93cdc868cee19c4725b8b80f50634eb519c33 Mon Sep 17 00:00:00 2001 From: Dirk Kulawiak Date: Mon, 24 Jun 2024 14:04:27 +0200 Subject: [PATCH 08/10] Ignore deprecation warning in test --- integration/test_authentication.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integration/test_authentication.py b/integration/test_authentication.py index 9a19369a6..f2a97c085 100644 --- a/integration/test_authentication.py +++ b/integration/test_authentication.py @@ -122,6 +122,7 @@ def test_authentication_user_pw( """Test authentication using Resource Owner Password Credentials Grant (User + PW).""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) url = "http://127.0.0.1:" + port assert is_auth_enabled(url) @@ -211,6 +212,7 @@ def test_client_with_authentication_with_anon_weaviate(recwarn): """Test that we warn users when their client has auth enabled, but weaviate has only anon access.""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) url = "http://127.0.0.1:" + ANON_PORT assert not is_auth_enabled(url) @@ -234,6 +236,7 @@ def test_bearer_token_without_refresh(recwarn): # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) url = "http://127.0.0.1:" + WCS_PORT assert is_auth_enabled(url) From 014e197dc88b53a14a06e0f308439c0d8dec1dce Mon Sep 17 00:00:00 2001 From: Dirk Kulawiak Date: Mon, 24 Jun 2024 14:17:36 +0200 Subject: [PATCH 09/10] Ignore correct type --- integration/test_authentication.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/test_authentication.py b/integration/test_authentication.py index f2a97c085..53eb4e560 100644 --- a/integration/test_authentication.py +++ b/integration/test_authentication.py @@ -122,7 +122,7 @@ def test_authentication_user_pw( """Test authentication using Resource Owner Password Credentials Grant (User + PW).""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) url = "http://127.0.0.1:" + port assert is_auth_enabled(url) @@ -212,7 +212,7 @@ def test_client_with_authentication_with_anon_weaviate(recwarn): """Test that we warn users when their client has auth enabled, but weaviate has only anon access.""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) url = "http://127.0.0.1:" + ANON_PORT assert not is_auth_enabled(url) @@ -236,7 +236,7 @@ def test_bearer_token_without_refresh(recwarn): # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="Dep005", category=ResourceWarning) + warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) url = "http://127.0.0.1:" + WCS_PORT assert is_auth_enabled(url) From 6883c20ab3b0355e11d10cdf3f2746c9dedac455 Mon Sep 17 00:00:00 2001 From: Dirk Kulawiak Date: Mon, 24 Jun 2024 14:29:15 +0200 Subject: [PATCH 10/10] Now ignore the correct warnings --- integration/test_authentication.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/test_authentication.py b/integration/test_authentication.py index 53eb4e560..c4f20937f 100644 --- a/integration/test_authentication.py +++ b/integration/test_authentication.py @@ -122,7 +122,7 @@ def test_authentication_user_pw( """Test authentication using Resource Owner Password Credentials Grant (User + PW).""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=DeprecationWarning) url = "http://127.0.0.1:" + port assert is_auth_enabled(url) @@ -212,7 +212,7 @@ def test_client_with_authentication_with_anon_weaviate(recwarn): """Test that we warn users when their client has auth enabled, but weaviate has only anon access.""" # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=DeprecationWarning) url = "http://127.0.0.1:" + ANON_PORT assert not is_auth_enabled(url) @@ -236,7 +236,7 @@ def test_bearer_token_without_refresh(recwarn): # testing for warnings can be flaky without this as there are open SSL conections warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning) - warnings.filterwarnings(action="ignore", message="upgrading", category=DeprecationWarning) + warnings.filterwarnings(action="ignore", message="Dep005", category=DeprecationWarning) url = "http://127.0.0.1:" + WCS_PORT assert is_auth_enabled(url)