Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Resolve hbase hotspot issue when materializing #3790

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -104,14 +104,15 @@ def online_write_batch(

hbase = HbaseUtils(self._get_conn(config))
project = config.project
table_name = _table_id(project, table)
table_name = self._table_id(project, table)

b = hbase.batch(table_name)
for entity_key, values, timestamp, created_ts in data:
row_key = serialize_entity_key(
row_key = self._hbase_row_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
feature_view_name=table.name,
config=config,
)
values_dict = {}
for feature_name, val in values.items():
values_dict[
Expand All @@ -133,6 +134,9 @@ def online_write_batch(
b.put(row_key, values_dict)
b.send()

if progress:
progress(len(data))

@log_exceptions_and_usage(online_store="hbase")
def online_read(
self,
Expand All @@ -152,15 +156,16 @@ def online_read(
"""
hbase = HbaseUtils(self._get_conn(config))
project = config.project
table_name = _table_id(project, table)
table_name = self._table_id(project, table)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

row_keys = [
serialize_entity_key(
self._hbase_row_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
feature_view_name=table.name,
config=config,
)
for entity_key in entity_keys
]
rows = hbase.rows(table_name, row_keys=row_keys)
Expand Down Expand Up @@ -206,12 +211,12 @@ def update(

# We don't create any special state for the entites in this implementation.
for table in tables_to_keep:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
if not hbase.check_if_table_exist(table_name):
hbase.create_table_with_default_cf(table_name)

for table in tables_to_delete:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
hbase.delete_table(table_name)

def teardown(
Expand All @@ -231,16 +236,43 @@ def teardown(
project = config.project

for table in tables:
table_name = _table_id(project, table)
table_name = self._table_id(project, table)
hbase.delete_table(table_name)

def _hbase_row_key(
self,
entity_key: EntityKeyProto,
feature_view_name: str,
config: RepoConfig,
) -> bytes:
"""
Computes the HBase row key for a given entity key and feature view name.
def _table_id(project: str, table: FeatureView) -> str:
"""
Returns table name given the project_name and the feature_view.
Args:
entity_key (EntityKeyProto): The entity key to compute the row key for.
feature_view_name (str): The name of the feature view to compute the row key for.
config (RepoConfig): The configuration for the Feast repository.
Args:
project: Name of the feast project.
table: Feast FeatureView.
"""
return f"{project}_{table.name}"
Returns:
bytes: The HBase row key for the given entity key and feature view name.
"""
entity_id = compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
# Even though `entity_id` uniquely identifies an entity, we use the same table
# for multiple feature_views with the same set of entities.
# To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself.
# This also ensures that features for entities from various feature_views are
# colocated.
return f"{entity_id}#{feature_view_name}".encode()

def _table_id(self, project: str, table: FeatureView) -> str:
"""
Returns table name given the project_name and the feature_view.
Args:
project: Name of the feast project.
table: Feast FeatureView.
"""
return f"{project}:{table.name}"
Loading