From cf5179e64758ba05bf58db80eb2e0165b4e4525f Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sat, 21 Oct 2023 00:48:26 +0700 Subject: [PATCH 1/2] fix: Resolve hbase hotspot issue when materializing Signed-off-by: Hai Nguyen --- .../contrib/hbase_online_store/hbase.py | 44 ++++++++++++++++--- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index aff0c6c42c..a900b4dcac 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -8,7 +8,7 @@ from feast import Entity from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -108,10 +108,11 @@ def online_write_batch( b = hbase.batch(table_name) for entity_key, values, timestamp, created_ts in data: - row_key = serialize_entity_key( + row_key = self._hbase_row_key( entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() + feature_view_name=table.name, + config=config, + ) values_dict = {} for feature_name, val in values.items(): values_dict[ @@ -157,10 +158,11 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] row_keys = [ - serialize_entity_key( + self._hbase_row_key( entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() + feature_view_name=table.name, + config=config, + ) for entity_key in entity_keys ] rows = hbase.rows(table_name, row_keys=row_keys) @@ -234,6 +236,34 @@ def teardown( table_name = _table_id(project, table) hbase.delete_table(table_name) + def _hbase_row_key( + self, + entity_key: EntityKeyProto, + feature_view_name: str, + config: RepoConfig, + ) -> bytes: + """ + Computes the HBase row key for a given entity key and feature view name. + + Args: + entity_key (EntityKeyProto): The entity key to compute the row key for. + feature_view_name (str): The name of the feature view to compute the row key for. + config (RepoConfig): The configuration for the Feast repository. + + Returns: + bytes: The HBase row key for the given entity key and feature view name. + """ + entity_id = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + # Even though `entity_id` uniquely identifies an entity, we use the same table + # for multiple feature_views with the same set of entities. + # To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself. + # This also ensures that features for entities from various feature_views are + # colocated. + return f"{entity_id}#{feature_view_name}".encode() + def _table_id(project: str, table: FeatureView) -> str: """ From 14109d042a8ad6cb5cd3bd2a3f5b9ca479b86843 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Sat, 21 Oct 2023 00:48:58 +0700 Subject: [PATCH 2/2] chore: Refactor internal table id generator Signed-off-by: Hai Nguyen --- .../contrib/hbase_online_store/hbase.py | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index a900b4dcac..2636cf95e2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -104,7 +104,7 @@ def online_write_batch( hbase = HbaseUtils(self._get_conn(config)) project = config.project - table_name = _table_id(project, table) + table_name = self._table_id(project, table) b = hbase.batch(table_name) for entity_key, values, timestamp, created_ts in data: @@ -134,6 +134,9 @@ def online_write_batch( b.put(row_key, values_dict) b.send() + if progress: + progress(len(data)) + @log_exceptions_and_usage(online_store="hbase") def online_read( self, @@ -153,7 +156,7 @@ def online_read( """ hbase = HbaseUtils(self._get_conn(config)) project = config.project - table_name = _table_id(project, table) + table_name = self._table_id(project, table) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] @@ -208,12 +211,12 @@ def update( # We don't create any special state for the entites in this implementation. for table in tables_to_keep: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) if not hbase.check_if_table_exist(table_name): hbase.create_table_with_default_cf(table_name) for table in tables_to_delete: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) hbase.delete_table(table_name) def teardown( @@ -233,7 +236,7 @@ def teardown( project = config.project for table in tables: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) hbase.delete_table(table_name) def _hbase_row_key( @@ -264,13 +267,12 @@ def _hbase_row_key( # colocated. return f"{entity_id}#{feature_view_name}".encode() + def _table_id(self, project: str, table: FeatureView) -> str: + """ + Returns table name given the project_name and the feature_view. -def _table_id(project: str, table: FeatureView) -> str: - """ - Returns table name given the project_name and the feature_view. - - Args: - project: Name of the feast project. - table: Feast FeatureView. - """ - return f"{project}_{table.name}" + Args: + project: Name of the feast project. + table: Feast FeatureView. + """ + return f"{project}:{table.name}"