From 25e8c50dc24858e6fe9c57188f08940685e9d0df Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 14 Feb 2024 13:41:49 -0500 Subject: [PATCH 1/7] vertex ai v1 --- .../vertex_ai_feature_store.py | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py new file mode 100644 index 000000000000..bf2b16b5b275 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging + +__all__ = [] + +from apache_beam.io.requestresponse import RequestT, ResponseT +from apache_beam.transforms.enrichment import EnrichmentSourceHandler + +_LOGGER = logging.getLogger(__name__) + + +class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler): + def __init__(self): + pass + + def __enter__(self): + pass + + def __call__(self, request: RequestT, *args, **kwargs) -> ResponseT: + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def get_cache_request_key(self): + pass From 4a13130796f93cceb042c3f2d5e364c1eec9d19b Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Tue, 20 Feb 2024 15:51:20 -0500 Subject: [PATCH 2/7] vertex ai base --- .../vertex_ai_feature_store.py | 75 ++++++++++++++++--- .../vertex_ai_feature_store_it_test.py | 25 +++++++ ...tex_ai_feature_store_tests_requirement.txt | 18 +++++ 3 files changed, 107 insertions(+), 11 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_tests_requirement.txt diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py index bf2b16b5b275..e825c5a6a0fa 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py @@ -14,28 +14,81 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging -__all__ = [] +__all__ = [ + 'VertexAIFeatureStoreEnrichmentHandler', +] -from apache_beam.io.requestresponse import RequestT, ResponseT +from typing import List + +from google.cloud.aiplatform_v1 import FetchFeatureValuesRequest, FeatureOnlineStoreServiceClient + +import apache_beam as beam from apache_beam.transforms.enrichment import EnrichmentSourceHandler _LOGGER = logging.getLogger(__name__) -class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler): - def __init__(self): - pass +class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, + beam.Row]): + """Handler to interact with Vertex AI feature store using + :class:`apache_beam.transforms.enrichment.Enrichment` transform. + """ + def __init__( + self, + project: str, + location: str, + api_endpoint: str, + feature_store_name: str, + feature_view_name: str, + entity_type_name: str, + feature_ids: List[str]): + """Initializes an instance of `VertexAIFeatureStoreEnrichmentHandler`. + + Args: + project (str): The GCP project for the Vertex AI feature store. + location (str): The region for the Vertex AI feature store. + api_endpoint (str): The API endpoint for the Vertex AI feature store. + feature_store_name (str): The name of the Vertex AI feature store. + feature_view_name (str): The name of the feature view within the + feature store. + entity_type_name (str): The name of the entity type within the + feature store. + feature_ids (List[str]): A list of feature IDs to fetch + from the feature store. + """ + self.project = project + self.location = location + self.api_endpoint = api_endpoint + self.feature_store_name = feature_store_name + self.feature_view_name = feature_view_name + self.entity_type_name = entity_type_name + self.feature_ids = feature_ids def __enter__(self): - pass + self.client = FeatureOnlineStoreServiceClient( + client_options={"api_endpoint": self.api_endpoint}) - def __call__(self, request: RequestT, *args, **kwargs) -> ResponseT: - pass + def __call__(self, request, *args, **kwargs): + entity_id = request._asdict()[self.entity_type_name] + response = self.client.fetch_feature_values( + FetchFeatureValuesRequest( + feature_view=( + "projects/%s/locations/%s/featureOnlineStores/%s/feature" + "Views/%s" % ( + self.project, + self.location, + self.feature_store_name, + self.feature_view_name)), + data_key=entity_id, + )) + response_dict = json.loads(response.key_values) + return request, response_dict def __exit__(self, exc_type, exc_val, exc_tb): - pass + self.client.__exit__() - def get_cache_request_key(self): - pass + def get_cache_key(self, request): + return 'entity_id: %s' diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py new file mode 100644 index 000000000000..466ca10c79c6 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + + +class TestVertexAIFeatureStoreHandler(unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_tests_requirement.txt b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_tests_requirement.txt new file mode 100644 index 000000000000..cd74683a51c1 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_tests_requirement.txt @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +redis +google-cloud-aiplatform From 8962b2f384ae19a76e4a25c4d0c9412ed8871aeb Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 22 Feb 2024 13:03:09 -0500 Subject: [PATCH 3/7] bigtable and legacy support --- .../trigger_files/beam_PostCommit_Python.json | 0 .../apache_beam/transforms/enrichment.py | 6 +- .../enrichment_handlers/bigtable.py | 20 +- .../transforms/enrichment_handlers/utils.py | 38 +++ .../vertex_ai_feature_store.py | 277 +++++++++++++++--- .../vertex_ai_feature_store_it_test.py | 264 ++++++++++++++++- sdks/python/test-suites/direct/common.gradle | 5 +- 7 files changed, 546 insertions(+), 64 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python.json create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/utils.py diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/python/apache_beam/transforms/enrichment.py b/sdks/python/apache_beam/transforms/enrichment.py index 93344835e930..ddfbba5337fb 100644 --- a/sdks/python/apache_beam/transforms/enrichment.py +++ b/sdks/python/apache_beam/transforms/enrichment.py @@ -161,8 +161,10 @@ def expand(self, throttler=self._throttler) # EnrichmentSourceHandler returns a tuple of (request,response). - return fetched_data | beam.Map( - lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())) + return ( + fetched_data + | "enrichment_join" >> + beam.Map(lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict()))) def with_redis_cache( self, diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py b/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py index 943000a9f6bb..af35f91a42f3 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py @@ -15,7 +15,6 @@ # limitations under the License. # import logging -from enum import Enum from typing import Any from typing import Dict from typing import Optional @@ -28,30 +27,15 @@ import apache_beam as beam from apache_beam.transforms.enrichment import EnrichmentSourceHandler +from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel __all__ = [ 'BigTableEnrichmentHandler', - 'ExceptionLevel', ] _LOGGER = logging.getLogger(__name__) -class ExceptionLevel(Enum): - """ExceptionLevel defines the exception level options to either - log a warning, or raise an exception, or do nothing when a BigTable query - returns an empty row. - - Members: - - RAISE: Raise the exception. - - WARN: Log a warning for exception without raising it. - - QUIET: Neither log nor raise the exception. - """ - RAISE = 0 - WARN = 1 - QUIET = 2 - - class BigTableEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): """A handler for :class:`apache_beam.transforms.enrichment.Enrichment` transform to interact with GCP BigTable. @@ -70,7 +54,7 @@ class BigTableEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): encoding (str): encoding type to convert the string to bytes and vice-versa from BigTable. Default is `utf-8`. exception_level: a `enum.Enum` value from - ``apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel`` + ``apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel`` to set the level when an empty row is returned from the BigTable query. Defaults to ``ExceptionLevel.WARN``. include_timestamp (bool): If enabled, the timestamp associated with the diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/utils.py b/sdks/python/apache_beam/transforms/enrichment_handlers/utils.py new file mode 100644 index 000000000000..c61671402576 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/utils.py @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from enum import Enum + +__all__ = [ + 'ExceptionLevel', +] + + +class ExceptionLevel(Enum): + """Options to set the severity of exceptions. + + You can set the exception level option to either + log a warning, or raise an exception, or do nothing when an empty row + is fetched from the external service. + + Members: + - RAISE: Raise the exception. + - WARN: Log a warning for exception without raising it. + - QUIET: Neither log nor raise the exception. + """ + RAISE = 0 + WARN = 1 + QUIET = 2 diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py index e825c5a6a0fa..dc45249a1aef 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py @@ -14,27 +14,51 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import logging - -__all__ = [ - 'VertexAIFeatureStoreEnrichmentHandler', -] - from typing import List -from google.cloud.aiplatform_v1 import FetchFeatureValuesRequest, FeatureOnlineStoreServiceClient +import proto +from google.api_core.exceptions import NotFound +from google.cloud import aiplatform import apache_beam as beam from apache_beam.transforms.enrichment import EnrichmentSourceHandler +from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel + +__all__ = [ + 'VertexAIFeatureStoreEnrichmentHandler', + 'VertexAIFeatureStoreLegacyEnrichmentHandler' +] _LOGGER = logging.getLogger(__name__) +def _not_found_err_message( + feature_store_name: str, feature_view_name: str, entity_id: str) -> str: + """returns a string formatted with given parameters""" + return ( + "make sure the Feature Store: %s with Feature View " + "%s has entity_id: %s" % + (feature_store_name, feature_view_name, entity_id)) + + class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]): - """Handler to interact with Vertex AI feature store using - :class:`apache_beam.transforms.enrichment.Enrichment` transform. + """Enrichment handler to interact with Vertex AI Feature Store. + + Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` + transform when the Vertex AI Feature Store is set up for + Bigtable Online serving. + + With the Bigtable Online serving approach, the client fetches all the + available features for an entity-id. To filter the features to enrich, use + the `join_fn` param in :class:`apache_beam.transforms.enrichment.Enrichment`. + + **NOTE:** The default severity to report exceptions is logging a warning. For + this handler, Vertex AI client returns the same exception + `Requested entity was not found` even though the feature store doesn't + exist. So make sure the feature store instance exists or set + `exception_level` as `ExceptionLevel.RAISE`. """ def __init__( self, @@ -43,52 +67,223 @@ def __init__( api_endpoint: str, feature_store_name: str, feature_view_name: str, - entity_type_name: str, - feature_ids: List[str]): + row_key: str, + *, + exception_level: ExceptionLevel = ExceptionLevel.WARN, + **kwargs, + ): """Initializes an instance of `VertexAIFeatureStoreEnrichmentHandler`. Args: - project (str): The GCP project for the Vertex AI feature store. - location (str): The region for the Vertex AI feature store. - api_endpoint (str): The API endpoint for the Vertex AI feature store. - feature_store_name (str): The name of the Vertex AI feature store. + project (str): The GCP project-id for the Vertex AI Feature Store. + location (str): The region for the Vertex AI Feature Store. + api_endpoint (str): The API endpoint for the Vertex AI Feature Store. + feature_store_name (str): The name of the Vertex AI Feature Store. feature_view_name (str): The name of the feature view within the - feature store. - entity_type_name (str): The name of the entity type within the - feature store. - feature_ids (List[str]): A list of feature IDs to fetch - from the feature store. + Feature Store. + row_key (str): The row key field name containing the unique id + for the feature values. + exception_level: a `enum.Enum` value from + `apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel` + to set the level when an empty row is returned from the BigTable query. + Defaults to `ExceptionLevel.WARN`. + kwargs: Optional keyword arguments to configure the + :class:`aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using + `kwargs`, the `api_endpoint` param will be overridden with this config + if multiple values are found for `client_options`. """ self.project = project self.location = location self.api_endpoint = api_endpoint self.feature_store_name = feature_store_name self.feature_view_name = feature_view_name - self.entity_type_name = entity_type_name + self.row_key = row_key + self.exception_level = exception_level + self.kwargs = kwargs if kwargs else {} + + def __enter__(self): + """Connect with the Vertex AI Feature Store.""" + if 'client_options' in self.kwargs: + self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient( + **self.kwargs) + else: + self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient( + client_options={"api_endpoint": self.api_endpoint}, **self.kwargs) + self.feature_view_path = self.client.feature_view_path( + self.project, + self.location, + self.feature_store_name, + self.feature_view_name) + + def __call__(self, request: beam.Row, *args, **kwargs): + """Fetches feature value for an entity-id from Vertex AI Feature Store. + + Args: + request: the input `beam.Row` to enrich. + """ + try: + entity_id = request._asdict()[self.row_key] + except KeyError: + raise ValueError( + "no entry found for row_key %s in input row" % self.row_key) + try: + response = self.client.fetch_feature_values( + request=aiplatform.gapic.FetchFeatureValuesRequest( + data_key=aiplatform.gapic.FeatureViewDataKey(key=entity_id), + feature_view=self.feature_view_path, + data_format=aiplatform.gapic.FeatureViewDataFormat.PROTO_STRUCT, + )) + except NotFound: + if self.exception_level == ExceptionLevel.WARN: + _LOGGER.warning( + _not_found_err_message( + self.feature_store_name, self.feature_view_name, entity_id)) + return request, beam.Row() + elif self.exception_level == ExceptionLevel.RAISE: + raise ValueError( + _not_found_err_message( + self.feature_store_name, self.feature_view_name, entity_id)) + response_dict = dict(response.proto_struct) + return request, beam.Row(**response_dict) + + def __exit__(self, exc_type, exc_val, exc_tb): + """Clean the instantiated Vertex AI client.""" + self.client = None + + def get_cache_key(self, request: beam.Row) -> str: + """Returns a string formatted with unique entity-id for the feature values. + """ + return 'entity_id: %s' + + +class VertexAIFeatureStoreLegacyEnrichmentHandler(EnrichmentSourceHandler): + """Enrichment handler to interact with Vertex AI Feature Store (Legacy). + + Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` + transform for the Vertex AI Feature Store (Legacy). + + By default, it fetches all the features values for an entity-id. You can + specify the features names using `feature_ids` to fetch specific features. + """ + def __init__( + self, + project: str, + location: str, + api_endpoint: str, + feature_store_id: str, + entity_type_id: str, + feature_ids: List[str], + row_key: str, + *, + exception_level: ExceptionLevel = ExceptionLevel.WARN, + **kwargs, + ): + """Initializes an instance of `VertexAIFeatureStoreLegacyEnrichmentHandler`. + + Args: + project (str): The GCP project for the Vertex AI Feature Store (Legacy). + location (str): The region for the Vertex AI Feature Store (Legacy). + api_endpoint (str): The API endpoint for the + Vertex AI Feature Store (Legacy). + feature_store_id (str): The id of the Vertex AI Feature Store (Legacy). + entity_type_id (str): The entity type of the feature store. + feature_ids (List[str]): A list of feature-ids to fetch + from the Feature Store. + row_key (str): The row key field name containing the entity id + for the feature values. + exception_level: a `enum.Enum` value from + `apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel` + to set the level when an empty row is returned from the BigTable query. + Defaults to `ExceptionLevel.WARN`. + kwargs: Optional keyword arguments to configure the + :class:`aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using + `kwargs`, the `api_endpoint` param will be overridden with this config + if multiple values are found for `client_options`. + """ + self.project = project + self.location = location + self.api_endpoint = api_endpoint + self.feature_store_id = feature_store_id + self.entity_type_id = entity_type_id self.feature_ids = feature_ids + self.row_key = row_key + self.exception_level = exception_level + self.kwargs = kwargs if kwargs else {} def __enter__(self): - self.client = FeatureOnlineStoreServiceClient( - client_options={"api_endpoint": self.api_endpoint}) - - def __call__(self, request, *args, **kwargs): - entity_id = request._asdict()[self.entity_type_name] - response = self.client.fetch_feature_values( - FetchFeatureValuesRequest( - feature_view=( - "projects/%s/locations/%s/featureOnlineStores/%s/feature" - "Views/%s" % ( - self.project, - self.location, - self.feature_store_name, - self.feature_view_name)), - data_key=entity_id, - )) - response_dict = json.loads(response.key_values) - return request, response_dict + """Connect with the Vertex AI Feature Store (Legacy).""" + try: + # checks if feature store exists + _ = aiplatform.Featurestore( + featurestore_name=self.feature_store_id, + project=self.project, + location=self.location, + credentials=self.kwargs.get('credentials'), + ) + if 'client_options' in self.kwargs: + self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient( + **self.kwargs) + else: + self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient( + client_options={'api_endpoint': self.api_endpoint}, **self.kwargs) + self.entity_type_path = self.client.entity_type_path( + self.project, + self.location, + self.feature_store_id, + self.entity_type_id) + except NotFound: + raise ValueError( + 'Vertex AI Feature Store %s does not exist' % self.feature_store_id) + + def __call__(self, request: beam.Row, *args, **kwargs): + """Fetches feature value for an entity-id from + Vertex AI Feature Store (Legacy). + + Args: + request: the input `beam.Row` to enrich. + """ + try: + entity_id = request._asdict()[self.row_key] + except KeyError: + raise ValueError( + "no entry found for row_key %s in input row" % self.row_key) + + try: + selector = aiplatform.gapic.FeatureSelector( + id_matcher=aiplatform.gapic.IdMatcher(ids=self.feature_ids)) + response = self.client.read_feature_values( + request=aiplatform.gapic.ReadFeatureValuesRequest( + entity_type=self.entity_type_path, + entity_id=entity_id, + feature_selector=selector)) + except NotFound: + raise ValueError( + _not_found_err_message( + self.feature_store_id, self.entity_type_id, entity_id)) + + response_dict = {} + proto_to_dict = proto.Message.to_dict(response.entity_view) + for key, msg in zip(response.header.feature_descriptors, + proto_to_dict['data']): + if msg and 'value' in msg: + for _, value in msg['value'].items(): + response_dict[key.id] = value + break # skip fetching the metadata + elif self.exception_level == ExceptionLevel.RAISE: + raise ValueError( + _not_found_err_message( + self.feature_store_id, self.entity_type_id, entity_id)) + elif self.exception_level == ExceptionLevel.WARN: + _LOGGER.warning( + _not_found_err_message( + self.feature_store_id, self.entity_type_id, entity_id)) + return request, beam.Row(**response_dict) def __exit__(self, exc_type, exc_val, exc_tb): - self.client.__exit__() + """Clean the instantiated Vertex AI client.""" + self.client = None - def get_cache_key(self, request): + def get_cache_key(self, request: beam.Row) -> str: + """Returns a string formatted with unique entity-id for the feature values. + """ return 'entity_id: %s' diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py index 466ca10c79c6..7a2304dc9887 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py @@ -14,11 +14,273 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import logging import unittest +from unittest.mock import MagicMock +import pytest +import apache_beam as beam +from apache_beam.coders import coders +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import BeamAssertException + +# pylint: disable=ungrouped-imports +try: + from testcontainers.redis import RedisContainer + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import \ + VertexAIFeatureStoreEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import \ + VertexAIFeatureStoreLegacyEnrichmentHandler +except ImportError: + raise unittest.SkipTest( + 'VertexAI Feature Store test dependencies ' + 'are not installed.') + +_LOGGER = logging.getLogger(__name__) + + +class ValidateResponse(beam.DoFn): + """ValidateResponse validates if a PCollection of `beam.Row` + has the required fields.""" + def __init__(self, expected_fields): + self.expected_fields = expected_fields + + def process(self, element: beam.Row, *args, **kwargs): + element_dict = element.as_dict() + if len(self.expected_fields) != len(element_dict.keys()): + raise BeamAssertException( + "Expected %d fields in enriched PCollection:" % + len(self.expected_fields)) + for field in self.expected_fields: + if field not in element_dict: + raise BeamAssertException( + f"Expected to fetch field: {field}" + f"from feature store") + + +@pytest.mark.uses_redis class TestVertexAIFeatureStoreHandler(unittest.TestCase): - pass + def setUp(self) -> None: + self.project = 'google.com:clouddfe' + self.location = 'us-central1' + self.feature_store_name = "the_look_demo_unique" + self.feature_view_name = "registry_product" + self.entity_type_name = "entity_id" + self.api_endpoint = "us-central1-aiplatform.googleapis.com" + self.feature_ids = ['title', 'genres'] + + self._start_container() + + def _start_container(self): + for i in range(3): + try: + self.container = RedisContainer(image='redis:7.2.4') + self.container.start() + self.host = self.container.get_container_host_ip() + self.port = self.container.get_exposed_port(6379) + self.client = self.container.get_client() + break + except Exception as e: + if i == self.retries - 1: + _LOGGER.error('Unable to start redis container for RRIO tests.') + raise e + + def tearDown(self) -> None: + self.container.stop() + self.client = None + + def test_vertex_ai_feature_store_bigtable_serving_enrichment(self): + requests = [ + beam.Row(entity_id="847", name='cardigan jacket'), + beam.Row(entity_id="16050", name='stripe t-shirt'), + ] + expected_fields = [ + 'entity_id', + 'bad_order_count', + 'good_order_count', + 'feature_timestamp', + 'category', + 'cost', + 'brand', + 'retail_price', + 'name' + ] + handler = VertexAIFeatureStoreEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_name=self.feature_store_name, + feature_view_name=self.feature_view_name, + row_key=self.entity_type_name, + ) + + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler) + | beam.ParDo(ValidateResponse(expected_fields))) + + def test_vertex_ai_feature_store_bigtable_serving_enrichment_bad(self): + requests = [ + beam.Row(entity_id="ui", name="fred perry men\'s sharp stripe t-shirt") + ] + handler = VertexAIFeatureStoreEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_name=self.feature_store_name, + feature_view_name=self.feature_view_name, + row_key=self.entity_type_name, + exception_level=ExceptionLevel.RAISE, + ) + with self.assertRaises(ValueError): + test_pipeline = beam.Pipeline() + _ = ( + test_pipeline + | "Create" >> beam.Create(requests) + | "Enrich w/ VertexAI" >> Enrichment(handler)) + res = test_pipeline.run() + res.wait_until_finish() + + def test_vertex_ai_legacy_feature_store_enrichment(self): + requests = [ + beam.Row(entity_id="movie_02", title="The Shining"), + beam.Row(entity_id="movie_04", title='The Dark Knight'), + ] + expected_fields = ['entity_id', 'title', 'genres'] + feature_store_id = "movie_prediction_unique" + entity_type_id = "movies" + handler = VertexAIFeatureStoreLegacyEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_id=feature_store_id, + entity_type_id=entity_type_id, + feature_ids=self.feature_ids, + row_key=self.entity_type_name, + ) + + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler) + | beam.ParDo(ValidateResponse(expected_fields))) + + def test_vertex_ai_legacy_feature_store_enrichment_bad(self): + requests = [ + beam.Row(entity_id="12345", title="The Shining"), + ] + feature_store_id = "movie_prediction_unique" + entity_type_id = "movies" + handler = VertexAIFeatureStoreLegacyEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_id=feature_store_id, + entity_type_id=entity_type_id, + feature_ids=self.feature_ids, + row_key=self.entity_type_name, + exception_level=ExceptionLevel.RAISE, + ) + + with self.assertRaises(ValueError): + test_pipeline = beam.Pipeline() + _ = ( + test_pipeline + | "Create" >> beam.Create(requests) + | "Enrichment" >> Enrichment(handler)) + res = test_pipeline.run() + res.wait_until_finish() + + def test_vertex_ai_legacy_feature_store_invalid_featurestore(self): + requests = [ + beam.Row(entity_id="movie_02", title="The Shining"), + ] + feature_store_id = "invalid_name" + entity_type_id = "movies" + handler = VertexAIFeatureStoreLegacyEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_id=feature_store_id, + entity_type_id=entity_type_id, + feature_ids=self.feature_ids, + row_key=self.entity_type_name, + exception_level=ExceptionLevel.RAISE, + ) + + with self.assertRaises(ValueError): + test_pipeline = beam.Pipeline() + _ = ( + test_pipeline + | "Create" >> beam.Create(requests) + | "Enrichment" >> Enrichment(handler)) + res = test_pipeline.run() + res.wait_until_finish() + + def test_feature_store_enrichment_with_redis(self): + """ + In this test, we run two pipelines back to back. + + In the first pipeline, we run a simple feature store enrichment pipeline + with zero cache records. Therefore, it makes call to the source + and ultimately writes to the cache with a TTL of 300 seconds. + + For the second pipeline, we mock the + `VertexAIFeatureStoreEnrichmentHandler`'s `__call__` method to always + return a `None` response. However, this change won't impact the second + pipeline because the Enrichment transform first checks the cache to fulfill + requests. Since all requests are cached, it will return from there without + making calls to the feature store instance. + """ + expected_fields = ['entity_id', 'title', 'genres'] + requests = [ + beam.Row(entity_id="movie_02", title="The Shining"), + beam.Row(entity_id="movie_04", title="The Dark Knight"), + ] + feature_store_id = "movie_prediction_unique" + entity_type_id = "movies" + handler = VertexAIFeatureStoreLegacyEnrichmentHandler( + project=self.project, + location=self.location, + api_endpoint=self.api_endpoint, + feature_store_id=feature_store_id, + entity_type_id=entity_type_id, + feature_ids=self.feature_ids, + row_key=self.entity_type_name, + ) + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler).with_redis_cache(self.host, self.port) + | beam.ParDo(ValidateResponse(expected_fields))) + + # manually check cache entry + c = coders.StrUtf8Coder() + for req in requests: + key = handler.get_cache_key(req) + response = self.client.get(c.encode(key)) + if not response: + raise ValueError("No cache entry found for %s" % key) + + actual = VertexAIFeatureStoreLegacyEnrichmentHandler.__call__ + VertexAIFeatureStoreLegacyEnrichmentHandler.__call__ = MagicMock( + return_value=( + beam.Row(entity_id="movie_02", title="The Shining"), beam.Row())) + + with TestPipeline(is_integration_test=True) as test_pipeline: + _ = ( + test_pipeline + | beam.Create(requests) + | Enrichment(handler).with_redis_cache(self.host, self.port) + | beam.ParDo(ValidateResponse(expected_fields))) + VertexAIFeatureStoreLegacyEnrichmentHandler.__call__ = actual if __name__ == '__main__': diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index 657f7adf801d..1f677fcacd07 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -368,7 +368,7 @@ task transformersInferenceTest { task enrichmentRedisTest { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - def requirementsFile = "${rootDir}/sdks/python/apache_beam/io/requestresponse_tests_requirements.txt" + def requirementsFile = "${rootDir}/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_tests_requirement.txt" doFirst { exec { executable 'sh' @@ -381,7 +381,8 @@ task enrichmentRedisTest { "test_opts": testOpts, "suite": "postCommitIT-direct-py${pythonVersionSuffix}", "collect": "uses_redis", - "runner": "TestDirectRunner" + "runner": "TestDirectRunner", + "region": "us-central1", ] def cmdArgs = mapToArgString(argMap) exec { From 404c267ad6ee0124680f3b27b6b331a434a2c4c1 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 23 Feb 2024 16:06:47 -0500 Subject: [PATCH 4/7] correct pydoc, project name --- .../transforms/enrichment_handlers/vertex_ai_feature_store.py | 4 ++-- .../enrichment_handlers/vertex_ai_feature_store_it_test.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py index dc45249a1aef..e88214ad9045 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py @@ -88,7 +88,7 @@ def __init__( to set the level when an empty row is returned from the BigTable query. Defaults to `ExceptionLevel.WARN`. kwargs: Optional keyword arguments to configure the - :class:`aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using + `aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using `kwargs`, the `api_endpoint` param will be overridden with this config if multiple values are found for `client_options`. """ @@ -196,7 +196,7 @@ def __init__( to set the level when an empty row is returned from the BigTable query. Defaults to `ExceptionLevel.WARN`. kwargs: Optional keyword arguments to configure the - :class:`aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using + `aiplatform.gapic.FeaturestoreOnlineServingServiceClient`. When using `kwargs`, the `api_endpoint` param will be overridden with this config if multiple values are found for `client_options`. """ diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py index 7a2304dc9887..d4224be060e9 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_it_test.py @@ -64,7 +64,7 @@ def process(self, element: beam.Row, *args, **kwargs): @pytest.mark.uses_redis class TestVertexAIFeatureStoreHandler(unittest.TestCase): def setUp(self) -> None: - self.project = 'google.com:clouddfe' + self.project = 'apache-beam-testing' self.location = 'us-central1' self.feature_store_name = "the_look_demo_unique" self.feature_view_name = "registry_product" From 3465d2a68d5add81c4770b93eeabf28e5d437543 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 28 Feb 2024 16:22:18 -0500 Subject: [PATCH 5/7] clear exception message, refactor, unittests --- CHANGES.md | 1 + .../vertex_ai_feature_store.py | 79 +++++++++++-------- .../vertex_ai_feature_store_test.py | 54 +++++++++++++ 3 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py diff --git a/CHANGES.md b/CHANGES.md index ea59c3e964a4..af51451eeb6b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ * Redis cache support added to RequestResponseIO and Enrichment transform (Python) ([#30307](https://github.com/apache/beam/pull/30307)) * Merged sdks/java/fn-execution and runners/core-construction-java into the main SDK. These artifacts were never meant for users, but noting that they no longer exist. These are steps to bring portability into the core SDK alongside all other core functionality. +* Added Vertex AI Feature Store handler for Enrichment transform (Python) ([#30388](https://github.com/apache/beam/pull/30388)) ## Breaking Changes diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py index e88214ad9045..b135739ef59c 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store.py @@ -51,8 +51,10 @@ class VertexAIFeatureStoreEnrichmentHandler(EnrichmentSourceHandler[beam.Row, Bigtable Online serving. With the Bigtable Online serving approach, the client fetches all the - available features for an entity-id. To filter the features to enrich, use - the `join_fn` param in :class:`apache_beam.transforms.enrichment.Enrichment`. + available features for an entity-id. The entity-id is extracted from the + `row_key` field in the input `beam.Row` object. To filter the features to + enrich, use the `join_fn` param in + :class:`apache_beam.transforms.enrichment.Enrichment`. **NOTE:** The default severity to report exceptions is logging a warning. For this handler, Vertex AI client returns the same exception @@ -88,9 +90,7 @@ def __init__( to set the level when an empty row is returned from the BigTable query. Defaults to `ExceptionLevel.WARN`. kwargs: Optional keyword arguments to configure the - `aiplatform.gapic.FeatureOnlineStoreServiceClient`. When using - `kwargs`, the `api_endpoint` param will be overridden with this config - if multiple values are found for `client_options`. + `aiplatform.gapic.FeatureOnlineStoreServiceClient`. """ self.project = project self.location = location @@ -100,15 +100,20 @@ def __init__( self.row_key = row_key self.exception_level = exception_level self.kwargs = kwargs if kwargs else {} + if 'client_options' in self.kwargs: + if not self.kwargs['client_options']['api_endpoint']: + self.kwargs['client_options']['api_endpoint'] = self.api_endpoint + elif self.kwargs['client_options']['api_endpoint'] != self.api_endpoint: + raise ValueError( + 'Multiple values received for api_endpoint in ' + 'api_endpoint and client_options parameters.') + else: + self.kwargs['client_options'] = {"api_endpoint": self.api_endpoint} def __enter__(self): """Connect with the Vertex AI Feature Store.""" - if 'client_options' in self.kwargs: - self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient( - **self.kwargs) - else: - self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient( - client_options={"api_endpoint": self.api_endpoint}, **self.kwargs) + self.client = aiplatform.gapic.FeatureOnlineStoreServiceClient( + **self.kwargs) self.feature_view_path = self.client.feature_view_path( self.project, self.location, @@ -124,8 +129,12 @@ def __call__(self, request: beam.Row, *args, **kwargs): try: entity_id = request._asdict()[self.row_key] except KeyError: - raise ValueError( - "no entry found for row_key %s in input row" % self.row_key) + raise KeyError( + "Enrichment requests to Vertex AI Feature Store should " + "contain a field: %s in the input `beam.Row` to join " + "the input with fetched response. This is used as the " + "`FeatureViewDataKey` to fetch feature values " + "corresponding to this key." % self.row_key) try: response = self.client.fetch_feature_values( request=aiplatform.gapic.FetchFeatureValuesRequest( @@ -153,7 +162,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def get_cache_key(self, request: beam.Row) -> str: """Returns a string formatted with unique entity-id for the feature values. """ - return 'entity_id: %s' + return 'entity_id: %s' % request._asdict()[self.row_key] class VertexAIFeatureStoreLegacyEnrichmentHandler(EnrichmentSourceHandler): @@ -162,8 +171,10 @@ class VertexAIFeatureStoreLegacyEnrichmentHandler(EnrichmentSourceHandler): Use this handler with :class:`apache_beam.transforms.enrichment.Enrichment` transform for the Vertex AI Feature Store (Legacy). - By default, it fetches all the features values for an entity-id. You can - specify the features names using `feature_ids` to fetch specific features. + By default, it fetches all the features values for an entity-id. The + entity-id is extracted from the `row_key` field in the input `beam.Row` + object.You can specify the features names using `feature_ids` to fetch + specific features. """ def __init__( self, @@ -196,9 +207,7 @@ def __init__( to set the level when an empty row is returned from the BigTable query. Defaults to `ExceptionLevel.WARN`. kwargs: Optional keyword arguments to configure the - `aiplatform.gapic.FeaturestoreOnlineServingServiceClient`. When using - `kwargs`, the `api_endpoint` param will be overridden with this config - if multiple values are found for `client_options`. + `aiplatform.gapic.FeaturestoreOnlineServingServiceClient`. """ self.project = project self.location = location @@ -209,6 +218,15 @@ def __init__( self.row_key = row_key self.exception_level = exception_level self.kwargs = kwargs if kwargs else {} + if 'client_options' in self.kwargs: + if not self.kwargs['client_options']['api_endpoint']: + self.kwargs['client_options']['api_endpoint'] = self.api_endpoint + elif self.kwargs['client_options']['api_endpoint'] != self.api_endpoint: + raise ValueError( + 'Multiple values received for api_endpoint in ' + 'api_endpoint and client_options parameters.') + else: + self.kwargs['client_options'] = {"api_endpoint": self.api_endpoint} def __enter__(self): """Connect with the Vertex AI Feature Store (Legacy).""" @@ -220,12 +238,8 @@ def __enter__(self): location=self.location, credentials=self.kwargs.get('credentials'), ) - if 'client_options' in self.kwargs: - self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient( - **self.kwargs) - else: - self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient( - client_options={'api_endpoint': self.api_endpoint}, **self.kwargs) + self.client = aiplatform.gapic.FeaturestoreOnlineServingServiceClient( + **self.kwargs) self.entity_type_path = self.client.entity_type_path( self.project, self.location, @@ -245,8 +259,12 @@ def __call__(self, request: beam.Row, *args, **kwargs): try: entity_id = request._asdict()[self.row_key] except KeyError: - raise ValueError( - "no entry found for row_key %s in input row" % self.row_key) + raise KeyError( + "Enrichment requests to Vertex AI Feature Store should " + "contain a field: %s in the input `beam.Row` to join " + "the input with fetched response. This is used as the " + "`FeatureViewDataKey` to fetch feature values " + "corresponding to this key." % self.row_key) try: selector = aiplatform.gapic.FeatureSelector( @@ -266,9 +284,8 @@ def __call__(self, request: beam.Row, *args, **kwargs): for key, msg in zip(response.header.feature_descriptors, proto_to_dict['data']): if msg and 'value' in msg: - for _, value in msg['value'].items(): - response_dict[key.id] = value - break # skip fetching the metadata + response_dict[key.id] = list(msg['value'].values())[0] + # skip fetching the metadata elif self.exception_level == ExceptionLevel.RAISE: raise ValueError( _not_found_err_message( @@ -286,4 +303,4 @@ def __exit__(self, exc_type, exc_val, exc_tb): def get_cache_key(self, request: beam.Row) -> str: """Returns a string formatted with unique entity-id for the feature values. """ - return 'entity_id: %s' + return 'entity_id: %s' % request._asdict()[self.row_key] diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py new file mode 100644 index 000000000000..21a3b8098e85 --- /dev/null +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +try: + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \ + import VertexAIFeatureStoreEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \ + import VertexAIFeatureStoreLegacyEnrichmentHandler +except ImportError: + raise unittest.SkipTest( + 'VertexAI Feature Store test dependencies ' + 'are not installed.') + + +class TestVertexAIFeatureStoreHandlerInit(unittest.TestCase): + def test_raise_error_duplicate_api_endpoint_online_store(self): + with self.assertRaises(ValueError): + _ = VertexAIFeatureStoreEnrichmentHandler( + project='project', + location='location', + api_endpoint='location@google.com', + feature_store_name='feature_store', + feature_view_name='feature_view', + row_key='row_key', + client_options={'api_endpoint': 'region@google.com'}, + ) + + def test_raise_error_duplicate_api_endpoint_legacy_store(self): + with self.assertRaises(ValueError): + _ = VertexAIFeatureStoreLegacyEnrichmentHandler( + project='project', + location='location', + api_endpoint='location@google.com', + feature_store_id='feature_store', + entity_type_id='entity_id', + feature_ids=['feature1', 'feature2'], + row_key='row_key', + client_options={'api_endpoint': 'region@google.com'}, + ) From 218547fdb2ffd4e8797afe59efa125d2ef247b41 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 29 Feb 2024 09:19:24 -0500 Subject: [PATCH 6/7] add main --- .../enrichment_handlers/vertex_ai_feature_store_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py index 21a3b8098e85..352146ecc078 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/vertex_ai_feature_store_test.py @@ -52,3 +52,7 @@ def test_raise_error_duplicate_api_endpoint_legacy_store(self): row_key='row_key', client_options={'api_endpoint': 'region@google.com'}, ) + + +if __name__ == '__main__': + unittest.main() From d4b155945f1f27f8aa8c04a16de17bcebed5fba7 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Wed, 6 Mar 2024 09:50:28 -0500 Subject: [PATCH 7/7] drop postcommit file --- .github/trigger_files/beam_PostCommit_Python.json | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .github/trigger_files/beam_PostCommit_Python.json diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json deleted file mode 100644 index e69de29bb2d1..000000000000