From c9dcbdc75f3d250570f3aff831efbe874e05bff4 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 31 Oct 2023 12:14:14 -0500 Subject: [PATCH] Removed mutex related milvus tests --- morpheus/service/milvus_vector_db_service.py | 35 +++++++++ tests/test_milvus_vector_db_service.py | 74 -------------------- 2 files changed, 35 insertions(+), 74 deletions(-) diff --git a/morpheus/service/milvus_vector_db_service.py b/morpheus/service/milvus_vector_db_service.py index 18ae5dd4a2..c374f24531 100644 --- a/morpheus/service/milvus_vector_db_service.py +++ b/morpheus/service/milvus_vector_db_service.py @@ -148,6 +148,11 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. ------ ValueError If the provided schema fields configuration is empty. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ logger.debug("Creating collection: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) # Preserve original configuration. @@ -219,6 +224,11 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, ------ RuntimeError If the collection not exists exists. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ return self._collection_insert(name, data, **kwargs) @@ -280,6 +290,11 @@ def insert_dataframe(self, ------ RuntimeError If the collection not exists exists. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ if not self.has_store_object(name): raise RuntimeError(f"Collection {name} doesn't exist.") @@ -362,6 +377,11 @@ def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.A ------- dict[str, typing.Any] Returns result of the updated operation stats. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ if not isinstance(data, list): @@ -389,6 +409,11 @@ def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, ------- typing.Any Returns result of the given keys that are delete from the collection. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ result = self._client.delete(collection_name=name, pks=keys, **kwargs) @@ -413,6 +438,11 @@ def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[ ------- dict[str, typing.Any] Returns result of the given keys that are delete from the collection. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ result = self._client.delete_by_expr(collection_name=name, expression=expr, **kwargs) @@ -438,6 +468,11 @@ def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str ------- list[typing.Any] Returns result rows of the given keys from the collection. + + Notes + ----- + This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring + and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database. """ result = None diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 28a3d646ed..1ab9e9a4c9 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import concurrent.futures import json import os @@ -348,79 +347,6 @@ def test_delete(milvus_service_fixture: MilvusVectorDBService, milvus_service_fixture.drop(collection_name) -@pytest.mark.slow -def test_single_instance_with_collection_lock(milvus_service_fixture: MilvusVectorDBService, - idx_part_collection_config_fixture: dict, - data_fixture: list[dict]): - - # Create a collection. - collection_name = "test_insert_and_search_order_with_collection_lock" - milvus_service_fixture.create(collection_name, **idx_part_collection_config_fixture) - - filter_query = "age == 26 or age == 27" - search_vec = np.random.random((1, 10)) - execution_order = [] - - def insert_data(): - result = milvus_service_fixture.insert(collection_name, data_fixture) - assert result['insert_count'] == len(data_fixture) - execution_order.append("Insert Executed") - - def search_data(): - result = milvus_service_fixture.search(collection_name, data=search_vec, filter=filter_query) - execution_order.append("Search Executed") - assert isinstance(result, list) - - def count_entities(): - milvus_service_fixture.count(collection_name) - execution_order.append("Count Collection Entities Executed") - - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - executor.submit(insert_data) - executor.submit(search_data) - executor.submit(count_entities) - - # Assert the execution order - assert execution_order == ["Count Collection Entities Executed", "Insert Executed", "Search Executed"] - - -@pytest.mark.slow -def test_multi_instance_with_collection_lock(milvus_service_fixture: MilvusVectorDBService, - idx_part_collection_config_fixture: dict, - data_fixture: list[dict], - milvus_server_uri: str): - - milvus_service_2 = MilvusVectorDBService(uri=milvus_server_uri) - - collection_name = "test_insert_and_search_order_with_collection_lock" - filter_query = "age == 26 or age == 27" - search_vec = np.random.random((1, 10)) - - execution_order = [] - - def create_collection(): - milvus_service_fixture.create(collection_name, **idx_part_collection_config_fixture) - execution_order.append("Create Executed") - - def insert_data(): - result = milvus_service_2.insert(collection_name, data_fixture) - assert result['insert_count'] == len(data_fixture) - execution_order.append("Insert Executed") - - def search_data(): - result = milvus_service_fixture.search(collection_name, data=search_vec, filter=filter_query) - execution_order.append("Search Executed") - assert isinstance(result, list) - - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - executor.submit(create_collection) - executor.submit(insert_data) - executor.submit(search_data) - - # Assert the execution order - assert execution_order == ["Create Executed", "Insert Executed", "Search Executed"] - - def test_get_collection_lock(): collection_name = "test_collection_lock" lock = MilvusVectorDBService.get_collection_lock(collection_name)