
Add Protobuf Support #723

Merged: 3 commits, Oct 18, 2023
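In brief: build.gradle picks up flink-protobuf in compile-only and test scope, and both schema-registry row-data schemas gain a Protobuf branch. On the read side, registry-decoded Protobuf messages are converted to RowData through the new MessageToRowConverter utility; on the write side, RowData is converted back through the new RowToMessageConverter.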
build.gradle (2 additions, 0 deletions)
@@ -124,6 +124,7 @@ dependencies {

compileOnly group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
+    compileOnly group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion

testImplementation (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) {
exclude group: 'org.slf4j', module: 'slf4j-api'
@@ -146,6 +147,7 @@ dependencies {
testImplementation group: 'org.apache.flink', name: 'flink-table-planner_' + flinkScalaVersion, classifier: 'tests', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
+    testImplementation group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: hamcrestVersion
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: testcontainersVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: junit5Version
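The new flink-protobuf dependency follows the same compileOnly pattern as flink-json and flink-avro: the format library is expected on the application classpath at runtime rather than bundled by the connector. The registry-side entry point it is paired with is ProtobufSchema.of(...). A minimal sketch, assuming only that the message class comes from protoc (every generated class extends GeneratedMessageV3); the wrapper class and method names here are illustrative, not part of the PR:

import com.google.protobuf.GeneratedMessageV3;
import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;

public final class ProtobufSchemaSketch {
    // Mirrors ProtobufSchema.of(pbMessage.getClass()) in the diffs below;
    // the raw ProtobufSchema type matches how the connector declares pbSchema.
    @SuppressWarnings("rawtypes")
    public static ProtobufSchema schemaFor(Class<? extends GeneratedMessageV3> generatedClass) {
        return ProtobufSchema.of(generatedClass);
    }
}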
PravegaRegistryRowDataDeserializationSchema.java
@@ -18,13 +18,15 @@

import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.PravegaConfig;
+import io.pravega.connectors.flink.util.MessageToRowConverter;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
import io.pravega.schemaregistry.client.SchemaRegistryClientFactory;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
+import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
@@ -36,13 +38,15 @@
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataConverters;
+import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import com.google.protobuf.GeneratedMessageV3;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -51,12 +55,17 @@
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
- * Deserialization schema from Pravega Schema Registry to Flink Table/SQL internal data structure {@link RowData}.
+ * Deserialization schema from Pravega Schema Registry to Flink Table/SQL
+ * internal data structure {@link RowData}.
*
- * <p>Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and reads the specified fields.
+ * <p>
+ * Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and
+ * reads the specified fields.
*
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ * <p>
+ * Failures during deserialization are forwarded as wrapped IOExceptions.
*/
public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -103,21 +112,33 @@ public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;

-    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    /**
+     * Flag indicating whether to ignore invalid fields/rows (default: throw an
+     * exception).
+     */
private final boolean ignoreParseErrors;

/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;

+    // --------------------------------------------------------------------------------------------
+    // Protobuf fields
+    // --------------------------------------------------------------------------------------------
+
+    /** Protobuf serialization schema. */
+    private transient ProtobufSchema pbSchema;
+
+    /** Protobuf Message Class generated from static .proto file. */
+    private GeneratedMessageV3 pbMessage;
+
public PravegaRegistryRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> typeInfo,
String groupId,
PravegaConfig pravegaConfig,
boolean failOnMissingField,
boolean ignoreParseErrors,
-            TimestampFormat timestampFormat
-    ) {
+            TimestampFormat timestampFormat) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
@@ -135,8 +156,8 @@ public PravegaRegistryRowDataDeserializationSchema(
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
-        SchemaRegistryClientConfig schemaRegistryClientConfig =
-                SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
+        SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
+                .getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
@@ -153,8 +174,7 @@ public void open(InitializationContext context) throws Exception {
break;
case Json:
ObjectMapper objectMapper = new ObjectMapper();
-                boolean hasDecimalType =
-                        LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+                boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
@@ -166,6 +186,10 @@ public void open(InitializationContext context) throws Exception {
config.isWriteEncodingHeader(),
objectMapper);
break;
+            case Protobuf:
+                pbSchema = ProtobufSchema.of(pbMessage.getClass());
+                deserializer = SerializerFactory.protobufDeserializer(config, pbSchema);
+                break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -190,20 +214,26 @@ public Object deserializeToObject(byte[] message) {
return deserializer.deserialize(ByteBuffer.wrap(message));
}

-    public RowData convertToRowData(Object message) {
+    public RowData convertToRowData(Object message) throws Exception {
Object o;
switch (serializationFormat) {
case Avro:
-                AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
-                        AvroToRowDataConverters.createRowConverter(rowType);
+                AvroToRowDataConverters.AvroToRowDataConverter avroConverter = AvroToRowDataConverters
+                        .createRowConverter(rowType);
o = avroConverter.convert(message);
break;
case Json:
-                JsonToRowDataConverters.JsonToRowDataConverter jsonConverter =
-                        new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
-                                .createConverter(checkNotNull(rowType));
+                JsonToRowDataConverters.JsonToRowDataConverter jsonConverter = new JsonToRowDataConverters(
+                        failOnMissingField, ignoreParseErrors, timestampFormat)
+                        .createConverter(checkNotNull(rowType));
o = jsonConverter.convert((JsonNode) message);
break;
+            case Protobuf:
+                PbFormatConfigBuilder pbConfigBuilder = new PbFormatConfigBuilder()
+                        .messageClassName(pbMessage.getClass().getName());
+                MessageToRowConverter pbMessageConverter = new MessageToRowConverter(rowType, pbConfigBuilder.build());
+                o = pbMessageConverter.convertMessageToRow(message);
+                break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -214,16 +244,16 @@ private static class FlinkJsonGenericDeserializer extends AbstractDeserializer<JsonNode> {
private final ObjectMapper objectMapper;

public FlinkJsonGenericDeserializer(String groupId, SchemaRegistryClient client,
-                SerializerConfig.Decoders decoders, EncodingCache encodingCache,
-                boolean encodeHeader, ObjectMapper objectMapper) {
+                                            SerializerConfig.Decoders decoders, EncodingCache encodingCache,
+                                            boolean encodeHeader, ObjectMapper objectMapper) {
super(groupId, client, null, false, decoders, encodingCache, encodeHeader);
this.objectMapper = objectMapper;
}

@Override
public final JsonNode deserialize(InputStream inputStream,
-                SchemaInfo writerSchemaInfo,
-                SchemaInfo readerSchemaInfo) throws IOException {
+                                          SchemaInfo writerSchemaInfo,
+                                          SchemaInfo readerSchemaInfo) throws IOException {
return objectMapper.readTree(inputStream);
}
}
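Taken together, the new branch gives the following read path: the registry deserializer decodes the event bytes into a Protobuf message, and the MessageToRowConverter utility introduced by this PR (under io.pravega.connectors.flink.util) maps that message onto Flink's internal RowData using flink-protobuf's PbFormatConfig. A condensed sketch of that sequence; the wrapper class and the messageClass parameter are illustrative stand-ins for the configured protoc-generated class:

import io.pravega.connectors.flink.util.MessageToRowConverter;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public final class ProtobufReadPathSketch {
    // Same call sequence as the Protobuf case of convertToRowData(...) above.
    public static RowData toRowData(Object message, RowType rowType, Class<?> messageClass)
            throws Exception {
        MessageToRowConverter converter = new MessageToRowConverter(
                rowType,
                new PbFormatConfigBuilder().messageClassName(messageClass.getName()).build());
        return converter.convertMessageToRow(message);
    }
}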
PravegaRegistryRowDataSerializationSchema.java
@@ -19,6 +19,7 @@
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.table.catalog.pravega.util.PravegaSchemaUtils;
+import io.pravega.connectors.flink.util.RowToMessageConverter;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
@@ -27,6 +28,7 @@
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema;
+import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.codec.Encoder;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractSerializer;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
@@ -41,24 +43,33 @@
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.GeneratedMessageV3;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

/**
- * Serialization schema that serializes an object of Flink internal data structure {@link RowData} into
+ * Serialization schema that serializes an object of Flink internal data
+ * structure {@link RowData} into
* Pravega Schema Registry bytes.
*
- * <p>Serializes the input Flink object into GenericRecord and converts it into <code>byte[]</code>.
+ * <p>
+ * Serializes the input Flink object into GenericRecord and converts it into
+ * <code>byte[]</code>.
*
- * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * <p>
+ * Result <code>byte[]</code> messages can be deserialized using {@link
* PravegaRegistryRowDataDeserializationSchema}.
*/
public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
@@ -113,6 +124,16 @@ public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;

+    // --------------------------------------------------------------------------------------------
+    // Protobuf fields
+    // --------------------------------------------------------------------------------------------
+
+    /** Protobuf serialization schema. */
+    private transient ProtobufSchema pbSchema;
+
+    /** Protobuf Message Class generated from static .proto file. */
+    private GeneratedMessageV3 pbMessage;
+
public PravegaRegistryRowDataSerializationSchema(
RowType rowType,
String groupId,
@@ -137,8 +158,8 @@ public PravegaRegistryRowDataSerializationSchema(
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
-        SchemaRegistryClientConfig schemaRegistryClientConfig =
-                SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
+        SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
+                .getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
@@ -162,6 +183,10 @@ public void open(InitializationContext context) throws Exception {
config.isRegisterSchema(),
config.isWriteEncodingHeader());
break;
+            case Protobuf:
+                pbSchema = ProtobufSchema.of(pbMessage.getClass());
+                serializer = SerializerFactory.protobufSerializer(config, pbSchema);
+                break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -176,6 +201,8 @@ public byte[] serialize(RowData row) {
return convertToByteArray(serializeToGenericRecord(row));
case Json:
return convertToByteArray(serializaToJsonNode(row));
+                case Protobuf:
+                    return convertToByteArray(serializeToMessage(row));
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -185,8 +212,8 @@ public byte[] serialize(RowData row) {
}

public GenericRecord serializeToGenericRecord(RowData row) {
-        RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter =
-                RowDataToAvroConverters.createConverter(rowType);
+        RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter = RowDataToAvroConverters
+                .createConverter(rowType);
return (GenericRecord) runtimeConverter.convert(avroSchema, row);
}

@@ -200,6 +227,13 @@ public JsonNode serializaToJsonNode(RowData row) {
return runtimeConverter.convert(mapper, node, row);
}

+    public AbstractMessage serializeToMessage(RowData row) throws Exception {
+        PbFormatConfigBuilder pbConfigBuilder = new PbFormatConfigBuilder()
+                .messageClassName(pbMessage.getClass().getName());
+        RowToMessageConverter runtimeConverter = new RowToMessageConverter(rowType, pbConfigBuilder.build());
+        return runtimeConverter.convertRowToProtoMessage(row);
+    }

@SuppressWarnings("unchecked")
public byte[] convertToByteArray(Object message) {
return serializer.serialize(message).array();
@@ -208,14 +242,16 @@ public byte[] convertToByteArray(Object message) {
@VisibleForTesting
protected static class FlinkJsonSerializer extends AbstractSerializer<JsonNode> {
private final ObjectMapper objectMapper;

public FlinkJsonSerializer(String groupId, SchemaRegistryClient client, JSONSchema schema,
-                Encoder encoder, boolean registerSchema, boolean encodeHeader) {
+                                   Encoder encoder, boolean registerSchema, boolean encodeHeader) {
super(groupId, client, schema, encoder, registerSchema, encodeHeader);
objectMapper = new ObjectMapper();
}

@Override
-        protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream) throws IOException {
+        protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream)
+                throws IOException {
objectMapper.writeValue(outputStream, jsonNode);
outputStream.flush();
}
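The write path is the mirror image: RowToMessageConverter, the second utility added by this PR, turns a RowData into a Protobuf message, and convertToByteArray(...) then lets the registry serializer frame it into bytes. A condensed sketch under the same assumptions as above (wrapper class and messageClass parameter are illustrative):

import com.google.protobuf.AbstractMessage;
import io.pravega.connectors.flink.util.RowToMessageConverter;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public final class ProtobufWritePathSketch {
    // Same call sequence as serializeToMessage(...) above.
    public static AbstractMessage toMessage(RowData row, RowType rowType, Class<?> messageClass)
            throws Exception {
        RowToMessageConverter converter = new RowToMessageConverter(
                rowType,
                new PbFormatConfigBuilder().messageClassName(messageClass.getName()).build());
        return converter.convertRowToProtoMessage(row);
    }
}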