feat: Write logged features to an offline store (Python API) #2574
Conversation
Force-pushed from 577294f to 2067329
Codecov Report
@@ Coverage Diff @@
## master #2574 +/- ##
===========================================
- Coverage 81.49% 59.41% -22.08%
===========================================
Files 159 160 +1
Lines 12976 13198 +222
===========================================
- Hits 10575 7842 -2733
- Misses 2401 5356 +2955
Force-pushed from 2b8ae60 to 9f2e588
from feast.registry import Registry

class LoggingSource:
To check my understanding: users should be able to instantiate LoggingSources as wrappers around existing data sources, right? E.g., we append to an existing BigQuery table that already has the equivalent of these features?
Not exactly.
- Users don't instantiate a logging source; it is created mostly internally, depending on where logs are coming from.
- Currently the only available logging source is the feature server. Other examples could be a materialization job or a streaming job.
- We do not append to feature sources (like BigQueryDataSource). Instead, logs are written to a LoggingDestination, which is defined by the user as part of the LoggingConfig in a feature service (see changes to the proto).
- A LoggingDestination can be converted to a DataSource (see LoggingDestination.to_data_source) when logs are loaded from the offline store.
I added some docstrings.
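For readers following along, here is a minimal sketch of how a user could attach a logging destination to a feature service under this design. The import paths and class names (LoggingConfig, FileLoggingDestination) are assumptions based on this PR's direction, not a confirmed public API, and driver_stats_fv stands in for any pre-existing feature view:

```python
# Minimal sketch, assuming LoggingConfig / FileLoggingDestination are the
# names this PR exposes; driver_stats_fv is a pre-existing feature view.
from feast import FeatureService
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination

driver_service = FeatureService(
    name="driver_activity",
    features=[driver_stats_fv],
    logging_config=LoggingConfig(
        # Logs are written to this destination (a data sink), not appended
        # to an existing data source.
        destination=FileLoggingDestination(path="data/served_features"),
    ),
)
```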
table_ref: str

def __init__(self, table_ref):
Thoughts on allowing a BigQueryDataSource as an input too, where we then pull from the table specified there?
Similarly for other DWHs.
Not sure I got the idea. See the explanation above: this is not really a data source, but rather a data sink.
Force-pushed from 032b21e to 35e2fca
oneof destination {
  FileDestination file_destination = 3;
  BigQueryDestination bigquery_destination = 4;
  RedshiftDestination redshift_destination = 5;
  SnowflakeDestination snowflake_destination = 6;
}
Is there a goal of making feature logging extensible? How will third-party offline stores enable feature logging?
Suggestion: we could have destination be a map<string, string> (essentially a flat JSON). We'd lose strong typing, but other offline stores could also use it.
That's done similarly to the data source proto, so the plan is to add a CustomDestination with a map or bytes later.
Added custom destination proto message.
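To illustrate the extensibility point, a hypothetical sketch of how a third-party offline store could plug in its own destination once the custom destination lands. The LoggingDestination base class and to_data_source hook follow this PR; the ClickHouse-flavored names are invented for illustration:

```python
# Hypothetical third-party logging destination. LoggingDestination and
# to_data_source() follow this PR; the ClickHouse names are invented.
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination


class ClickHouseLoggingDestination(LoggingDestination):
    def __init__(self, table: str):
        self.table = table

    def to_data_source(self) -> DataSource:
        # Expose the written logs back to Feast as a readable data source.
        return ClickHouseSource(table=self.table)  # assumed custom source class
```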
sdk/python/feast/feature_logging.py (Outdated)
fields["request_id"] = pa.string()
fields["log_timestamp"] = pa.timestamp("us", tz=UTC)
fields["log_date"] = pa.date32()
Do we have a list of "system columns" blacklisted somewhere when we define features? How do you avoid column collisions?
Good point. We use full feature names here, so collisions with feature names are unlikely. However, there might be conflicts with entities or request data, so I will add prefixes to these names.
Added prefixes to system fields and put them into constants.
I cannot find any list of forbidden names for entities (although we already have some system names, like __dummy_entity), so it probably makes sense to create an issue and address it in a subsequent PR.
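Roughly what the prefixed system fields could look like after this change; the exact constant names are an assumption, with the double-underscore prefix mirroring existing system names like __dummy_entity:

```python
import pyarrow as pa

# Assumed constant names; the double-underscore prefix mirrors existing
# system names such as __dummy_entity to avoid collisions with entities
# and request data.
REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"

fields = {}  # feature and entity fields would be collected above
fields[REQUEST_ID_FIELD] = pa.string()
fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz="UTC")
fields[LOG_DATE_FIELD] = pa.date32()
```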
)

def to_data_source(self) -> DataSource:
    return RedshiftSource(table=self.table)
I think you also need to pass schema (optional, defaulting to public in RedshiftSource). Confusing name, but schemas are like namespaces / databases. If you don't specify a schema, we generally use public.table_name, but you can override the schema and get my_schema.table_name, which is different from public.table_name.
The idea is that a user may want to use a non-public Redshift schema for writing Feast logs.
There was a comment from @adchia suggesting that we should instead use the schema from the store configuration. See #2574 (comment)
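For concreteness, a sketch of what forwarding the schema would look like; whether it lives on the destination or comes from the offline store config (per @adchia's suggestion) is the open question, and the schema_name attribute is invented:

```python
# Sketch of forwarding a Redshift schema (namespace) to the generated
# source; `schema_name` is an invented attribute, and it may end up
# coming from the offline store config instead of the destination.
def to_data_source(self) -> DataSource:
    return RedshiftSource(
        schema=self.schema_name or "public",
        table=self.table,
    )
```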
Force-pushed from 7c1e0cb to 66e231a
sdk/python/feast/feature_logging.py (Outdated)
@abc.abstractmethod
def get_partition_column(self, registry: "Registry") -> str:
    """Return partition column that must exist in generated schema."""
    raise NotImplementedError
It's a little strange to me that the LoggingSource knows what the partition column is; shouldn't that be the responsibility of the LoggingDestination, since partitioning is so datastore-specific?
Agreed, it should be part of the logging config, since most offline stores do not even support partitioning explicitly.
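A sketch of the direction agreed on here, with partitioning moved to the destination/config side; the partition_by field name is invented, not the final API:

```python
from typing import List, Optional

from feast.feature_logging import LoggingDestination  # assumed base class


# Sketch: partitioning becomes a property of the logging destination
# (datastore-specific), not of the LoggingSource.
class FileLoggingDestination(LoggingDestination):
    def __init__(self, path: str, partition_by: Optional[List[str]] = None):
        self.path = path
        # Only destinations that support explicit partitioning set this;
        # most offline stores would ignore it.
        self.partition_by = partition_by or []
```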
store.write_logged_features(
    source=feature_service, logs=pa.Table.from_pandas(first_batch, schema=schema),
)

store.write_logged_features(
    source=feature_service, logs=pa.Table.from_pandas(second_batch, schema=schema),
)
I didn't see this call happening anywhere else in the code - is my understanding correct that this would need to be added to get_online_features before we would log anything?
Integration will be added in the following PRs. Yes, this function will be called from get_online_features.
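To make the planned integration concrete, a hypothetical sketch of the hook in the serving path; none of this wiring exists in this PR, and every helper named here is invented:

```python
# Hypothetical wiring for a follow-up PR: after serving features, convert
# the response to an Arrow table and hand it to write_logged_features.
# All helpers (_serve, _logging_enabled, _response_to_arrow) are invented.
def get_online_features(self, features, entity_rows):
    response = self._serve(features, entity_rows)
    if self._logging_enabled(features):
        logs = self._response_to_arrow(response)
        self.write_logged_features(source=features, logs=logs)
    return response
```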
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
Force-pushed from 3cd1d54 to ede26b2
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: achals, pyalex. The full list of commands accepted by this bot can be found here. The pull request process is described here.
/lgtm
What this PR does / why we need it:
This PR is part of Data Quality Monitoring. In order to validate served features, we first need to log them and store them in an offline store. This PR introduces a "write logs" API in the offline store and provider interfaces, and adds implementations for the core offline stores: File, BigQuery, Redshift, and Snowflake.
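Roughly, the new interface surface on the offline store could look like the following; the exact parameter list is an approximation drawn from this PR's description and discussion, not the authoritative signature:

```python
import abc

import pyarrow


# Approximate sketch of the "write logs" API added to the offline store
# interface; parameter names are inferred from this PR's discussion.
class OfflineStore(abc.ABC):
    @staticmethod
    @abc.abstractmethod
    def write_logged_features(
        config: "RepoConfig",
        data: pyarrow.Table,
        source: "LoggingSource",
        logging_config: "LoggingConfig",
        registry: "Registry",
    ):
        """Write logged features (e.g. from the feature server) to the
        LoggingDestination configured in the feature service."""
        raise NotImplementedError
```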
Which issue(s) this PR fixes:
Fixes #