Change serving and ingestion to use storage API (#553)
* Change serving and ingestion to use storage API

* Remove extra exclusion clause
Chen Zhiling authored Mar 25, 2020
1 parent 27b3c91 commit 5328a1b
Showing 49 changed files with 646 additions and 3,906 deletions.
18 changes: 18 additions & 0 deletions ingestion/pom.xml
@@ -92,6 +92,24 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>dev.feast</groupId>
+      <artifactId>feast-storage-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>dev.feast</groupId>
+      <artifactId>feast-storage-connector-redis</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>dev.feast</groupId>
+      <artifactId>feast-storage-connector-bigquery</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.auto.value</groupId>
       <artifactId>auto-value-annotations</artifactId>
61 changes: 36 additions & 25 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -17,9 +17,11 @@
 package feast.ingestion;
 
 import static feast.ingestion.utils.SpecUtil.getFeatureSetReference;
+import static feast.ingestion.utils.StoreUtil.getFeatureSink;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.FeatureSetProto.FeatureSet;
+import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.core.SourceProto.Source;
 import feast.core.StoreProto.Store;
 import feast.ingestion.options.BZip2Decompressor;
@@ -28,12 +30,13 @@
 import feast.ingestion.transform.ReadFromSource;
 import feast.ingestion.transform.ValidateFeatureRows;
 import feast.ingestion.transform.WriteFailedElementToBigQuery;
-import feast.ingestion.transform.WriteToStore;
-import feast.ingestion.transform.metrics.WriteMetricsTransform;
+import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
+import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
 import feast.ingestion.utils.ResourceUtil;
 import feast.ingestion.utils.SpecUtil;
-import feast.ingestion.utils.StoreUtil;
-import feast.ingestion.values.FailedElement;
+import feast.storage.api.write.FailedElement;
+import feast.storage.api.write.FeatureSink;
+import feast.storage.api.write.WriteResult;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.io.IOException;
 import java.util.HashMap;
@@ -93,17 +96,24 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
           SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSets);
 
       // Generate tags by key
-      Map<String, FeatureSet> featureSetsByKey = new HashMap<>();
+      Map<String, FeatureSetSpec> featureSetSpecsByKey = new HashMap<>();
       subscribedFeatureSets.stream()
           .forEach(
               fs -> {
-                String ref = getFeatureSetReference(fs);
-                featureSetsByKey.put(ref, fs);
+                String ref = getFeatureSetReference(fs.getSpec());
+                featureSetSpecsByKey.put(ref, fs.getSpec());
               });
 
+      FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);
+
       // TODO: make the source part of the job initialisation options
       Source source = subscribedFeatureSets.get(0).getSpec().getSource();
 
+      for (FeatureSet featureSet : subscribedFeatureSets) {
+        // Ensure Store has valid configuration and Feast can access it.
+        featureSink.prepareWrite(featureSet);
+      }
+
       // Step 1. Read messages from Feast Source as FeatureRow.
       PCollectionTuple convertedFeatureRows =
           pipeline.apply(
@@ -114,28 +124,20 @@
                   .setFailureTag(DEADLETTER_OUT)
                   .build());
 
-      for (FeatureSet featureSet : subscribedFeatureSets) {
-        // Ensure Store has valid configuration and Feast can access it.
-        StoreUtil.setupStore(store, featureSet);
-      }
-
       // Step 2. Validate incoming FeatureRows
       PCollectionTuple validatedRows =
           convertedFeatureRows
               .get(FEATURE_ROW_OUT)
               .apply(
                   ValidateFeatureRows.newBuilder()
-                      .setFeatureSets(featureSetsByKey)
+                      .setFeatureSetSpecs(featureSetSpecsByKey)
                       .setSuccessTag(FEATURE_ROW_OUT)
                       .setFailureTag(DEADLETTER_OUT)
                       .build());
 
       // Step 3. Write FeatureRow to the corresponding Store.
-      validatedRows
-          .get(FEATURE_ROW_OUT)
-          .apply(
-              "WriteFeatureRowToStore",
-              WriteToStore.newBuilder().setFeatureSets(featureSetsByKey).setStore(store).build());
+      WriteResult writeFeatureRows =
+          validatedRows.get(FEATURE_ROW_OUT).apply("WriteFeatureRowToStore", featureSink.write());
 
       // Step 4. Write FailedElements to a dead letter table in BigQuery.
       if (options.getDeadLetterTableSpec() != null) {
@@ -156,16 +158,25 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
                 .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
                 .setTableSpec(options.getDeadLetterTableSpec())
                 .build());
+
+        writeFeatureRows
+            .getFailedInserts()
+            .apply(
+                "WriteFailedElements_WriteFeatureRowToStore",
+                WriteFailedElementToBigQuery.newBuilder()
+                    .setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
+                    .setTableSpec(options.getDeadLetterTableSpec())
+                    .build());
       }
 
       // Step 5. Write metrics to a metrics sink.
-      validatedRows.apply(
-          "WriteMetrics",
-          WriteMetricsTransform.newBuilder()
-              .setStoreName(store.getName())
-              .setSuccessTag(FEATURE_ROW_OUT)
-              .setFailureTag(DEADLETTER_OUT)
-              .build());
+      writeFeatureRows
+          .getSuccessfulInserts()
+          .apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));
+
+      writeFeatureRows
+          .getFailedInserts()
+          .apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
     }
 
     return pipeline.run();
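The call sites above exercise the full surface ImportJob needs from the new write API: prepareWrite, write(), and the two WriteResult accessors. A minimal sketch of the FeatureSink interface those calls imply — reconstructed from usage here, not copied from the feast-storage-api sources:

import feast.core.FeatureSetProto.FeatureSet;
import feast.storage.api.write.WriteResult;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

// Sketch inferred from the ImportJob call sites; the real definition lives in
// the feast-storage-api module added to pom.xml above.
public interface FeatureSink {

  // Called once per subscribed feature set before any writes, so the sink can
  // validate its configuration and set up storage; replaces StoreUtil.setupStore.
  void prepareWrite(FeatureSet featureSet);

  // The Beam transform applied to validated rows. Per the wiring above, its
  // WriteResult output exposes getSuccessfulInserts() as a
  // PCollection<FeatureRow> and getFailedInserts() as a
  // PCollection<FailedElement>, which ImportJob feeds to the metrics
  // transforms and the dead-letter BigQuery writer respectively.
  PTransform<PCollection<FeatureRow>, WriteResult> write();
}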
@@ -21,7 +21,7 @@
 import feast.core.SourceProto.Source;
 import feast.core.SourceProto.SourceType;
 import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
-import feast.ingestion.values.FailedElement;
+import feast.storage.api.write.FailedElement;
 import feast.types.FeatureRowProto.FeatureRow;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -19,8 +19,8 @@
 import com.google.auto.value.AutoValue;
 import feast.core.FeatureSetProto;
 import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
-import feast.ingestion.values.FailedElement;
 import feast.ingestion.values.FeatureSet;
+import feast.storage.api.write.FailedElement;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -36,7 +36,7 @@
 public abstract class ValidateFeatureRows
     extends PTransform<PCollection<FeatureRow>, PCollectionTuple> {
 
-  public abstract Map<String, FeatureSetProto.FeatureSet> getFeatureSets();
+  public abstract Map<String, FeatureSetProto.FeatureSetSpec> getFeatureSetSpecs();
 
   public abstract TupleTag<FeatureRow> getSuccessTag();
 
@@ -49,7 +49,8 @@ public static Builder newBuilder() {
   @AutoValue.Builder
   public abstract static class Builder {
 
-    public abstract Builder setFeatureSets(Map<String, FeatureSetProto.FeatureSet> featureSets);
+    public abstract Builder setFeatureSetSpecs(
+        Map<String, FeatureSetProto.FeatureSetSpec> featureSets);
 
     public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
 
@@ -62,7 +63,7 @@ public abstract static class Builder {
   public PCollectionTuple expand(PCollection<FeatureRow> input) {
 
     Map<String, FeatureSet> featureSets =
-        getFeatureSets().entrySet().stream()
+        getFeatureSetSpecs().entrySet().stream()
             .map(e -> Pair.of(e.getKey(), new FeatureSet(e.getValue())))
             .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
 
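Callers now supply raw FeatureSetSpec protos keyed by feature set reference, and expand() wraps each spec in the internal feast.ingestion.values.FeatureSet before validating. A minimal usage sketch, assuming the input collection, spec map, and tuple tags are defined as in ImportJob above:

// Hypothetical wiring; featureRows, featureSetSpecsByKey, FEATURE_ROW_OUT and
// DEADLETTER_OUT are assumed to exist, mirroring the ImportJob call site.
PCollectionTuple validated =
    featureRows.apply(
        ValidateFeatureRows.newBuilder()
            .setFeatureSetSpecs(featureSetSpecsByKey) // Map<String, FeatureSetSpec>
            .setSuccessTag(FEATURE_ROW_OUT)
            .setFailureTag(DEADLETTER_OUT)
            .build());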
@@ -18,7 +18,7 @@
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.value.AutoValue;
-import feast.ingestion.values.FailedElement;
+import feast.storage.api.write.FailedElement;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
168 changes: 0 additions & 168 deletions ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java

This file was deleted.
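The deleted WriteToStore transform carried the per-store dispatch that now presumably sits behind the StoreUtil.getFeatureSink factory imported by ImportJob. A hypothetical sketch of that factory; the connector entry points RedisFeatureSink and BigQueryFeatureSink and their fromConfig signatures are assumptions, grounded only in the two connector dependencies added to pom.xml:

// Hypothetical sketch of StoreUtil.getFeatureSink. Store and its per-type
// config getters come from feast.core.StoreProto; the sink class names and
// fromConfig factories are assumed, not taken from the connectors' sources.
public static FeatureSink getFeatureSink(
    Store store, Map<String, FeatureSetSpec> featureSetSpecsByKey) {
  switch (store.getType()) {
    case REDIS:
      return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecsByKey);
    case BIGQUERY:
      return BigQueryFeatureSink.fromConfig(store.getBigqueryConfig(), featureSetSpecsByKey);
    default:
      throw new IllegalArgumentException(
          String.format("Store %s has an unsupported store type", store.getName()));
  }
}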

@@ -18,8 +18,7 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.protobuf.InvalidProtocolBufferException;
-import feast.ingestion.transform.ReadFromSource.Builder;
-import feast.ingestion.values.FailedElement;
+import feast.storage.api.write.FailedElement;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.util.Base64;
 import org.apache.beam.sdk.io.kafka.KafkaRecord;
@@ -17,9 +17,9 @@
 package feast.ingestion.transform.fn;
 
 import com.google.auto.value.AutoValue;
-import feast.ingestion.values.FailedElement;
 import feast.ingestion.values.FeatureSet;
 import feast.ingestion.values.Field;
+import feast.storage.api.write.FailedElement;
 import feast.types.FeatureRowProto.FeatureRow;
 import feast.types.FieldProto;
 import feast.types.ValueProto.Value.ValCase;