Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Feast/IKV upgrade client version #4200

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ def online_write_batch(
progress: Function to be called once a batch of rows is written to the online store, used
to show progress.
"""
# update should have been called before
if self._writer is None:
return
self._init_writer(config=config)
assert self._writer is not None

for entity_key, features, event_timestamp, _ in data:
entity_id: str = compute_entity_id(
Expand Down Expand Up @@ -120,6 +119,8 @@ def online_read(
item is the event timestamp for the row, and the second item is a dict mapping feature names
to values, which are returned in proto format.
"""
self._init_reader(config=config)

if not len(entity_keys):
return []

Expand Down Expand Up @@ -174,7 +175,6 @@ def _decode_fields_for_primary_key(

return dt, features

# called before any read/write requests are issued
@log_exceptions_and_usage(online_store="ikv")
def update(
self,
Expand All @@ -199,7 +199,7 @@ def update(
partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
infrastructure corresponding to other feature views should be not be touched.
"""
self._init_clients(config=config)
self._init_writer(config=config)
assert self._writer is not None

# note: we assume tables_to_keep does not overlap with tables_to_delete
Expand All @@ -223,7 +223,7 @@ def teardown(
tables: Feature views whose corresponding infrastructure should be deleted.
entities: Entities whose corresponding infrastructure should be deleted.
"""
self._init_clients(config=config)
self._init_writer(config=config)
assert self._writer is not None

# drop fields corresponding to this feature-view
Expand Down Expand Up @@ -269,20 +269,28 @@ def _create_document(

return builder.build()

def _init_clients(self, config: RepoConfig):
"""Initializes (if required) reader/writer ikv clients."""
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

def _init_writer(self, config: RepoConfig):
"""Initializes ikv writer client."""
# initialize writer
if self._writer is None:
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

self._writer = create_new_writer(client_options)
self._writer.startup() # blocking operation

# initialize reader, iff mount_dir is specified
def _init_reader(self, config: RepoConfig):
"""Initializes ikv reader client."""
# initialize reader
if self._reader is None:
online_config = config.online_store
assert isinstance(online_config, IKVOnlineStoreConfig)
client_options = IKVOnlineStore._config_to_client_options(online_config)

if online_config.mount_directory and len(online_config.mount_directory) > 0:
self._reader = create_new_reader(client_options)
self._reader.startup() # blocking operation

@staticmethod
def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
]

IKV_REQUIRED = [
"ikvpy>=0.0.23",
"ikvpy>=0.0.36",
]

HAZELCAST_REQUIRED = [
Expand Down
Loading