Skip to content

Commit

Permalink
Adding initial update changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Manisha Sudhir committed Aug 7, 2023
1 parent 84a8ea3 commit 0578b9b
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 4 deletions.
16 changes: 16 additions & 0 deletions sdk/python/docs/source/feast.protos.feast.core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ feast.protos.feast.core.FeatureView\_pb2\_grpc module
:undoc-members:
:show-inheritance:

feast.protos.feast.core.VectorFeatureView\_pb2 module
-----------------------------------------------------

.. automodule:: feast.protos.feast.core.VectorFeatureView_pb2
:members:
:undoc-members:
:show-inheritance:

feast.protos.feast.core.VectorFeatureView\_pb2\_grpc module
-----------------------------------------------------------

.. automodule:: feast.protos.feast.core.VectorFeatureView_pb2_grpc
:members:
:undoc-members:
:show-inheritance:

feast.protos.feast.core.Feature\_pb2 module
-------------------------------------------

Expand Down
30 changes: 27 additions & 3 deletions sdk/python/feast/expediagroup/vectordb/milvus_online_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

Expand All @@ -10,6 +11,10 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel

from pymilvus import Collection, FieldSchema, CollectionSchema, DataType, connections, utility

logger = logging.getLogger(__name__)


class MilvusOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for the Milvus online store"""
Expand All @@ -23,6 +28,11 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel):
port: int = 19530
""" the port to connect to a Milvus instance. Should be the one used for GRPC (default: 19530) """

def __init__(self, *args, **kwargs):
    """Validate the config fields, then open a Milvus connection.

    All positional and keyword arguments are forwarded unchanged to the
    parent model. After the parent initialiser has populated ``host`` and
    ``port``, a connection is opened through ``pymilvus.connections``.
    """
    super().__init__(*args, **kwargs)
    # NOTE(review): ``use_secure=True`` is hard-coded here — confirm it is
    # the intended flag for non-TLS (e.g. local/embedded) deployments.
    connection_params = {
        "host": self.host,
        "port": self.port,
        "use_secure": True,
    }
    connections.connect(**connection_params)


class MilvusOnlineStore(VectorOnlineStore):
def online_write_batch(
Expand Down Expand Up @@ -58,9 +68,23 @@ def update(
entities_to_keep: Sequence[Entity],
partial: bool,
):
raise NotImplementedError(
"to be implemented in https://jira.expedia.biz/browse/EAPC-7970"
)
# Declare every collection that should exist. Failures are logged and the
# loop moves on, so one bad collection does not block the others.
# NOTE(review): presumably ``Collection(name, schema)`` creates the
# collection when it is missing — confirm against the pymilvus docs.
for table_to_keep in tables_to_keep:
    try:
        Collection(name=table_to_keep.name, schema=table_to_keep.schema)
        logger.info(f"Collection {table_to_keep.name} has been updated successfully.")
    except Exception as e:
        logger.error(f"Collection update failed due to {e}")

# Drop every collection that is no longer wanted.
for table_to_delete in tables_to_delete:
    collection_available = utility.has_collection(table_to_delete.name)
    try:
        if collection_available:
            utility.drop_collection(table_to_delete.name)
            # Report the collection that was actually dropped; the loop
            # variable of the previous loop is the wrong name here and is
            # undefined when ``tables_to_keep`` is empty.
            logger.info(f"Collection {table_to_delete.name} has been deleted successfully.")
        else:
            # Log only: returning here would skip the remaining entries
            # in ``tables_to_delete``.
            logger.error("Collection does not exist or is already deleted.")
    except Exception as e:
        logger.error(f"Collection deletion failed due to {e}")

def teardown(
self,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ def __init__(self, **data: Any):
self._online_config = "dynamodb"
elif data["provider"] == "rockset":
self._online_config = "rockset"
elif data["provider"] == "milvus":
self._online_config = "milvus"

self._batch_engine = None
if "batch_engine" in data:
Expand Down
159 changes: 159 additions & 0 deletions sdk/python/tests/expediagroup/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import logging
from typing import List, Optional
from dataclasses import dataclass
import pytest
from pymilvus import (
Collection, FieldSchema, CollectionSchema, DataType, utility
)
from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator
from feast.repo_config import RepoConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.expediagroup.vectordb.milvus_online_store import (
MilvusOnlineStoreConfig, MilvusOnlineStore
)
from feast.field import Field

# Emit INFO-level records so the collection listings logged by this module are shown.
logging.basicConfig(level=logging.INFO)

# Values used to build the RepoConfig in the ``repo_config`` fixture.
REGISTRY = "s3://test_registry/registry.db"
PROJECT = "test_aws"
PROVIDER = "aws"
# NOTE(review): TABLE_NAME is not referenced anywhere in the visible part of this module.
TABLE_NAME = "milvus_online_store"
REGION = "us-west-2"
# Host passed to MilvusOnlineStoreConfig when connecting.
HOST = "localhost"


@dataclass
class MockFeatureView:
    """Lightweight stand-in for a feature view, carrying only a name and a schema.

    NOTE(review): the tests in this module pass a pymilvus ``CollectionSchema``
    (or ``None``) as ``schema`` rather than a list of ``Field`` — the
    annotation is not enforced at runtime.
    """

    name: str
    schema: Optional[List[Field]]


@pytest.fixture
def repo_config():
    """Return a ``RepoConfig`` wired to the Milvus online store and a file offline store."""
    online_store_config = MilvusOnlineStoreConfig(host=HOST, region=REGION)
    offline_store_config = FileOfflineStoreConfig()
    settings = {
        "registry": REGISTRY,
        "project": PROJECT,
        "provider": PROVIDER,
        "online_store": online_store_config,
        "offline_store": offline_store_config,
        "entity_key_serialization_version": 2,
    }
    return RepoConfig(**settings)


@pytest.fixture
def milvus_online_store():
    """Return a fresh ``MilvusOnlineStore`` instance."""
    store = MilvusOnlineStore()
    return store


@pytest.fixture(scope="class")
def milvus_online_setup():
    """Start a Milvus online store once per test class and tear it down afterwards.

    Yields the ``MilvusOnlineStoreCreator`` that owns the running instance.
    """
    # Setup: one instance is shared by every test in the class.
    creator = MilvusOnlineStoreCreator("milvus")
    creator.create_online_store()

    yield creator

    # Teardown: runs after the last test in the class has finished.
    creator.teardown()


class TestMilvusOnlineStore:
    """Tests for ``MilvusOnlineStore`` run against the class-scoped Milvus instance."""

    def test_milvus_update(self, milvus_online_setup):
        """``update`` creates the kept collections and drops the deleted ones."""
        collection_to_delete = "Collection1"
        collection_to_write = "Collection2"
        # Instantiated only for its side effect: the config constructor opens
        # the Milvus connection that the ``utility`` calls below rely on.
        MilvusOnlineStoreConfig(host=HOST)

        # One schema shared by both collections: an INT64 primary key plus a
        # 128-dimensional float vector.
        primary_key_field = FieldSchema(
            "int64", DataType.INT64, description="int64", is_primary=True
        )
        vector_field = FieldSchema(
            "float_vector", DataType.FLOAT_VECTOR, is_primary=False, dim=128
        )
        schema = CollectionSchema(fields=[primary_key_field, vector_field])

        # Start from a clean slate so the collection counts below are exact.
        for stale_collection in (collection_to_delete, collection_to_write):
            utility.drop_collection(stale_collection)

        # NOTE(review): ``repo_config`` is not requested as a fixture by this
        # test, so the name below refers to the module-level fixture object,
        # not a RepoConfig instance — confirm this is intended.
        first_view = MockFeatureView(name=collection_to_delete, schema=schema)
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[],
            tables_to_keep=[first_view],
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        assert len(utility.list_collections()) == 1

        # Second pass: drop the first collection and create the second one.
        view_to_drop = MockFeatureView(name=collection_to_delete, schema=None)
        view_to_create = MockFeatureView(name=collection_to_write, schema=schema)
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[view_to_drop],
            tables_to_keep=[view_to_create],
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        logging.info(utility.list_collections())
        assert utility.has_collection(collection_to_write) is True
        assert utility.has_collection(collection_to_delete) is False
        assert len(utility.list_collections()) == 1


def connect_from_connections():
    """Manually exercise ``MilvusOnlineStore.update`` against a Milvus instance.

    Starts an instance through ``MilvusOnlineStoreCreator``, then issues three
    ``update`` calls: create ``Collection1``, request deletion of a collection
    name that was never created here, and re-declare ``Collection1`` with a
    different schema. The collection list is logged along the way. The
    instance is always torn down, even if one of the steps raises.
    """
    online_store_creator = MilvusOnlineStoreCreator("milvus")
    online_store_creator.create_online_store()

    try:
        schema2 = CollectionSchema(
            fields=[
                FieldSchema("int64", DataType.INT64, description="int64", is_primary=True),
                FieldSchema("float_vector", DataType.FLOAT_VECTOR, is_primary=False, dim=128),
            ]
        )

        # Instantiated only for its side effect: the config constructor opens
        # the Milvus connection used by the calls below.
        MilvusOnlineStoreConfig(host=HOST)

        logging.info(utility.list_collections())

        # NOTE(review): ``repo_config`` here is the module-level pytest
        # fixture object, not a RepoConfig instance — confirm this is intended.
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema2)],
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        logging.info(utility.list_collections())

        # Ask for deletion of a collection name that this function never created.
        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[MockFeatureView(name="ew", schema=None)],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema2)],
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )

        # Re-declare the existing collection with a different schema.
        schema3 = CollectionSchema(
            fields=[
                FieldSchema("int64", DataType.INT64, description="int is new", is_primary=False),
                FieldSchema("varchar_vector", DataType.VARCHAR, is_primary=True, dim=128),
            ]
        )

        MilvusOnlineStore().update(
            config=repo_config,
            tables_to_delete=[],
            tables_to_keep=[MockFeatureView(name="Collection1", schema=schema3)],
            entities_to_delete=None,
            entities_to_keep=None,
            partial=None,
        )
    finally:
        # Always stop the instance so a failing step does not leave it running.
        online_store_creator.teardown()


if __name__ == "__main__":
    # Run the manual scenario only when this file is executed as a script;
    # importing the module (e.g. during test collection) must not start Milvus.
    connect_from_connections()
7 changes: 6 additions & 1 deletion sdk/python/tests/expediagroup/test_milvus_online_store.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from pymilvus.client.stub import Milvus
from pymilvus.client.types import DataType
from dataclasses import dataclass

from tests.expediagroup.milvus_online_store_creator import MilvusOnlineStoreCreator


@dataclass
class MockFeatureView:
    """Minimal stand-in for a feature view that carries only its name."""

    name: str


def test_milvus_start_stop():
# this is just an example how to start / stop Milvus. Once a real test is implemented this test can be deleted
online_store_creator = MilvusOnlineStoreCreator("milvus")
Expand Down Expand Up @@ -33,5 +39,4 @@ def test_milvus_start_stop():

collection = milvus.describe_collection(collection_name)
assert collection.get("collection_name") == collection_name

online_store_creator.teardown()

0 comments on commit 0578b9b

Please sign in to comment.