Update protos with TensorFlow Data Validation schema #438

Merged
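For orientation, here is a minimal sketch (not part of this PR's diff) of how a client might populate the TFDV-derived constraint fields that this change adds to FeatureSpec. The class name, the feature name "customer_age", and the presence/domain bounds are hypothetical; the sketch only assumes the presence and int_domain fields and the TensorFlow Metadata types that appear in the diff below.

```java
import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType;
import org.tensorflow.metadata.v0.FeaturePresence;
import org.tensorflow.metadata.v0.IntDomain;

public class FeatureSpecTfdvExample {
  public static void main(String[] args) {
    // Hypothetical feature: an INT64 "customer_age" that must be present in at
    // least 90% of examples and fall within [18, 120].
    FeatureSpec spec =
        FeatureSpec.newBuilder()
            .setName("customer_age")
            .setValueType(ValueType.Enum.INT64)
            .setPresence(FeaturePresence.newBuilder().setMinFraction(0.9))
            .setIntDomain(IntDomain.newBuilder().setMin(18).setMax(120))
            .build();
    System.out.println(spec);
  }
}
```

The other domain types touched by the diff (string_domain, bool_domain, time_domain, and so on) would be set on the builder in the same way.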
5 changes: 3 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
@@ -16,6 +16,7 @@
*/
package feast.core.grpc;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
@@ -77,7 +78,7 @@ public void getFeatureSet(
GetFeatureSetResponse response = specService.getFeatureSet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
} catch (RetrievalException | StatusRuntimeException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
@@ -91,7 +92,7 @@ public void listFeatureSets(
ListFeatureSetsResponse response = specService.listFeatureSets(request.getFilter());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | IllegalArgumentException e) {
} catch (RetrievalException | IllegalArgumentException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in ListFeatureSet method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
1 change: 1 addition & 0 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -173,6 +173,7 @@ private Job startJob(

return job;
} catch (Exception e) {
log.error(e.getMessage());
AuditLogger.log(
Resource.JOB,
jobId,
31 changes: 21 additions & 10 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -78,17 +78,24 @@ public Runner getRunnerType() {

@Override
public Job startJob(Job job) {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
try {
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
return submitDataflowJob(
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
false);

} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format("DataflowJobManager failed to START job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

@@ -101,14 +108,18 @@ public Job startJob(Job job) {
@Override
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);

List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
return submitDataflowJob(job.getId(), featureSetProtos, job.getSource().toProto(),
job.getStore().toProto(), true);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to update job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format("DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -21,7 +21,6 @@
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
@@ -38,7 +37,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
@@ -75,8 +73,10 @@ public Runner getRunnerType() {
@Override
public Job startJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
@@ -131,10 +131,6 @@ public Job updateJob(Job job) {
String jobId = job.getExtId();
abortJob(jobId);
try {
List<FeatureSetSpec> featureSetSpecs = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetSpecs.add(featureSet.toProto().getSpec());
}
return startJob(job);
} catch (JobExecutionException e) {
throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);
158 changes: 136 additions & 22 deletions core/src/main/java/feast/core/model/FeatureSet.java
@@ -17,14 +17,15 @@
package feast.core.model;

import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetMeta;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSetStatus;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType;
import feast.types.ValueProto.ValueType.Enum;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,6 +48,20 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.hibernate.annotations.Fetch;
import org.hibernate.annotations.FetchMode;
import org.tensorflow.metadata.v0.BoolDomain;
import org.tensorflow.metadata.v0.FeaturePresence;
import org.tensorflow.metadata.v0.FeaturePresenceWithinGroup;
import org.tensorflow.metadata.v0.FixedShape;
import org.tensorflow.metadata.v0.FloatDomain;
import org.tensorflow.metadata.v0.ImageDomain;
import org.tensorflow.metadata.v0.IntDomain;
import org.tensorflow.metadata.v0.MIDDomain;
import org.tensorflow.metadata.v0.NaturalLanguageDomain;
import org.tensorflow.metadata.v0.StringDomain;
import org.tensorflow.metadata.v0.StructDomain;
import org.tensorflow.metadata.v0.TimeDomain;
import org.tensorflow.metadata.v0.TimeOfDayDomain;
import org.tensorflow.metadata.v0.URLDomain;
import org.tensorflow.metadata.v0.ValueCount;

@Getter
@Setter
@@ -157,23 +172,23 @@ public static FeatureSet fromProto(FeatureSetProto.FeatureSet featureSetProto) {
FeatureSetSpec featureSetSpec = featureSetProto.getSpec();
Source source = Source.fromProto(featureSetSpec.getSource());

List<Field> features = new ArrayList<>();
for (FeatureSpec feature : featureSetSpec.getFeaturesList()) {
features.add(new Field(feature.getName(), feature.getValueType()));
List<Field> featureSpecs = new ArrayList<>();
for (FeatureSpec featureSpec : featureSetSpec.getFeaturesList()) {
featureSpecs.add(new Field(featureSpec));
}

List<Field> entities = new ArrayList<>();
for (EntitySpec entity : featureSetSpec.getEntitiesList()) {
entities.add(new Field(entity.getName(), entity.getValueType()));
List<Field> entitySpecs = new ArrayList<>();
for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
entitySpecs.add(new Field(entitySpec));
}

return new FeatureSet(
featureSetProto.getSpec().getName(),
featureSetProto.getSpec().getProject(),
featureSetProto.getSpec().getVersion(),
featureSetSpec.getMaxAge().getSeconds(),
entities,
features,
entitySpecs,
featureSpecs,
source,
featureSetProto.getMeta().getStatus());
}
@@ -202,24 +217,21 @@ public void addFeature(Field field) {
features.add(field);
}

public FeatureSetProto.FeatureSet toProto() {
public FeatureSetProto.FeatureSet toProto() throws InvalidProtocolBufferException {
List<EntitySpec> entitySpecs = new ArrayList<>();
for (Field entity : entities) {
entitySpecs.add(
EntitySpec.newBuilder()
.setName(entity.getName())
.setValueType(ValueType.Enum.valueOf(entity.getType()))
.build());
for (Field entityField : entities) {
EntitySpec.Builder entitySpecBuilder = EntitySpec.newBuilder();
setEntitySpecFields(entitySpecBuilder, entityField);
entitySpecs.add(entitySpecBuilder.build());
}

List<FeatureSpec> featureSpecs = new ArrayList<>();
for (Field feature : features) {
featureSpecs.add(
FeatureSpec.newBuilder()
.setName(feature.getName())
.setValueType(ValueType.Enum.valueOf(feature.getType()))
.build());
for (Field featureField : features) {
FeatureSpec.Builder featureSpecBuilder = FeatureSpec.newBuilder();
setFeatureSpecFields(featureSpecBuilder, featureField);
featureSpecs.add(featureSpecBuilder.build());
}

FeatureSetMeta.Builder meta =
FeatureSetMeta.newBuilder()
.setCreatedTimestamp(
@@ -239,6 +251,108 @@ public FeatureSetProto.FeatureSet toProto() {
return FeatureSetProto.FeatureSet.newBuilder().setMeta(meta).setSpec(spec).build();
}

// The setEntitySpecFields and setFeatureSpecFields methods contain duplicated code because
// Feast internally treats both EntitySpec and FeatureSpec as the Field class. However, the
// proto message builders for EntitySpec and FeatureSpec are different classes.
@SuppressWarnings("DuplicatedCode")
private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field entityField)
throws InvalidProtocolBufferException {
entitySpecBuilder
.setName(entityField.getName())
.setValueType(Enum.valueOf(entityField.getType()));

if (entityField.getPresence() != null) {
entitySpecBuilder.setPresence(FeaturePresence.parseFrom(entityField.getPresence()));
} else if (entityField.getGroupPresence() != null) {
entitySpecBuilder
.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence()));
}

if (entityField.getShape() != null) {
entitySpecBuilder.setShape(FixedShape.parseFrom(entityField.getShape()));
} else if (entityField.getValueCount() != null) {
entitySpecBuilder.setValueCount(ValueCount.parseFrom(entityField.getValueCount()));
}

if (entityField.getDomain() != null) {
entitySpecBuilder.setDomain(entityField.getDomain());
} else if (entityField.getIntDomain() != null) {
entitySpecBuilder.setIntDomain(IntDomain.parseFrom(entityField.getIntDomain()));
} else if (entityField.getFloatDomain() != null) {
entitySpecBuilder.setFloatDomain(FloatDomain.parseFrom(entityField.getFloatDomain()));
} else if (entityField.getStringDomain() != null) {
entitySpecBuilder.setStringDomain(StringDomain.parseFrom(entityField.getStringDomain()));
} else if (entityField.getBoolDomain() != null) {
entitySpecBuilder.setBoolDomain(BoolDomain.parseFrom(entityField.getBoolDomain()));
} else if (entityField.getStructDomain() != null) {
entitySpecBuilder.setStructDomain(StructDomain.parseFrom(entityField.getStructDomain()));
} else if (entityField.getNaturalLanguageDomain() != null) {
entitySpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(entityField.getNaturalLanguageDomain()));
} else if (entityField.getImageDomain() != null) {
entitySpecBuilder.setImageDomain(ImageDomain.parseFrom(entityField.getImageDomain()));
} else if (entityField.getMidDomain() != null) {
entitySpecBuilder.setMidDomain(MIDDomain.parseFrom(entityField.getMidDomain()));
} else if (entityField.getUrlDomain() != null) {
entitySpecBuilder.setUrlDomain(URLDomain.parseFrom(entityField.getUrlDomain()));
} else if (entityField.getTimeDomain() != null) {
entitySpecBuilder.setTimeDomain(TimeDomain.parseFrom(entityField.getTimeDomain()));
} else if (entityField.getTimeOfDayDomain() != null) {
entitySpecBuilder
.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain()));
}
}

// Refer to the setEntitySpecFields method for the reason behind the code duplication.
@SuppressWarnings("DuplicatedCode")
private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field featureField)
throws InvalidProtocolBufferException {
featureSpecBuilder
.setName(featureField.getName())
.setValueType(Enum.valueOf(featureField.getType()));

if (featureField.getPresence() != null) {
featureSpecBuilder.setPresence(FeaturePresence.parseFrom(featureField.getPresence()));
} else if (featureField.getGroupPresence() != null) {
featureSpecBuilder
.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence()));
}

if (featureField.getShape() != null) {
featureSpecBuilder.setShape(FixedShape.parseFrom(featureField.getShape()));
} else if (featureField.getValueCount() != null) {
featureSpecBuilder.setValueCount(ValueCount.parseFrom(featureField.getValueCount()));
}

if (featureField.getDomain() != null) {
featureSpecBuilder.setDomain(featureField.getDomain());
} else if (featureField.getIntDomain() != null) {
featureSpecBuilder.setIntDomain(IntDomain.parseFrom(featureField.getIntDomain()));
} else if (featureField.getFloatDomain() != null) {
featureSpecBuilder.setFloatDomain(FloatDomain.parseFrom(featureField.getFloatDomain()));
} else if (featureField.getStringDomain() != null) {
featureSpecBuilder.setStringDomain(StringDomain.parseFrom(featureField.getStringDomain()));
} else if (featureField.getBoolDomain() != null) {
featureSpecBuilder.setBoolDomain(BoolDomain.parseFrom(featureField.getBoolDomain()));
} else if (featureField.getStructDomain() != null) {
featureSpecBuilder.setStructDomain(StructDomain.parseFrom(featureField.getStructDomain()));
} else if (featureField.getNaturalLanguageDomain() != null) {
featureSpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(featureField.getNaturalLanguageDomain()));
} else if (featureField.getImageDomain() != null) {
featureSpecBuilder.setImageDomain(ImageDomain.parseFrom(featureField.getImageDomain()));
} else if (featureField.getMidDomain() != null) {
featureSpecBuilder.setMidDomain(MIDDomain.parseFrom(featureField.getMidDomain()));
} else if (featureField.getUrlDomain() != null) {
featureSpecBuilder.setUrlDomain(URLDomain.parseFrom(featureField.getUrlDomain()));
} else if (featureField.getTimeDomain() != null) {
featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(featureField.getTimeDomain()));
} else if (featureField.getTimeOfDayDomain() != null) {
featureSpecBuilder
.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain()));
}
}

/**
* Checks whether the given featureSet's schema and source differ from this one's.
*