Add IngestionId & EventTimestamp to FeatureRowBatch to calculate lag metric correctly (#874)

* ingestionId & eventTimestamp in FeatureRowBatch
* refactor idx operations using schema
* dummy

Co-authored-by: Oleksii Moskalenko <oleksii.moskalenko@go-jek.com>
Oleksii Moskalenko committed Jul 17, 2020
1 parent daf2041 commit 1c05617
Showing 2 changed files with 78 additions and 20 deletions.
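
The reason for carrying eventTimestamp through the batch is that a consumer can compute ingestion lag downstream. As a rough illustration only (the metric code itself is outside this diff, and `lagSeconds` is a made-up name), a lag computation enabled by this change might look like:

```java
import com.google.protobuf.Timestamp;

// Hypothetical sketch of the lag metric this commit enables; not code from
// this diff. Lag is wall-clock time at the point of measurement minus the
// row's original event time, in seconds.
public class LagMetricSketch {
  public static long lagSeconds(Timestamp eventTimestamp) {
    long nowSeconds = System.currentTimeMillis() / 1000; // processing time
    return nowSeconds - eventTimestamp.getSeconds();     // event-time lag
  }
}
```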
FeatureRowsBatch.java

```diff
@@ -19,6 +19,8 @@
 import static feast.proto.types.ValueProto.Value.ValCase.*;
 import static feast.storage.connectors.bigquery.common.TypeUtil.*;
 
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Timestamp;
 import feast.proto.types.FeatureRowProto;
 import feast.proto.types.FieldProto;
 import feast.proto.types.ValueProto;
@@ -40,6 +42,8 @@
  * <p>getFeatureRows provides reverse transformation
  */
 public class FeatureRowsBatch implements Serializable {
+  public static final ImmutableList<String> SERVICE_FIELDS =
+      ImmutableList.of("eventTimestamp", "ingestionId");
   private final Schema schema;
   private String featureSetReference;
   private List<Object> values = new ArrayList<>();
@@ -118,6 +122,12 @@ private Schema inferCommonSchema(Iterable<FeatureRowProto.FeatureRow> featureRows)
               featureSetReference = row.getFeatureSet();
             }
           }));
+
+    fieldsInOrder.add(
+        Schema.Field.of("eventTimestamp", Schema.FieldType.array(Schema.FieldType.INT64)));
+    fieldsInOrder.add(
+        Schema.Field.of("ingestionId", Schema.FieldType.array(Schema.FieldType.STRING)));
+
     Schema schema = Schema.builder().addFields(fieldsInOrder).build();
     schema.setUUID(UUID.randomUUID());
     return schema;
```
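
For reference, a standalone sketch of the schema shape this produces: every column is an array with one element per row in the batch, and the two service columns are appended after the inferred feature fields, so their positions can always be recovered by name. The `feature1` column below is a hypothetical example, not part of the diff.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;

public class SchemaShapeSketch {
  public static void main(String[] args) {
    // Columnar layout: one array field per feature; "feature1" is hypothetical.
    List<Schema.Field> fieldsInOrder = new ArrayList<>();
    fieldsInOrder.add(
        Schema.Field.of("feature1", Schema.FieldType.array(Schema.FieldType.DOUBLE)));
    // The service columns come after whatever feature fields were inferred.
    fieldsInOrder.add(
        Schema.Field.of("eventTimestamp", Schema.FieldType.array(Schema.FieldType.INT64)));
    fieldsInOrder.add(
        Schema.Field.of("ingestionId", Schema.FieldType.array(Schema.FieldType.STRING)));

    Schema schema = Schema.builder().addFields(fieldsInOrder).build();
    // Column indices are looked up by name, never assumed by position.
    System.out.println(schema.indexOf("eventTimestamp")); // 1
    System.out.println(schema.indexOf("ingestionId"));    // 2
  }
}
```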
```diff
@@ -132,16 +142,33 @@ private void initValues() {
   }
 
   private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
+    int timestampColumnIdx = schema.indexOf("eventTimestamp");
+    int ingestionIdColumnIdx = schema.indexOf("ingestionId");
+
     featureRows.forEach(
         row -> {
-          Map<String, ValueProto.Value> rowValues =
-              row.getFieldsList().stream()
-                  .collect(Collectors.toMap(FieldProto.Field::getName, FieldProto.Field::getValue));
+          Map<String, ValueProto.Value> rowValues;
+          try {
+            rowValues =
+                row.getFieldsList().stream()
+                    .collect(
+                        Collectors.toMap(FieldProto.Field::getName, FieldProto.Field::getValue));
+          } catch (IllegalStateException e) {
+            // row contains feature duplicates
+            // omitting for now
+            return;
+          }
 
-          IntStream.range(0, schema.getFieldCount())
+          schema
+              .getFieldNames()
               .forEach(
-                  idx -> {
-                    Schema.Field field = schema.getField(idx);
+                  fieldName -> {
+                    if (SERVICE_FIELDS.contains(fieldName)) {
+                      return;
+                    }
+                    Schema.Field field = schema.getField(fieldName);
+                    int idx = schema.indexOf(fieldName);
 
                     if (rowValues.containsKey(field.getName())) {
                       Object o = protoValueToObject(rowValues.get(field.getName()));
                       if (o != null) {
@@ -152,6 +179,10 @@ private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
 
                     ((List<Object>) values.get(idx)).add(defaultValues.get(field.getName()));
                   });
+
+          // adding service fields
+          ((List<Object>) values.get(timestampColumnIdx)).add(row.getEventTimestamp().getSeconds());
+          ((List<Object>) values.get(ingestionIdColumnIdx)).add(row.getIngestionId());
         });
   }
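```

The try/catch above works because `Collectors.toMap` without a merge function throws `IllegalStateException` when the same key appears twice, so rows carrying duplicate feature names are skipped instead of failing the pipeline. A minimal demonstration of that JDK behavior:

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DuplicateKeyDemo {
  public static void main(String[] args) {
    try {
      // Two entries with the same key: toMap has no merge function, so it throws.
      Map<String, Integer> m =
          Stream.of("feature1", "feature1")
              .collect(Collectors.toMap(Function.identity(), name -> 1));
      System.out.println(m);
    } catch (IllegalStateException e) {
      // Message is along the lines of "Duplicate key feature1 ..." — the batch
      // code catches this and drops the offending row.
      System.out.println("duplicate key detected: " + e.getMessage());
    }
  }
}
```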

```diff
@@ -177,27 +208,45 @@ public static FeatureRowsBatch fromRow(Row row) {
   }
 
   public Iterator<FeatureRowProto.FeatureRow> getFeatureRows() {
+    int timestampColumnIdx = schema.indexOf("eventTimestamp");
+    int ingestionIdColumnIdx = schema.indexOf("ingestionId");
+
     return IntStream.range(0, ((List<Object>) values.get(0)).size())
         .parallel()
         .mapToObj(
             rowIdx ->
                 FeatureRowProto.FeatureRow.newBuilder()
                     .setFeatureSet(getFeatureSetReference())
+                    .setEventTimestamp(
+                        Timestamp.newBuilder()
+                            .setSeconds(
+                                (long)
+                                    (((List<Object>) values.get(timestampColumnIdx)).get(rowIdx)))
+                            .build())
+                    .setIngestionId(
+                        (String) (((List<Object>) values.get(ingestionIdColumnIdx)).get(rowIdx)))
                     .addAllFields(
-                        IntStream.range(0, schema.getFieldCount())
-                            .mapToObj(
-                                fieldIdx ->
-                                    FieldProto.Field.newBuilder()
-                                        .setName(schema.getField(fieldIdx).getName())
-                                        .setValue(
-                                            objectToProtoValue(
-                                                ((List<Object>) values.get(fieldIdx)).get(rowIdx),
-                                                schemaToProtoTypes.get(
-                                                    schema
-                                                        .getField(fieldIdx)
-                                                        .getType()
-                                                        .getCollectionElementType())))
-                                        .build())
+                        schema.getFieldNames().stream()
+                            .map(
+                                fieldName -> {
+                                  if (SERVICE_FIELDS.contains(fieldName)) {
+                                    return null;
+                                  }
+                                  int fieldIdx = schema.indexOf(fieldName);
+
+                                  return FieldProto.Field.newBuilder()
+                                      .setName(schema.getField(fieldIdx).getName())
+                                      .setValue(
+                                          objectToProtoValue(
+                                              ((List<Object>) values.get(fieldIdx)).get(rowIdx),
+                                              schemaToProtoTypes.get(
+                                                  schema
+                                                      .getField(fieldIdx)
+                                                      .getType()
+                                                      .getCollectionElementType())))
+                                      .build();
+                                })
+                            .filter(Objects::nonNull)
                             .collect(Collectors.toList()))
                     .build())
         .iterator();
```
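
A hedged round-trip sketch of what the change guarantees. It assumes `FeatureRowsBatch` can be built from an `Iterable<FeatureRow>`; the constructor is not shown in this diff, so treat the construction line as illustrative only.

```java
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Timestamp;
import feast.proto.types.FeatureRowProto.FeatureRow;

// Assumed constructor: new FeatureRowsBatch(Iterable<FeatureRow>) — not shown
// in this diff.
FeatureRow original =
    FeatureRow.newBuilder()
        .setFeatureSet("project/feature_set")
        .setEventTimestamp(Timestamp.newBuilder().setSeconds(1594944000).build())
        .setIngestionId("ingestion-id")
        .build();

FeatureRowsBatch batch = new FeatureRowsBatch(ImmutableList.of(original));
FeatureRow restored = batch.getFeatureRows().next();

// Before this commit both service fields were lost in toColumnar; now they
// survive the columnar round trip:
assert restored.getEventTimestamp().getSeconds() == 1594944000L;
assert restored.getIngestionId().equals("ingestion-id");
```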
Second changed file (test helpers):
```diff
@@ -124,6 +124,11 @@ private FeatureRow generateRow(String featureSet) {
     FeatureRow.Builder row =
         FeatureRow.newBuilder()
             .setFeatureSet(featureSet)
+            .setEventTimestamp(
+                com.google.protobuf.Timestamp.newBuilder()
+                    .setSeconds(System.currentTimeMillis() / 1000)
+                    .build())
+            .setIngestionId("ingestion-id")
             .addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64))
             .addFields(FieldProto.Field.newBuilder().setName("null_value").build());
 
@@ -499,6 +504,8 @@ private List<FeatureRow> dropNullFeature(List<FeatureRow> input) {
             r ->
                 FeatureRow.newBuilder()
                     .setFeatureSet(r.getFeatureSet())
+                    .setIngestionId(r.getIngestionId())
+                    .setEventTimestamp(r.getEventTimestamp())
                     .addAllFields(copyFieldsWithout(r, "null_value"))
                     .build())
         .collect(Collectors.toList());
@@ -520,6 +527,8 @@ public static List<FeatureRow> sortFeaturesByName(List<FeatureRow> rows) {
 
           return FeatureRow.newBuilder()
               .setFeatureSet(row.getFeatureSet())
+              .setEventTimestamp(row.getEventTimestamp())
+              .setIngestionId(row.getIngestionId())
               .addAllFields(fieldsList)
               .build();
         })
```
