Skip to content

Commit

Permalink
Removed Mutex Related Milvus Tests (#1325)
Browse files Browse the repository at this point in the history
Removed `with_collection_lock` related tests from Milvus and added notes to function documentation.

Closes #1326 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Bhargav Suryadevara (https://github.com/bsuryadevara)

Approvers:
  - Devin Robison (https://github.com/drobison00)

URL: #1325
  • Loading branch information
bsuryadevara authored Nov 1, 2023
1 parent d91f891 commit 9d258a5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 74 deletions.
35 changes: 35 additions & 0 deletions morpheus/service/milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.
------
ValueError
If the provided schema fields configuration is empty.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""
logger.debug("Creating collection: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs)
# Preserve original configuration.
Expand Down Expand Up @@ -219,6 +224,11 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str,
------
RuntimeError
If the collection not exists exists.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""

return self._collection_insert(name, data, **kwargs)
Expand Down Expand Up @@ -280,6 +290,11 @@ def insert_dataframe(self,
------
RuntimeError
If the collection not exists exists.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""
if not self.has_store_object(name):
raise RuntimeError(f"Collection {name} doesn't exist.")
Expand Down Expand Up @@ -362,6 +377,11 @@ def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.A
-------
dict[str, typing.Any]
Returns result of the updated operation stats.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""

if not isinstance(data, list):
Expand Down Expand Up @@ -389,6 +409,11 @@ def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str,
-------
typing.Any
Returns result of the given keys that are delete from the collection.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""

result = self._client.delete(collection_name=name, pks=keys, **kwargs)
Expand All @@ -413,6 +438,11 @@ def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[
-------
dict[str, typing.Any]
Returns result of the given keys that are delete from the collection.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""

result = self._client.delete_by_expr(collection_name=name, expression=expr, **kwargs)
Expand All @@ -438,6 +468,11 @@ def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str
-------
list[typing.Any]
Returns result rows of the given keys from the collection.
Notes
-----
This function is decorated with `with_collection_lock` to ensure that the operation is synchronized by acquiring
and releasing a collection-specific lock, preventing concurrent access issues in the Milvus vector database.
"""

result = None
Expand Down
74 changes: 0 additions & 74 deletions tests/test_milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import json
import os

Expand Down Expand Up @@ -348,79 +347,6 @@ def test_delete(milvus_service_fixture: MilvusVectorDBService,
milvus_service_fixture.drop(collection_name)


@pytest.mark.slow
def test_single_instance_with_collection_lock(milvus_service_fixture: MilvusVectorDBService,
idx_part_collection_config_fixture: dict,
data_fixture: list[dict]):

# Create a collection.
collection_name = "test_insert_and_search_order_with_collection_lock"
milvus_service_fixture.create(collection_name, **idx_part_collection_config_fixture)

filter_query = "age == 26 or age == 27"
search_vec = np.random.random((1, 10))
execution_order = []

def insert_data():
result = milvus_service_fixture.insert(collection_name, data_fixture)
assert result['insert_count'] == len(data_fixture)
execution_order.append("Insert Executed")

def search_data():
result = milvus_service_fixture.search(collection_name, data=search_vec, filter=filter_query)
execution_order.append("Search Executed")
assert isinstance(result, list)

def count_entities():
milvus_service_fixture.count(collection_name)
execution_order.append("Count Collection Entities Executed")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(insert_data)
executor.submit(search_data)
executor.submit(count_entities)

# Assert the execution order
assert execution_order == ["Count Collection Entities Executed", "Insert Executed", "Search Executed"]


@pytest.mark.slow
def test_multi_instance_with_collection_lock(milvus_service_fixture: MilvusVectorDBService,
idx_part_collection_config_fixture: dict,
data_fixture: list[dict],
milvus_server_uri: str):

milvus_service_2 = MilvusVectorDBService(uri=milvus_server_uri)

collection_name = "test_insert_and_search_order_with_collection_lock"
filter_query = "age == 26 or age == 27"
search_vec = np.random.random((1, 10))

execution_order = []

def create_collection():
milvus_service_fixture.create(collection_name, **idx_part_collection_config_fixture)
execution_order.append("Create Executed")

def insert_data():
result = milvus_service_2.insert(collection_name, data_fixture)
assert result['insert_count'] == len(data_fixture)
execution_order.append("Insert Executed")

def search_data():
result = milvus_service_fixture.search(collection_name, data=search_vec, filter=filter_query)
execution_order.append("Search Executed")
assert isinstance(result, list)

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(create_collection)
executor.submit(insert_data)
executor.submit(search_data)

# Assert the execution order
assert execution_order == ["Create Executed", "Insert Executed", "Search Executed"]


def test_get_collection_lock():
collection_name = "test_collection_lock"
lock = MilvusVectorDBService.get_collection_lock(collection_name)
Expand Down

0 comments on commit 9d258a5

Please sign in to comment.