Replace Data Source specific format options with DataFormat message #1049

Merged · 24 commits · Oct 19, 2020

Commits
cefa62f
Proto: Add DataFormat proto and use when specifying data format/options
mrzzy Oct 12, 2020
bd64202
Core: Update DataSource model to support new DataFormat proto
mrzzy Oct 12, 2020
8697e61
Python SDK: Update SDK to use new data format API.
mrzzy Oct 13, 2020
977f4e4
Fix python lint
mrzzy Oct 13, 2020
4fa2e0a
Fix Python unit tests referencing old argument `class_path`
mrzzy Oct 13, 2020
395e1a5
Fix DataSource's from_proto() not converting DataFormat protos to nati…
mrzzy Oct 14, 2020
0754899
Fix python lint
mrzzy Oct 14, 2020
aa2bdfa
Core: Add convenience methods to DataGenerator to create Kinesis source…
mrzzy Oct 14, 2020
9b328d6
Core: Add DataSourceValidator to validate data source specs in Feature T…
mrzzy Oct 14, 2020
f88cfaa
E2E: Fix specifying data format in FeatureTables in e2e tests
mrzzy Oct 14, 2020
4afae3f
Rebase on master
mrzzy Oct 14, 2020
6b51d35
E2E: Fix typo
mrzzy Oct 14, 2020
0e3092d
Serving: Fix IT TestUtils not producing Protobuf specs inline with th…
mrzzy Oct 14, 2020
afdfc8a
Fix java lint
mrzzy Oct 14, 2020
919856a
E2E: Fix another typo
mrzzy Oct 14, 2020
92c8319
Proto: Split DataFormat message into StreamFormat and FileFormat mess…
mrzzy Oct 16, 2020
dae055d
Core: Update Core to support split StreamFormat and FileFormat messages
mrzzy Oct 16, 2020
e0d0e5d
Proto: Split data formats from DataSource.proto into new
mrzzy Oct 18, 2020
eff0115
Python SDK: Update SDK to support split StreamFormat and FileFormat
mrzzy Oct 18, 2020
02a4924
Python SDK: Fix python lint
mrzzy Oct 18, 2020
252d64b
Python SDK: Fix missing data_format module due to file being untracke…
mrzzy Oct 19, 2020
2dabb3a
Python SDK: Fix Lint
mrzzy Oct 19, 2020
1bb704e
Python SDK: Fix data_format specification format
mrzzy Oct 19, 2020
dc872a4
Python SDK: Fix issue where _source_to_argument passes FileFormat class…
mrzzy Oct 19, 2020
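
In summary, this PR replaces the free-form string fields previously used to declare data formats (file_format on FileOptions, class_path on KafkaOptions and KinesisOptions) with structured FileFormat and StreamFormat protobuf messages. A minimal sketch of the new call pattern, using the generated Java classes that appear in the diffs below; the com.example.MessageProto class path is illustrative, not part of this PR:

import feast.proto.core.DataFormatProto.FileFormat;
import feast.proto.core.DataFormatProto.FileFormat.ParquetFormat;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataFormatProto.StreamFormat.ProtoFormat;
import feast.proto.core.DataSourceProto.DataSource;
import feast.proto.core.DataSourceProto.DataSource.FileOptions;

public class DataFormatSketch {
  public static void main(String[] args) {
    // File sources now carry a FileFormat message instead of a "parquet" string.
    FileFormat parquet =
        FileFormat.newBuilder().setParquetFormat(ParquetFormat.getDefaultInstance()).build();

    DataSource fileSource =
        DataSource.newBuilder()
            .setType(DataSource.SourceType.BATCH_FILE)
            .setFileOptions(
                FileOptions.newBuilder()
                    .setFileFormat(parquet)
                    .setFileUrl("file:///path/to/file")
                    .build())
            .build();

    // Stream sources now carry a StreamFormat message instead of a bare class path.
    StreamFormat protoFormat =
        StreamFormat.newBuilder()
            .setProtoFormat(
                ProtoFormat.newBuilder().setClassPath("com.example.MessageProto").build())
            .build();

    System.out.println(fileSource.getFileOptions().getFileFormat().getFormatCase()); // PARQUET_FORMAT
    System.out.println(protoFormat.getFormatCase()); // PROTO_FORMAT
  }
}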
45 changes: 42 additions & 3 deletions common-test/src/main/java/feast/common/it/DataGenerator.java
@@ -20,10 +20,16 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import feast.proto.core.DataFormatProto.FileFormat;
import feast.proto.core.DataFormatProto.FileFormat.ParquetFormat;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataFormatProto.StreamFormat.AvroFormat;
import feast.proto.core.DataFormatProto.StreamFormat.ProtoFormat;
import feast.proto.core.DataSourceProto.DataSource;
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
import feast.proto.core.DataSourceProto.DataSource.FileOptions;
import feast.proto.core.DataSourceProto.DataSource.KafkaOptions;
import feast.proto.core.DataSourceProto.DataSource.KinesisOptions;
import feast.proto.core.EntityProto;
import feast.proto.core.FeatureProto;
import feast.proto.core.FeatureProto.FeatureSpecV2;
@@ -266,11 +272,14 @@ public static FeatureTableSpec createFeatureTableSpec(
}

public static DataSource createFileDataSourceSpec(
String fileURL, String fileFormat, String timestampColumn, String datePartitionColumn) {
String fileURL, String timestampColumn, String datePartitionColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_FILE)
.setFileOptions(
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
FileOptions.newBuilder()
.setFileFormat(createParquetFormat())
.setFileUrl(fileURL)
.build())
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
@@ -294,7 +303,7 @@ public static DataSource createKafkaDataSourceSpec(
KafkaOptions.newBuilder()
.setTopic(topic)
.setBootstrapServers(servers)
.setClassPath(classPath)
.setMessageFormat(createProtoFormat(classPath))
.build())
.setEventTimestampColumn(timestampColumn)
.build();
@@ -327,4 +336,34 @@ public static ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow createEntityR
.putFields(entityName, entityValue)
.build();
}

public static DataSource createKinesisDataSourceSpec(
String region, String streamName, String classPath, String timestampColumn) {
return DataSource.newBuilder()
.setType(DataSource.SourceType.STREAM_KINESIS)
.setKinesisOptions(
KinesisOptions.newBuilder()
.setRegion("ap-nowhere1")
.setStreamName("stream")
.setRecordFormat(createProtoFormat(classPath))
.build())
.setEventTimestampColumn(timestampColumn)
.build();
}

public static FileFormat createParquetFormat() {
return FileFormat.newBuilder().setParquetFormat(ParquetFormat.getDefaultInstance()).build();
}

public static StreamFormat createAvroFormat(String schemaJSON) {
return StreamFormat.newBuilder()
.setAvroFormat(AvroFormat.newBuilder().setSchemaJson(schemaJSON).build())
.build();
}

public static StreamFormat createProtoFormat(String classPath) {
return StreamFormat.newBuilder()
.setProtoFormat(ProtoFormat.newBuilder().setClassPath(classPath).build())
.build();
}
}
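
For reference, a hypothetical caller of the updated helpers (argument values are illustrative; note that createFileDataSourceSpec no longer takes a format string, and the stream helpers build StreamFormat messages internally):

DataSource fileSource =
    DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "");
DataSource kinesisSource =
    DataGenerator.createKinesisDataSourceSpec("ap-nowhere1", "stream", "class.path", "ts_col");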
53 changes: 45 additions & 8 deletions core/src/main/java/feast/core/model/DataSource.java
@@ -16,9 +16,13 @@
*/
package feast.core.model;

import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import feast.core.util.TypeConversion;
import feast.proto.core.DataFormatProto.FileFormat;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataSourceProto;
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
import feast.proto.core.DataSourceProto.DataSource.FileOptions;
@@ -91,20 +95,23 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
switch (spec.getType()) {
case BATCH_FILE:
dataSourceConfigMap.put("file_url", spec.getFileOptions().getFileUrl());
dataSourceConfigMap.put("file_format", spec.getFileOptions().getFileFormat());
dataSourceConfigMap.put("file_format", printJSON(spec.getFileOptions().getFileFormat()));
break;
case BATCH_BIGQUERY:
dataSourceConfigMap.put("table_ref", spec.getBigqueryOptions().getTableRef());
break;
case STREAM_KAFKA:
dataSourceConfigMap.put("bootstrap_servers", spec.getKafkaOptions().getBootstrapServers());
dataSourceConfigMap.put("class_path", spec.getKafkaOptions().getClassPath());
dataSourceConfigMap.put(
"message_format", printJSON(spec.getKafkaOptions().getMessageFormat()));
dataSourceConfigMap.put("topic", spec.getKafkaOptions().getTopic());
break;
case STREAM_KINESIS:
dataSourceConfigMap.put("class_path", spec.getKinesisOptions().getClassPath());
dataSourceConfigMap.put(
"record_format", printJSON(spec.getKinesisOptions().getRecordFormat()));
dataSourceConfigMap.put("region", spec.getKinesisOptions().getRegion());
dataSourceConfigMap.put("stream_name", spec.getKinesisOptions().getStreamName());

break;
default:
throw new UnsupportedOperationException(
@@ -137,7 +144,11 @@ public DataSourceProto.DataSource toProto() {
case BATCH_FILE:
FileOptions.Builder fileOptions = FileOptions.newBuilder();
fileOptions.setFileUrl(dataSourceConfigMap.get("file_url"));
fileOptions.setFileFormat(dataSourceConfigMap.get("file_format"));

FileFormat.Builder fileFormat = FileFormat.newBuilder();
parseMessage(dataSourceConfigMap.get("file_format"), fileFormat);
fileOptions.setFileFormat(fileFormat.build());

spec.setFileOptions(fileOptions.build());
break;
case BATCH_BIGQUERY:
@@ -148,15 +159,23 @@ public DataSourceProto.DataSource toProto() {
case STREAM_KAFKA:
KafkaOptions.Builder kafkaOptions = KafkaOptions.newBuilder();
kafkaOptions.setBootstrapServers(dataSourceConfigMap.get("bootstrap_servers"));
kafkaOptions.setClassPath(dataSourceConfigMap.get("class_path"));
kafkaOptions.setTopic(dataSourceConfigMap.get("topic"));

StreamFormat.Builder messageFormat = StreamFormat.newBuilder();
parseMessage(dataSourceConfigMap.get("message_format"), messageFormat);
kafkaOptions.setMessageFormat(messageFormat.build());

spec.setKafkaOptions(kafkaOptions.build());
break;
case STREAM_KINESIS:
KinesisOptions.Builder kinesisOptions = KinesisOptions.newBuilder();
kinesisOptions.setClassPath(dataSourceConfigMap.get("class_path"));
kinesisOptions.setRegion(dataSourceConfigMap.get("region"));
kinesisOptions.setStreamName(dataSourceConfigMap.get("stream_name"));

StreamFormat.Builder recordFormat = StreamFormat.newBuilder();
parseMessage(dataSourceConfigMap.get("record_format"), recordFormat);
kinesisOptions.setRecordFormat(recordFormat.build());

spec.setKinesisOptions(kinesisOptions.build());
break;
default:
@@ -194,4 +213,22 @@ public boolean equals(Object o) {
DataSource other = (DataSource) o;
return this.toProto().equals(other.toProto());
}

/** Print the given Message into its JSON string representation */
private static String printJSON(MessageOrBuilder message) {
try {
return JsonFormat.printer().print(message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Unexpected exception convering Proto to JSON", e);
}
}

/** Parse the given Message in JSON representation into the given Message Builder */
private static void parseMessage(String json, Message.Builder message) {
try {
JsonFormat.parser().merge(json, message);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Unexpected exception convering JSON to Proto", e);
}
}
}
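
The two helpers above persist format messages as JSON strings in the model's config map. A minimal round-trip sketch of that pattern, assuming the generated DataFormatProto classes from this PR:

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataFormatProto.StreamFormat.ProtoFormat;

public class JsonRoundTripSketch {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    StreamFormat original =
        StreamFormat.newBuilder()
            .setProtoFormat(ProtoFormat.newBuilder().setClassPath("class.path").build())
            .build();

    // printJSON(): serialize the format message for storage in the config map.
    String json = JsonFormat.printer().print(original);

    // parseMessage(): rebuild the message when converting back via toProto().
    StreamFormat.Builder builder = StreamFormat.newBuilder();
    JsonFormat.parser().merge(json, builder);

    System.out.println(original.equals(builder.build())); // true
  }
}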
81 changes: 81 additions & 0 deletions core/src/main/java/feast/core/validators/DataSourceValidator.java
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.validators;

import static feast.core.validators.Matchers.*;
import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;

import feast.proto.core.DataFormatProto.FileFormat;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataSourceProto.DataSource;

public class DataSourceValidator {
/** Validate if the given DataSource protobuf spec is valid. */
public static void validate(DataSource spec) {
switch (spec.getType()) {
case BATCH_FILE:
FileFormat.FormatCase fileFormat = spec.getFileOptions().getFileFormat().getFormatCase();
switch (fileFormat) {
case PARQUET_FORMAT:
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported File Format: %s", fileFormat));
}
break;

case BATCH_BIGQUERY:
checkValidBigQueryTableRef(spec.getBigqueryOptions().getTableRef(), "FeatureTable");
break;

case STREAM_KAFKA:
StreamFormat.FormatCase messageFormat =
spec.getKafkaOptions().getMessageFormat().getFormatCase();
switch (messageFormat) {
case PROTO_FORMAT:
checkValidClassPath(
spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported Stream Format for Kafka Source Type: %s", messageFormat));
}
break;

case STREAM_KINESIS:
// Verify that the DataFormat is supported by the Kinesis data source
StreamFormat.FormatCase recordFormat =
spec.getKinesisOptions().getRecordFormat().getFormatCase();
switch (recordFormat) {
case PROTO_FORMAT:
checkValidClassPath(
spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported Stream Format for Kafka Source Type: %s", recordFormat));
}
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported Feature Store Type: %s", spec.getType()));
}
}
}
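
A hypothetical rejection case (values illustrative, imports as in DataSourceValidator.java): Kafka sources only accept PROTO_FORMAT message formats, so an Avro-formatted Kafka source fails validation:

DataSource kafkaAvro =
    DataSource.newBuilder()
        .setType(DataSource.SourceType.STREAM_KAFKA)
        .setKafkaOptions(
            DataSource.KafkaOptions.newBuilder()
                .setBootstrapServers("localhost:9092")
                .setTopic("topic")
                .setMessageFormat(
                    StreamFormat.newBuilder()
                        .setAvroFormat(
                            StreamFormat.AvroFormat.newBuilder().setSchemaJson("{}").build())
                        .build())
                .build())
        .build();

// Throws UnsupportedOperationException:
// "Unsupported Stream Format for Kafka Source Type: AVRO_FORMAT"
DataSourceValidator.validate(kafkaAvro);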
core/src/main/java/feast/core/validators/FeatureTableValidator.java
@@ -18,6 +18,7 @@

import static feast.core.validators.Matchers.*;

import feast.proto.core.DataSourceProto.DataSource.SourceType;
import feast.proto.core.FeatureProto.FeatureSpecV2;
import feast.proto.core.FeatureTableProto.FeatureTableSpec;
import java.util.ArrayList;
@@ -49,12 +50,6 @@ public static void validateSpec(FeatureTableSpec spec) {
checkValidCharacters(spec.getName(), "FeatureTable");
spec.getFeaturesList().forEach(FeatureTableValidator::validateFeatureSpec);

// Check that BigQuery reference defined for BigQuery source is valid
if (!spec.getBatchSource().getBigqueryOptions().getTableRef().isEmpty()) {
checkValidBigQueryTableRef(
spec.getBatchSource().getBigqueryOptions().getTableRef(), "FeatureTable");
}

// Check that features and entities defined in FeatureTable do not use reserved names
ArrayList<String> fieldNames = new ArrayList<>(spec.getEntitiesList());
fieldNames.addAll(
@@ -70,6 +65,14 @@ public static void validateSpec(FeatureTableSpec spec) {
throw new IllegalArgumentException(
String.format("Entity and Feature names within a Feature Table should be unique."));
}

// Check that the data sources defined in the feature table are valid
if (!spec.getBatchSource().getType().equals(SourceType.INVALID)) {
DataSourceValidator.validate(spec.getBatchSource());
}
if (!spec.getStreamSource().getType().equals(SourceType.INVALID)) {
DataSourceValidator.validate(spec.getStreamSource());
}
}

private static void validateFeatureSpec(FeatureSpecV2 spec) {
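
In effect, validateSpec() now delegates per-source checks to DataSourceValidator, skipping any source that was never set (SourceType.INVALID is the proto default). A hypothetical spec with only a batch source:

FeatureTableSpec spec =
    FeatureTableSpec.newBuilder()
        .setName("driver_stats")
        .setBatchSource(
            DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", ""))
        .build();

// Batch source is validated; the unset stream source is skipped.
FeatureTableValidator.validateSpec(spec);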
10 changes: 10 additions & 0 deletions core/src/main/java/feast/core/validators/Matchers.java
@@ -24,6 +24,8 @@ public class Matchers {

private static Pattern BIGQUERY_TABLE_REF_REGEX =
Pattern.compile("[a-zA-Z0-9-]+[:]+[a-zA-Z0-9_]+[.]+[a-zA-Z0-9_]*");
private static Pattern CLASS_PATH_REGEX =
Pattern.compile("[a-zA-Z_$][a-zA-Z0-9_$]*(\\.[a-zA-Z_$][a-zA-Z0-9_$]*)");
private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$");
private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$");
private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");
@@ -92,6 +94,14 @@ public static void checkValidBigQueryTableRef(String input, String resource)
}
}

public static void checkValidClassPath(String input, String resource) {
if (!CLASS_PATH_REGEX.matcher(input).matches()) {
throw new IllegalArgumentException(
String.format(
ERROR_MESSAGE_TEMPLATE, resource, input, "argument must be a valid Java Classpath"));
}
}

public static boolean hasDuplicates(Collection<String> strings) {
return (new HashSet<>(strings)).size() < strings.size();
}
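
A usage sketch of the new matcher (inputs illustrative): dotted Java class paths pass, while anything else raises IllegalArgumentException with the standard error template:

Matchers.checkValidClassPath("com.example.FeatureProto", "FeatureTable"); // ok
Matchers.checkValidClassPath("not a classpath!", "FeatureTable");
// -> IllegalArgumentException: ... "argument must be a valid Java Classpath"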
21 changes: 3 additions & 18 deletions core/src/test/java/feast/core/model/DataSourceTest.java
@@ -16,14 +16,11 @@
*/
package feast.core.model;

import static feast.proto.core.DataSourceProto.DataSource.SourceType.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;

import feast.common.it.DataGenerator;
import feast.proto.core.DataSourceProto;
import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions;
import feast.proto.core.DataSourceProto.DataSource.KinesisOptions;
import java.util.List;
import java.util.Map;
import org.junit.Test;
@@ -55,21 +52,9 @@ public void shouldFromProtoBeReversableWithToProto() {

private List<DataSourceProto.DataSource> getTestSpecs() {
return List.of(
DataGenerator.createFileDataSourceSpec("file:///path/to/file", "parquet", "ts_col", ""),
DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", ""),
DataGenerator.createKafkaDataSourceSpec("localhost:9092", "topic", "class.path", "ts_col"),
DataSourceProto.DataSource.newBuilder()
.setType(BATCH_BIGQUERY)
.setBigqueryOptions(
BigQueryOptions.newBuilder().setTableRef("project:dataset.table").build())
.build(),
DataSourceProto.DataSource.newBuilder()
.setType(STREAM_KINESIS)
.setKinesisOptions(
KinesisOptions.newBuilder()
.setRegion("ap-nowhere1")
.setStreamName("stream")
.setClassPath("class.path")
.build())
.build());
DataGenerator.createBigQueryDataSourceSpec("project:dataset.table", "ts_col", "dt_col"),
DataGenerator.createKinesisDataSourceSpec("ap-nowhere1", "stream", "class.path", "ts_col"));
}
}