diff --git a/serving/src/main/java/feast/serving/service/HistoricalServingService.java b/serving/src/main/java/feast/serving/service/HistoricalServingService.java index 7d389a9e12..3c42c96d24 100644 --- a/serving/src/main/java/feast/serving/service/HistoricalServingService.java +++ b/serving/src/main/java/feast/serving/service/HistoricalServingService.java @@ -23,7 +23,6 @@ import feast.storage.api.retrieval.FeatureSetRequest; import feast.storage.api.retrieval.HistoricalRetrievalResult; import feast.storage.api.retrieval.HistoricalRetriever; -import feast.storage.connectors.bigquery.retrieval.BigQueryHistoricalRetriever; import io.grpc.Status; import java.util.List; import java.util.Optional; @@ -50,18 +49,10 @@ public HistoricalServingService( @Override public GetFeastServingInfoResponse getFeastServingInfo( GetFeastServingInfoRequest getFeastServingInfoRequest) { - try { - BigQueryHistoricalRetriever bigQueryHistoricalRetriever = - (BigQueryHistoricalRetriever) retriever; - return GetFeastServingInfoResponse.newBuilder() - .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) - .setJobStagingLocation(bigQueryHistoricalRetriever.jobStagingLocation()) - .build(); - } catch (Exception e) { - return GetFeastServingInfoResponse.newBuilder() - .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) - .build(); - } + return GetFeastServingInfoResponse.newBuilder() + .setType(FeastServingType.FEAST_SERVING_TYPE_BATCH) + .setJobStagingLocation(retriever.getStagingLocation()) + .build(); } /** {@inheritDoc} */ diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java index 4e0baed17b..c6f2178a7f 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingService.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java @@ -74,21 +74,32 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ Map> featureValuesMap = entityRows.stream() .collect(Collectors.toMap(row -> row, row -> Maps.newHashMap(row.getFieldsMap()))); - + // Get all feature rows from the retriever. Each feature row list corresponds to a single + // feature set request. List> featureRows = retriever.getOnlineFeatures(entityRows, featureSetRequests); + // For each feature set request, read the feature rows returned by the retriever, and + // populate the featureValuesMap with the feature values corresponding to that entity row. for (var fsIdx = 0; fsIdx < featureRows.size(); fsIdx++) { List featureRowsForFs = featureRows.get(fsIdx); FeatureSetRequest featureSetRequest = featureSetRequests.get(fsIdx); + + // In order to return values containing the same feature references provided by the user, + // we reuse the feature references in the request as the keys in the featureValuesMap Map featureNames = featureSetRequest.getFeatureReferences().stream() .collect( Collectors.toMap( FeatureReference::getName, featureReference -> featureReference)); + + // Each feature row returned (per feature set request) corresponds to a given entity row. + // For each feature row, update the featureValuesMap. for (var entityRowIdx = 0; entityRowIdx < entityRows.size(); entityRowIdx++) { FeatureRow featureRow = featureRowsForFs.get(entityRowIdx); EntityRow entityRow = entityRows.get(entityRowIdx); + + // If the row is stale, put an empty value into the featureValuesMap. if (isStale(featureSetRequest, entityRow, featureRow)) { featureSetRequest .getFeatureReferences() @@ -117,6 +128,8 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ String.format("%s:%d", ref.getName(), ref.getVersion())) .inc()); + // Else populate the featureValueMap at this entityRow with the values in the feature + // row. featureRow.getFieldsList().stream() .filter(field -> featureNames.containsKey(field.getName())) .forEach( diff --git a/serving/src/main/java/feast/serving/service/ServingService.java b/serving/src/main/java/feast/serving/service/ServingService.java index 83adcb73ba..5e662229ee 100644 --- a/serving/src/main/java/feast/serving/service/ServingService.java +++ b/serving/src/main/java/feast/serving/service/ServingService.java @@ -26,12 +26,75 @@ import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; public interface ServingService { + /** + * Get information about the Feast serving deployment. + * + *

For Bigquery deployments, this includes the default job staging location to load + * intermediate files to. Otherwise, this method only returns the current Feast Serving backing + * store type. + * + * @param getFeastServingInfoRequest {@link GetFeastServingInfoRequest} + * @return {@link GetFeastServingInfoResponse} + */ GetFeastServingInfoResponse getFeastServingInfo( GetFeastServingInfoRequest getFeastServingInfoRequest); + /** + * Get features from an online serving store, given a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved values + * to. + * + *

Features can be queried across feature sets, but each {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} must contain all entities for + * all feature sets included in the request. + * + *

This request is fulfilled synchronously. + * + * @param getFeaturesRequest {@link GetOnlineFeaturesRequest} containing list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve and list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved + * values to. + * @return {@link GetOnlineFeaturesResponse} with list of {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues} for each {@link + * feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} supplied. + */ GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest getFeaturesRequest); + /** + * Get features from a batch serving store, given a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link + * feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join + * retrieved features to. All columns in the provided dataset will be preserved in the output + * dataset. + * + *

Due to the potential size of batch retrieval requests, this request is fulfilled + * asynchronously, and returns a retrieval job id, which when supplied to {@link + * #getJob(GetJobRequest)} will return the status of the retrieval job. + * + * @param getFeaturesRequest {@link GetBatchFeaturesRequest} containing a list of {@link + * feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link + * feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join + * retrieved features to. + * @return {@link GetBatchFeaturesResponse} containing reference to a retrieval {@link + * feast.serving.ServingAPIProto.Job}. + */ GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest); + /** + * Get the status of a retrieval job from a batch serving store. + * + *

The client should check the status of the returned job periodically by calling ReloadJob to + * determine if the job has completed successfully or with an error. If the job completes + * successfully i.e. status = JOB_STATUS_DONE with no error, then the client can check the + * file_uris for the location to download feature values data. The client is assumed to have + * access to these file URIs. + * + *

If an error occurred during retrieval, the {@link GetJobResponse} will also contain the + * error that resulted in termination. + * + * @param getJobRequest {@link GetJobRequest} containing reference to a retrieval job + * @return {@link GetJobResponse} + */ GetJobResponse getJob(GetJobRequest getJobRequest); } diff --git a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java index 3533ed140f..760eeedc7e 100644 --- a/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java +++ b/storage/api/src/main/java/feast/storage/api/retrieval/HistoricalRetriever.java @@ -25,6 +25,14 @@ */ public interface HistoricalRetriever { + /** + * Get temporary staging location if applicable. If not applicable to this store, returns an empty + * string. + * + * @return staging location uri + */ + String getStagingLocation(); + /** * Get all features corresponding to the provided batch features request. * diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java index 6364860433..17efb07011 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryHistoricalRetriever.java @@ -85,6 +85,11 @@ public abstract static class Builder { public abstract BigQueryHistoricalRetriever build(); } + @Override + public String getStagingLocation() { + return jobStagingLocation(); + } + @Override public HistoricalRetrievalResult getHistoricalFeatures( String retrievalId, DatasetSource datasetSource, List featureSetRequests) {