Skip to content

Commit

Permalink
fix: Resolve hbase hotspot issue when materializing (feast-dev#3790)
Browse files Browse the repository at this point in the history
* fix: Resolve hbase hotspot issue when materializing

Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>

* chore: Refactor internal table id generator

Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>

---------

Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
  • Loading branch information
sudohainguyen authored Oct 20, 2023
1 parent 9b0e5ce commit 7376db8
Showing 1 changed file with 52 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -104,14 +104,15 @@ def online_write_batch(

hbase = HbaseUtils(self._get_conn(config))
project = config.project
table_name = _table_id(project, table)
table_name = self._table_id(project, table)

b = hbase.batch(table_name)
for entity_key, values, timestamp, created_ts in data:
row_key = serialize_entity_key(
row_key = self._hbase_row_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
feature_view_name=table.name,
config=config,
)
values_dict = {}
for feature_name, val in values.items():
values_dict[
Expand All @@ -133,6 +134,9 @@ def online_write_batch(
b.put(row_key, values_dict)
b.send()

if progress:
progress(len(data))

@log_exceptions_and_usage(online_store="hbase")
def online_read(
self,
Expand All @@ -152,15 +156,16 @@ def online_read(
"""
hbase = HbaseUtils(self._get_conn(config))
project = config.project
table_name = _table_id(project, table)
table_name = self._table_id(project, table)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

row_keys = [
serialize_entity_key(
self._hbase_row_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
feature_view_name=table.name,
config=config,
)
for entity_key in entity_keys
]
rows = hbase.rows(table_name, row_keys=row_keys)
Expand Down Expand Up @@ -206,12 +211,12 @@ def update(

# We don't create any special state for the entites in this implementation.
for table in tables_to_keep:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
if not hbase.check_if_table_exist(table_name):
hbase.create_table_with_default_cf(table_name)

for table in tables_to_delete:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
hbase.delete_table(table_name)

def teardown(
Expand All @@ -231,16 +236,43 @@ def teardown(
project = config.project

for table in tables:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
hbase.delete_table(table_name)

def _hbase_row_key(
self,
entity_key: EntityKeyProto,
feature_view_name: str,
config: RepoConfig,
) -> bytes:
"""
Computes the HBase row key for a given entity key and feature view name.
def _table_id(project: str, table: FeatureView) -> str:
"""
Returns table name given the project_name and the feature_view.
Args:
entity_key (EntityKeyProto): The entity key to compute the row key for.
feature_view_name (str): The name of the feature view to compute the row key for.
config (RepoConfig): The configuration for the Feast repository.
Args:
project: Name of the feast project.
table: Feast FeatureView.
"""
return f"{project}_{table.name}"
Returns:
bytes: The HBase row key for the given entity key and feature view name.
"""
entity_id = compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
# Even though `entity_id` uniquely identifies an entity, we use the same table
# for multiple feature_views with the same set of entities.
# To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself.
# This also ensures that features for entities from various feature_views are
# colocated.
return f"{entity_id}#{feature_view_name}".encode()

def _table_id(self, project: str, table: FeatureView) -> str:
"""
Returns table name given the project_name and the feature_view.
Args:
project: Name of the feast project.
table: Feast FeatureView.
"""
return f"{project}:{table.name}"

0 comments on commit 7376db8

Please sign in to comment.