Skip to content

Commit

Permalink
Add better code docs to storage refactor (#601)
Browse files Browse the repository at this point in the history
* Add better code documentation, make GetFeastServingInfo independent of retriever

* Make getStagingLocation method of historical retriever

* Apply spotless
  • Loading branch information
Chen Zhiling committed Apr 7, 2020
1 parent 4874725 commit 986e337
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import feast.storage.api.retrieval.FeatureSetRequest;
import feast.storage.api.retrieval.HistoricalRetrievalResult;
import feast.storage.api.retrieval.HistoricalRetriever;
import feast.storage.connectors.bigquery.retrieval.BigQueryHistoricalRetriever;
import io.grpc.Status;
import java.util.List;
import java.util.Optional;
Expand All @@ -50,18 +49,10 @@ public HistoricalServingService(
@Override
public GetFeastServingInfoResponse getFeastServingInfo(
GetFeastServingInfoRequest getFeastServingInfoRequest) {
try {
BigQueryHistoricalRetriever bigQueryHistoricalRetriever =
(BigQueryHistoricalRetriever) retriever;
return GetFeastServingInfoResponse.newBuilder()
.setType(FeastServingType.FEAST_SERVING_TYPE_BATCH)
.setJobStagingLocation(bigQueryHistoricalRetriever.jobStagingLocation())
.build();
} catch (Exception e) {
return GetFeastServingInfoResponse.newBuilder()
.setType(FeastServingType.FEAST_SERVING_TYPE_BATCH)
.build();
}
return GetFeastServingInfoResponse.newBuilder()
.setType(FeastServingType.FEAST_SERVING_TYPE_BATCH)
.setJobStagingLocation(retriever.getStagingLocation())
.build();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,32 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ
Map<EntityRow, Map<String, Value>> featureValuesMap =
entityRows.stream()
.collect(Collectors.toMap(row -> row, row -> Maps.newHashMap(row.getFieldsMap())));

// Get all feature rows from the retriever. Each feature row list corresponds to a single
// feature set request.
List<List<FeatureRow>> featureRows =
retriever.getOnlineFeatures(entityRows, featureSetRequests);

// For each feature set request, read the feature rows returned by the retriever, and
// populate the featureValuesMap with the feature values corresponding to that entity row.
for (var fsIdx = 0; fsIdx < featureRows.size(); fsIdx++) {
List<FeatureRow> featureRowsForFs = featureRows.get(fsIdx);
FeatureSetRequest featureSetRequest = featureSetRequests.get(fsIdx);

// In order to return values containing the same feature references provided by the user,
// we reuse the feature references in the request as the keys in the featureValuesMap
Map<String, FeatureReference> featureNames =
featureSetRequest.getFeatureReferences().stream()
.collect(
Collectors.toMap(
FeatureReference::getName, featureReference -> featureReference));

// Each feature row returned (per feature set request) corresponds to a given entity row.
// For each feature row, update the featureValuesMap.
for (var entityRowIdx = 0; entityRowIdx < entityRows.size(); entityRowIdx++) {
FeatureRow featureRow = featureRowsForFs.get(entityRowIdx);
EntityRow entityRow = entityRows.get(entityRowIdx);

// If the row is stale, put an empty value into the featureValuesMap.
if (isStale(featureSetRequest, entityRow, featureRow)) {
featureSetRequest
.getFeatureReferences()
Expand Down Expand Up @@ -117,6 +128,8 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ
String.format("%s:%d", ref.getName(), ref.getVersion()))
.inc());

// Else populate the featureValueMap at this entityRow with the values in the feature
// row.
featureRow.getFieldsList().stream()
.filter(field -> featureNames.containsKey(field.getName()))
.forEach(
Expand Down
63 changes: 63 additions & 0 deletions serving/src/main/java/feast/serving/service/ServingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,75 @@
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;

public interface ServingService {
/**
* Get information about the Feast serving deployment.
*
* <p>For Bigquery deployments, this includes the default job staging location to load
* intermediate files to. Otherwise, this method only returns the current Feast Serving backing
* store type.
*
* @param getFeastServingInfoRequest {@link GetFeastServingInfoRequest}
* @return {@link GetFeastServingInfoResponse}
*/
GetFeastServingInfoResponse getFeastServingInfo(
GetFeastServingInfoRequest getFeastServingInfoRequest);

/**
* Get features from an online serving store, given a list of {@link
* feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and list of {@link
* feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved values
* to.
*
* <p>Features can be queried across feature sets, but each {@link
* feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} must contain all entities for
* all feature sets included in the request.
*
* <p>This request is fulfilled synchronously.
*
* @param getFeaturesRequest {@link GetOnlineFeaturesRequest} containing list of {@link
* feast.serving.ServingAPIProto.FeatureReference}s to retrieve and list of {@link
* feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow}s to join the retrieved
* values to.
* @return {@link GetOnlineFeaturesResponse} with list of {@link
* feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues} for each {@link
* feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow} supplied.
*/
GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest getFeaturesRequest);

/**
* Get features from a batch serving store, given a list of {@link
* feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link
* feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join
* retrieved features to. All columns in the provided dataset will be preserved in the output
* dataset.
*
* <p>Due to the potential size of batch retrieval requests, this request is fulfilled
* asynchronously, and returns a retrieval job id, which when supplied to {@link
* #getJob(GetJobRequest)} will return the status of the retrieval job.
*
* @param getFeaturesRequest {@link GetBatchFeaturesRequest} containing a list of {@link
* feast.serving.ServingAPIProto.FeatureReference}s to retrieve, and {@link
* feast.serving.ServingAPIProto.DatasetSource} pointing to remote location of dataset to join
* retrieved features to.
* @return {@link GetBatchFeaturesResponse} containing reference to a retrieval {@link
* feast.serving.ServingAPIProto.Job}.
*/
GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest);

/**
* Get the status of a retrieval job from a batch serving store.
*
* <p>The client should check the status of the returned job periodically by calling ReloadJob to
* determine if the job has completed successfully or with an error. If the job completes
* successfully i.e. status = JOB_STATUS_DONE with no error, then the client can check the
* file_uris for the location to download feature values data. The client is assumed to have
* access to these file URIs.
*
* <p>If an error occurred during retrieval, the {@link GetJobResponse} will also contain the
* error that resulted in termination.
*
* @param getJobRequest {@link GetJobRequest} containing reference to a retrieval job
* @return {@link GetJobResponse}
*/
GetJobResponse getJob(GetJobRequest getJobRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@
*/
public interface HistoricalRetriever {

/**
* Get temporary staging location if applicable. If not applicable to this store, returns an empty
* string.
*
* @return staging location uri
*/
String getStagingLocation();

/**
* Get all features corresponding to the provided batch features request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public abstract static class Builder {
public abstract BigQueryHistoricalRetriever build();
}

@Override
public String getStagingLocation() {
return jobStagingLocation();
}

@Override
public HistoricalRetrievalResult getHistoricalFeatures(
String retrievalId, DatasetSource datasetSource, List<FeatureSetRequest> featureSetRequests) {
Expand Down

0 comments on commit 986e337

Please sign in to comment.