
Optimize _populate_result_rows_from_feature_view #2223

Merged: 10 commits, Jan 26, 2022

Conversation

judahrand
Member

@judahrand judahrand commented Jan 18, 2022

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

What this PR does / why we need it:
This commit optimizes the fetching of features by only fetching
the features for each unique Entity once and then expands the result
to the shape of input EntityKeys.

Previously, if an Entity occurred twice the features would be fetched
from the OnlineStore twice. This can be hugely inefficient.

The only assumption that this makes is that the OnlineStore will return
the feature data in the same order as the EntityKeyProtos are provided.

Which issue(s) this PR fixes:

Fixes #

Does this PR introduce a user-facing change?:

Speed up `get_online_features` when duplicate Entities are present.

@codecov-commenter

codecov-commenter commented Jan 18, 2022

Codecov Report

Merging #2223 (c49bcd4) into master (428c145) will increase coverage by 0.02%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2223      +/-   ##
==========================================
+ Coverage   84.94%   84.96%   +0.02%     
==========================================
  Files         105      105              
  Lines        8496     8515      +19     
==========================================
+ Hits         7217     7235      +18     
- Misses       1279     1280       +1     
Flag Coverage Δ
integrationtests 73.14% <97.05%> (-0.26%) ⬇️
unittests 59.80% <77.94%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdk/python/feast/feature_store.py 90.05% <100.00%> (+0.20%) ⬆️
sdk/python/feast/infra/online_stores/redis.py 94.07% <100.00%> (ø)
sdk/python/feast/repo_operations.py 48.85% <100.00%> (+0.48%) ⬆️
sdk/python/feast/type_map.py 66.14% <100.00%> (-0.27%) ⬇️
sdk/python/feast/value_type.py 100.00% <100.00%> (ø)
sdk/python/tests/data/data_creator.py 100.00% <100.00%> (ø)
...ts/integration/feature_repos/repo_configuration.py 94.23% <100.00%> (ø)
...n/feature_repos/universal/data_sources/bigquery.py 97.61% <100.00%> (ø)
...n/feature_repos/universal/data_sources/redshift.py 100.00% <100.00%> (ø)
...fline_store/test_universal_historical_retrieval.py 100.00% <100.00%> (ø)
... and 6 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 428c145...c49bcd4. Read the comment docs.

@adchia
Collaborator

adchia commented Jan 19, 2022

What's the use case for having duplicate entities? Why not, e.g., dedupe before sending?

@pyalex
Collaborator

pyalex commented Jan 19, 2022

Seems like we're adding a lot of complexity and a really unnecessary optimization for a use case that should be solved on the client side.

@judahrand
Member Author

judahrand commented Jan 19, 2022

What's the use case for having duplicate entities? Why not, e.g., dedupe before sending?

These would be example entity rows: [{"user_id": 1, "item_id": 214}, {"user_id": 1, "item_id": 2354}, {"user_id": 1, "item_id": 4736}]

The reason for not deduping before the request is that I'm requesting features that have user_id as an entity, features that have item_id as an entity, and also features that have user_id, item_id as an entity. For example, feature refs that look like: user:age, item:age, user_x_item:times_viewed

In this case the features for the user_id are processed for every single row even though the user:age will be the same for every row. You also can't dedupe on the client side because user_x_item:times_viewed requires both join keys. This causes real issues in this case as the number of rows grows. You end up fetching large amounts of redundant data from the OnlineStore and processing it multiple times.

This change means that only the unique entities for each FeatureView are actually retrieved. For very large numbers of rows (~9k in my test case) this can reduce the time spent interfacing with RedisOnlineStore by almost half from about 600ms down to 300ms because for one of the two FeatureViews I'm retrieving features from only a single Entity is needed rather than 9k.
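To make the shape of the problem concrete, here is an illustrative sketch (plain Python, not the Feast API) of how the example rows project onto each view's join keys, and why one view needs only a single read:

```python
# The example entity rows from above.
entity_rows = [
    {"user_id": 1, "item_id": 214},
    {"user_id": 1, "item_id": 2354},
    {"user_id": 1, "item_id": 4736},
]

# Project the rows onto each view's join keys (names are illustrative).
user_keys = [(row["user_id"],) for row in entity_rows]
item_keys = [(row["item_id"],) for row in entity_rows]

# The user view only ever needs one unique key; the item view needs three.
print(set(user_keys))       # {(1,)}
print(len(set(item_keys)))  # 3
```

With 9k rows and one user, the user view's reads collapse from 9k to 1.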

Does that make sense?

@judahrand
Member Author

judahrand commented Jan 19, 2022

Seems like we're adding a lot of complexity and a really unnecessary optimization for a use case that should be solved on the client side.

Absolutely disagree. See example in reply above. There are cases where doing that would require multiple calls to Feast which would somewhat defeat the purpose.

To me this is an obvious optimization and the current implementation is actually rather naive.

I think the issue here is probably one of terminology rather than an actual disagreement, however? The terminology perhaps being the distinction between an entity_row, an Entity, and an EntityKey.

I suppose the case I'm handling here is actually duplicate EntityKeys rather than duplicate Entity. Where an EntityKey is specific to a given FeatureView.

There is still an additional optimization that could be done here. Imagine I'm getting features from 2 FeatureViews which have the same EntityKey. At the moment the EntityKeyProtos would be created twice.

@pyalex
Collaborator

pyalex commented Jan 20, 2022

There are cases where doing that would require multiple calls to Feast which would somewhat defeat the purpose.

But is it such a bad thing (I mean multiple calls)? It will be converted into multiple calls to the backend in any case.

Also, the user knows their data model much better, and all the additional logic proposed in this PR can be expressed more simply on the user side:

user_features = fs.get_online_features(user_features,
                                       entity_rows=[{"user_id": user_id} for user_id in set(entity_df.user_ids)])
item_features = fs.get_online_features(item_features,
                                       entity_rows=[{"item_id": item_id} for item_id in set(entity_df.item_ids)])

entity_df.join(user_features.to_df(), ...).join(item_features.to_df(), ...)

I'm trying to say that we cannot push all optimizations for all possible use cases to the Feast side. The code in FeatureStore.get_online_features is already quite complicated, pretty hard to read, and just partially tested (and with each optimization test coverage only decreases).

IMO, we should move towards simplifying logic on FeatureStore level and split it between online store side (actual implementation) and just leave the rest to the user to optimize (maybe we can gather some cookbook with examples like I showed above).

Complain MODE ON

Regarding already existing complexity, let me give a few examples:

  1. The Redis implementation currently supports retrieving all features by entity, but at the FeatureStore level, we force it to retrieve each feature view separately. We put a lot of effort and code into splitting those feature references by view and copying entities. The FeatureStore class has too much responsibility, instead of just leaving optimizations to the specific implementation.

  2. another example:

There is still an additional optimization that could be done here. Imagine I'm getting features from 2 FeatureViews which have the same EntityKey. At the moment the EntityKeyProtos would be created twice.

EntityKeyProto is redundant and not even used as a key anymore, because proto serialization doesn't guarantee stability. And each implementation converts this proto into its own key. So we do a lot of not-very-useful work massaging the input from the user, packing it into different protos, and then mostly throwing it away.

Complain MODE OFF

Thanks for listening :)

@judahrand
Member Author

judahrand commented Jan 20, 2022

There are cases where doing that would require multiple calls to Feast which would somewhat defeat the purpose.

But is it such a bad thing (I mean multiple calls)? It will be converted into multiple calls to the backend in any case.

I'd say yes. Feast is supposed to make the serving logic simpler, not require the user to know what Feast is going to do internally and optimize around that. It is in no way intuitive that if I provide [{"user_id": 1, "item_id": 214}, {"user_id": 1, "item_id": 2354}, {"user_id": 1, "item_id": 4736}] as entity_rows, the features for the user will be retrieved and processed 3 times.

Also, the user knows much better their data model

The data model is Feast's data model - Feast chooses how to store the data in the OnlineStore. Feast should be able to query its own data model efficiently, surely? The whole point of Feast / a FeatureStore is for the user to be able to query any features in the store for any set of given entities. Feast should be able to do this efficiently. The user should not have to understand the internal workings of the SDK! It is not intuitive that Feast behaves naively with duplicates. And in fact, from the user's perspective they are not duplicates - a (user_id, item_id) pair is unique!

and all the additional logic proposed in this PR can be expressed more simply on the user side:

user_features = fs.get_online_features(user_features,
                                       entity_rows=[{"user_id": user_id} for user_id in set(entity_df.user_ids)])
item_features = fs.get_online_features(item_features,
                                       entity_rows=[{"item_id": item_id} for item_id in set(entity_df.item_ids)])

entity_df.join(user_features.to_df(), ...).join(item_features.to_df(), ...)

You've neglected the user_x_item features in this example, so it would actually become:

user_features = fs.get_online_features(user_features,
                                       entity_rows=[{"user_id": user_id} for user_id in set(entity_df.user_ids)])
item_features = fs.get_online_features(item_features,
                                       entity_rows=[{"item_id": item_id} for item_id in set(entity_df.item_ids)])
user_x_item_features = fs.get_online_features(user_x_item_features,
                                       entity_rows=entity_df[['user_id', 'item_id']].drop_duplicates().to_dict(orient='records'))

entity_df.join(user_features.to_df(), ...).join(item_features.to_df(), ...).join(user_x_item_features.to_df(), ...)

Furthermore, without Pandas this becomes much more cumbersome, and it is (in my experience) a bad idea to put Pandas in a latency-sensitive user request path.

I'm trying to say that we cannot push all optimizations for all possible use cases to the Feast side. The code in FeatureStore.get_online_features is already quite complicated, pretty hard to read, and just partially tested (and with each optimization test coverage only decreases).

I don't think this is 'an edge case'. This would be incredibly common for any recommender system.

IMO, we should move towards simplifying logic on FeatureStore level and split it between online store side (actual implementation) and just leave the rest to the user to optimize (maybe we can gather some cookbook with examples like I showed above).

If you're going to go this far then why not limit the user to only querying one FeatureView at a time? Or better yet they can just directly query the OnlineStore? The user can use and understands Redis or Datastore, right?

Complain MODE ON

Regarding already existing complexity, let me give a few examples:

  1. The Redis implementation currently supports retrieving all features by entity, but at the FeatureStore level, we force it to retrieve each feature view separately. We put a lot of effort and code into splitting those feature references by view and copying entities. The FeatureStore class has too much responsibility, instead of just leaving optimizations to the specific implementation.

There are pros and cons here, aren't there? It was my impression that one of the core features of Feast is the ease of implementing new Online and Offline Stores? If this is considered a core feature, then the logic of these should be simple - so as to encourage the community to develop them. Therefore, I believe that they should not be expected to deduplicate. Whether they fetch by FeatureView or by Entity is a more interesting question, but it isn't where we are right now.

I do, however, agree that the limitation of one call per FeatureView is problematic. But I don't think this is an argument for asking for the same data multiple times when we don't have to.

  2. another example:

There is still an additional optimization that could be done here. Imagine I'm getting features from 2 FeatureViews which have the same EntityKey. At the moment the EntityKeyProtos would be created twice.

EntityKeyProto is redundant and not even used as a key anymore, because proto serialization doesn't guarantee stability. And each implementation converts this proto into its own key. So we do a lot of not-very-useful work massaging the input from the user, packing it into different protos, and then mostly throwing it away.

Okay, then that optimization doesn't need to be done. It can just naturally fall out when/if EntityKeyProto is removed.

Complain MODE OFF

Thanks for listening :)

I honestly don't think that the logic implemented in this PR is actually very complex:

  1. Work out at which indexes each Entity occurs
  2. Get each Entity once (rather than as many times as they occur)
  3. Update the result with the correct data at each index

As long as the OnlineStore always returns the Feature data in the same order it is requested, this is really a very simple algorithm. Do the work once and copy it.

It's 9 lines of additional code.
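A minimal sketch of those three steps, assuming (as the PR does) that the online store returns rows in the same order the keys were requested; `fetch_deduplicated` and `online_read` are hypothetical names, not the actual Feast implementation:

```python
def fetch_deduplicated(entity_keys, online_read):
    # 1. Work out at which indexes each unique entity key occurs.
    positions = {}
    for idx, key in enumerate(entity_keys):
        positions.setdefault(key, []).append(idx)

    # 2. Read each unique key from the store exactly once; `online_read`
    #    is assumed to preserve the order of the keys it is given.
    unique_keys = list(positions)
    rows = online_read(unique_keys)

    # 3. Copy each row into every index where its key occurred.
    results = [None] * len(entity_keys)
    for key, row in zip(unique_keys, rows):
        for idx in positions[key]:
            results[idx] = row
    return results
```

With [(1,), (1,), (2,)] as input, `online_read` receives only [(1,), (2,)], and the two rows read are expanded back to three result positions.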

@pyalex
Collaborator

pyalex commented Jan 21, 2022

@judahrand, I see your point that Feast should make lives easier, and ideally, users should not think about internal implementation details. Totally agree. But I would argue that these are two different things: (a) provide comprehensive functionality, so

the user to be able to query any features in the store for any set of given entities

in a reasonable time (and that time should be expected to grow with the complexity of the request), and (b) do some "magic" to run retrieval efficiently for 9K rows with features attached to different entities (with duplicated values in some keys) in the same request. And in the latter case, I'd say - yes, it's fine to know some details of the implementation and do this optimization in user code (as I proposed above).
And I still consider this an edge case, since most of our users, even with RecSys (and I worked with recommendation systems at Gojek a lot as well), don't retrieve more than a few hundred rows. Alternatively, rows could be clustered and features stored as arrays in clusters, hence many fewer rows to retrieve (this is what we did at Gojek).

That being said, I could be wrong and this optimization may indeed be critical.
But my concern about code complexity remains. The problem is that the proposed implementation is mixed in with other optimizations and the main retrieval (business) logic. Since there are no tests or documentation, the code becomes even harder to read and maintain. Moreover, after one refactoring this optimization could be gone, simply because there is no guarding test that would prevent it.
So my proposal is to rewrite it in a modular way, without coupling it to the already existing flow. If it is possible to implement on the user side, it should be possible to put the whole optimization logic into a single method and write a unit test for it.
Just an illustration for this idea (not necessarily best code):

def get_online_features(...):
     entities_groups = group_features_by_entities(features_ref)
     if len(entities_groups) > 1:
         return _get_online_features_with_split(entities_groups, entity_rows)
     
     ... usual flow _get_online_features() ...

def _get_online_features_with_split(entities_groups, entity_rows):
     all_responses = []
     for entities, features_refs in entities_groups:
         entity_rows_projected = project(entity_rows, entities)
         entity_rows_projected = set(entity_rows_projected)  # deduplication
         
         r = _get_online_features(features_refs, entity_rows_projected)   # use standard implementation
         all_responses.append(r)

    ... merge responses (without pandas) ...

_get_online_features_with_split in this example could be tested with mocks. Maybe there's an even better way. And _get_online_features stays unchanged.

@judahrand
Member Author

judahrand commented Jan 21, 2022

do some "magic" to run retrieval efficiently for 9K rows with features attached to different entities (with duplicated values in some keys) in the same request.

It really isn't magic. It's a really simple algorithm. It also doesn't only apply to the case of 9K rows; it is also more efficient with 2 rows.

Moreover, after one refactoring this optimization could be gone simply because there is no guarding test that would prevent it.

This is a fair comment; perhaps I need to add some tests which test for the goal of this optimization. This should be completely doable using unittest.mock.patch.
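A guarding test of that kind might look roughly like this; `FakeOnlineStore` and `get_features` are self-contained stand-ins for the real classes, not the actual Feast code:

```python
from unittest import mock

class FakeOnlineStore:
    def online_read(self, keys):
        return [{"value": k} for k in keys]

def get_features(store, entity_keys):
    # Stand-in for the optimized path: dedupe before reading,
    # then expand the rows back to the input positions.
    unique = list(dict.fromkeys(entity_keys))
    by_key = dict(zip(unique, store.online_read(unique)))
    return [by_key[k] for k in entity_keys]

def test_duplicate_entities_are_read_once():
    store = FakeOnlineStore()
    with mock.patch.object(
        store, "online_read", wraps=store.online_read
    ) as spy:
        out = get_features(store, [1, 1, 2, 1])
    # The store saw a single read containing only the unique keys.
    assert spy.call_count == 1
    assert spy.call_args.args[0] == [1, 2]
    # The result still has one row per input position.
    assert out == [{"value": 1}, {"value": 1}, {"value": 2}, {"value": 1}]
```

Because the spy wraps the real method, a refactor that reintroduces duplicate reads would fail the call-count assertion.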

def get_online_features(...):
     entities_groups = group_features_by_entities(features_ref)
     if len(entities_groups) > 1:
         return _get_online_features_with_split(entities_groups, entity_rows)
     
     ... usual flow _get_online_features() ...

def _get_online_features_with_split(entities_groups, entity_rows):
     all_responses = []
     for entities, features_refs in entities_groups:
         entity_rows_projected = project(entity_rows, entities)
         entity_rows_projected = set(entity_rows_projected)  # deduplication
         
         r = _get_online_features(features_refs, entity_rows_projected)   # use standard implementation
         all_responses.append(r)

    ... merge responses (without pandas) ...

_get_online_features_with_split in this example could be tested with mocks. Maybe there's an even better way. And _get_online_features stays unchanged.

Unfortunately, this approach cannot work with the way that _get_online_features works. The problem is not duplicate rows provided to get_online_features. The problem is duplicate subsets of the columns in the rows which correspond to the join keys of a subset of the Feature Views required.

I once again draw your attention to [{"user_id": 1, "item_id": 214}, {"user_id": 1, "item_id": 2354}, {"user_id": 1, "item_id": 4736}]. There are no duplicate rows. However, when features from a Feature View which has only User as an Entity are requested, there are duplicate users. Your code snippet has no way of determining this at all.

This information can currently only be gleaned inside _get_online_features. Furthermore, you still end up producing multiple OnlineResponse objects and calling to_dict multiple times. This has a number of issues: a) it cannot be used with the FeatureServer, b) it will be slow.

I'm all for making this more modular, tested, etc. I think the modularity will be an issue given how the rest of get_online_features is implemented. However, I think that adding a regression test is perfectly doable and reasonable. It should deal with your concern that this optimization could be refactored out.

Another option would be to completely overhaul how get_online_features works to identify all subsets of entities for all requested Feature Views earlier. This could then decouple the retrieval from the OnlineStore from the population of results rows and may make the logic less coupled and more clear.

rows could be clustered and features can be stored as arrays in clusters, hence much fewer rows to retrieve

I'm not sure I entirely understand what you're getting at here, do you mind if I reach out on Slack to learn more as I'm not sure this is directly relevant to this PR?

@pyalex
Collaborator

pyalex commented Jan 21, 2022

I once again draw your attention to [{"user_id": 1, "item_id": 214}, {"user_id": 1, "item_id": 2354}, {"user_id": 1, "item_id": 4736}]. There are no duplicate rows. However, when features from a Feature View which has only User as an Entity are requested, there are duplicate users. Your code snippet has no way of determining this at all.

I got this. That's why there's projection before deduplication. Please pay attention to these lines:

for entities, features_refs in entities_groups:
   entity_rows_projected = project(entity_rows, entities)  # here we leave only {"user_id": 1} 
                                                           # because on this point we should know 
                                                           # that user features needs only this entity column
   entity_rows_projected = set(entity_rows_projected)  # deduplication

Yes, we would need to group feature_refs, so we will need to have another util function:

def group_features_by_entities(features_ref) -> List[Tuple[<list of join keys>, <list of related feature refs>]]

but that would be trivial to implement (plus we are already inside FeatureStore and have access to the registry)
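A sketch of what such a util could look like, with `view_join_keys` standing in for the registry lookup (all names here are hypothetical, not the Feast API):

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def group_features_by_entities(
    feature_refs: List[str],
    view_join_keys: Dict[str, Tuple[str, ...]],
) -> List[Tuple[Tuple[str, ...], List[str]]]:
    """Group 'view:feature' refs by the join keys of their view.

    `view_join_keys` stands in for a registry lookup that maps each
    feature view name to its tuple of join keys.
    """
    groups = defaultdict(list)
    for ref in feature_refs:
        view = ref.split(":", 1)[0]
        groups[view_join_keys[view]].append(ref)
    return list(groups.items())
```

With the refs from the earlier example, user:age and item:age land in separate single-key groups, and user_x_item:times_viewed in a third group keyed by both join keys.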

Another option would be to completely overhaul how get_online_features works to identify all subsets of entities for all requested Feature Views earlier. This could then decouple the retrieval from the OnlineStore from the population of results rows and may make the logic less coupled and more clear.

Yeah, that seems similar to what I proposed.

This information can currently only be gleaned inside _get_online_features. Furthermore, you still get up producing multiple OnlineResponse objects and call to_dict multiple times. This has a number of issues: a) it cannot be used with the FeatureServer b) it will be slow.

Yeah, this is a reasonable concern. And it's not the only one. The whole _get_online_features is a mess. And it definitely should not return OnlineResponse. But we may leave that out of this PR and fix this inefficiency later. I'd put modularity first; then refactoring would be much easier.

I'm not sure I entirely understand what you're getting at here, do you mind if I reach out on Slack to learn more as I'm not sure this is directly relevant to this PR?

Sure, this is not relevant to this PR.

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
@pyalex pyalex assigned pyalex and unassigned woop Jan 24, 2022
@pyalex
Collaborator

pyalex commented Jan 25, 2022

This is great, @judahrand. Just one request: can you please add a unit test for the _get_unique_entities method? Its signature is quite complex; a test would help us understand better what exactly it does.

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
@judahrand
Member Author

judahrand commented Jan 25, 2022

@pyalex I've added a simple unittest, and just as well too! I hadn't realized that itertools.groupby expects its input iterable to be pre-sorted. It wasn't breaking anything as such... it was just only grouping contiguous entities 🤦 I'd been playing with cases that were too simple! Easily fixed, however.
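For reference, the itertools.groupby behaviour in question: it only groups contiguous runs, so unsorted input silently produces split groups:

```python
from itertools import groupby

data = ["a", "b", "a"]

# Without sorting, "a" shows up as two separate groups.
print([(k, list(g)) for k, g in groupby(data)])
# [('a', ['a']), ('b', ['b']), ('a', ['a'])]

# Sorting first gives exactly one group per key.
print([(k, list(g)) for k, g in groupby(sorted(data))])
# [('a', ['a', 'a']), ('b', ['b'])]
```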

Thanks for all the feedback and working towards a maintainable solution with me.

@pyalex
Collaborator

pyalex commented Jan 26, 2022

/lgtm

@feast-ci-bot
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

Approval requirements bypassed by manually added approval.

This pull-request has been approved by: judahrand, pyalex

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot feast-ci-bot merged commit 396f729 into feast-dev:master Jan 26, 2022
@@ -245,9 +245,9 @@ def get_local_server_port(self) -> int:

 def table_name_from_data_source(ds: DataSource) -> Optional[str]:
     if hasattr(ds, "table_ref"):
-        return ds.table_ref
+        return ds.table_ref  # type: ignore
Member


Nitpick, but why are we adding an ignore statement here? Was this an omission? We've not had to use ignores elsewhere in the code base, so this stood out.

Member Author


This was because I believe neither table nor table_ref is actually an attribute of DataSource - only of its subclasses. The other option would have been a complex cast, I believe. That might be preferable, but since this was in a test util I decided not to do it.

Member Author


There are a handful of other # type: ignore comments around the codebase, and I've also discovered that the typing around Protobufs is tricky and often seemingly wrong or unhelpful.
