Skip to content

Commit

Permalink
Don't send unrecognized featureSets to deadletter in IngestionJob (#845)
Browse files Browse the repository at this point in the history
* do not send unrecognized featureSets to deadletter

* pr comments
  • Loading branch information
Oleksii Moskalenko authored Jun 30, 2020
1 parent d6c2a65 commit 8331482
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
public abstract class ValidateFeatureRowDoFn extends DoFn<FeatureRow, FeatureRow> {
private static final Logger log = LoggerFactory.getLogger(ValidateFeatureRowDoFn.class);

public abstract PCollectionView<Map<String, Iterable<FeatureSetProto.FeatureSetSpec>>>
getFeatureSets();
Expand Down Expand Up @@ -65,40 +68,43 @@ public void processElement(ProcessContext context) {
FeatureRow featureRow = context.element();
Iterable<FeatureSetProto.FeatureSetSpec> featureSetSpecs =
context.sideInput(getFeatureSets()).get(featureRow.getFeatureSet());
if (featureSetSpecs == null) {
log.warn(
String.format(
"FeatureRow contains invalid featureSetReference %s."
+ " Please check that the feature rows are being published"
+ " to the correct topic on the feature stream.",
featureRow.getFeatureSet()));
return;
}

List<FieldProto.Field> fields = new ArrayList<>();
if (featureSetSpecs != null) {
FeatureSetProto.FeatureSetSpec latestSpec = Iterators.getLast(featureSetSpecs.iterator());
FeatureSet featureSet = new FeatureSet(latestSpec);

for (FieldProto.Field field : featureRow.getFieldsList()) {
Field fieldSpec = featureSet.getField(field.getName());
if (fieldSpec == null) {
// skip
continue;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
int expectedTypeFieldNumber = fieldSpec.getType().getNumber();
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
error =
String.format(
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
field.getName(), field.getValue().getValCase(), fieldSpec.getType());
break;
}
}
if (!fields.contains(field)) {
fields.add(field);

FeatureSetProto.FeatureSetSpec latestSpec = Iterators.getLast(featureSetSpecs.iterator());
FeatureSet featureSet = new FeatureSet(latestSpec);

for (FieldProto.Field field : featureRow.getFieldsList()) {
Field fieldSpec = featureSet.getField(field.getName());
if (fieldSpec == null) {
// skip
continue;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
int expectedTypeFieldNumber = fieldSpec.getType().getNumber();
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
error =
String.format(
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
field.getName(), field.getValue().getValCase(), fieldSpec.getType());
break;
}
}
} else {
error =
String.format(
"FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.",
featureRow.getFeatureSet());
if (!fields.contains(field)) {
fields.add(field);
}
}

if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package feast.ingestion.transform;

import static feast.common.models.FeatureSet.getFeatureSetStringRef;

import feast.proto.core.FeatureSetProto.EntitySpec;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
Expand Down Expand Up @@ -104,7 +106,17 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() {
expected.add(randomRow);
}

input.add(FeatureRow.newBuilder().setFeatureSet("invalid").build());
FeatureRow invalidRow =
FeatureRow.newBuilder()
.setFeatureSet(getFeatureSetStringRef(fs1))
.addFields(
Field.newBuilder()
.setName("feature_1")
.setValue(Value.newBuilder().setBoolVal(false).build())
.build())
.build();

input.add(invalidRow);

PCollectionView<Map<String, Iterable<FeatureSetSpec>>> specsView =
p.apply("StaticSpecs", Create.of(featureSetSpecs)).apply(View.asMultimap());
Expand Down

0 comments on commit 8331482

Please sign in to comment.