diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index efbad2ef47ae0..06f66f60380d9 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -437,8 +437,8 @@ protected SinkRecord toSinkRecord(Record sourceRecord) { if (nativeObject instanceof KeyValue) { KeyValue kv = (KeyValue) nativeObject; - key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema); - value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema); + /* Key and value are each converted through the helper that returns null for a null input when a schema is present. */ key = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getKey(), keySchema); + value = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getValue(), valueSchema); } else if (nativeObject != null) { throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass()); } else { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index 757241d411034..a308ef01ddcf1 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -54,6 +54,14 @@ private static List arrayToList(Object nativeObject, Schema kafkaValueSc return out; } + + /** Converts a native object to Kafka Connect data. Returns null when kafkaSchema is non-null and nativeObject is null; in every other case (including a null kafkaSchema) the call is delegated unchanged to getKafkaConnectData(nativeObject, kafkaSchema). */ public static Object getKafkaConnectDataFromSchema(Object nativeObject, Schema kafkaSchema) { + /* Short-circuit: a null value under a known schema stays null and never reaches the delegate. */ if (kafkaSchema != null && nativeObject == null) { + return null; + } + return getKafkaConnectData(nativeObject, kafkaSchema); + } + @SuppressWarnings("unchecked") public static Object 
getKafkaConnectData(Object nativeObject, Schema kafkaSchema) { if (kafkaSchema == null) { @@ -380,6 +388,7 @@ private static Object defaultOrThrow(Schema kafkaSchema) { if (kafkaSchema.isOptional()) { return null; } + throw new DataException("Invalid null value for required " + kafkaSchema.type() + " field"); } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java index 2eb6573374cdb..faf28585e8aed 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java @@ -26,11 +26,14 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import io.confluent.connect.avro.AvroData; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Time; @@ -41,6 +44,76 @@ @Slf4j public class PulsarSchemaToKafkaSchema { + + /** Schema decorator: every accessor is forwarded to the wrapped schema except isOptional(), which always returns true. */ private static class OptionalForcingSchema implements Schema { + + /* Wrapped schema; assigned only in the constructor, hence private and final. */ private final Schema sourceSchema; + + public OptionalForcingSchema(Schema sourceSchema) { + this.sourceSchema = sourceSchema; + } + + @Override + public Type type() { + return sourceSchema.type(); + } + + @Override + public boolean isOptional() { + /* Reports optional regardless of what the wrapped schema says. */ return true; + } + + @Override + public Object defaultValue() { + return sourceSchema.defaultValue(); + } + + @Override + public String name() { + 
return sourceSchema.name(); + } + + @Override + public Integer version() { + return sourceSchema.version(); + } + + @Override + public String doc() { + return sourceSchema.doc(); + } + + @Override + public Map parameters() { + return sourceSchema.parameters(); + } + + @Override + public Schema keySchema() { + return sourceSchema.keySchema(); + } + + @Override + public Schema valueSchema() { + return sourceSchema.valueSchema(); + } + + @Override + public List fields() { + return sourceSchema.fields(); + } + + @Override + public Field field(String s) { + return sourceSchema.field(s); + } + + @Override + public Schema schema() { + /* NOTE(review): this returns the wrapped schema's schema(), not this wrapper, so the forced optionality is presumably not carried by the returned object - confirm no caller relies on it. */ return sourceSchema.schema(); + } + } + private static final ImmutableMap pulsarSchemaTypeToKafkaSchema; private static final ImmutableSet kafkaLogicalSchemas; private static final AvroData avroData = new AvroData(1000); @@ -80,6 +153,11 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) { return parser.parse(schemaJson); } + /** Converts the Pulsar schema with getKafkaConnectSchema and wraps the result in an OptionalForcingSchema, so the returned schema reports isOptional() == true. Any exception thrown by getKafkaConnectSchema propagates unchanged. */ public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) { + Schema s = getKafkaConnectSchema(pulsarSchema); + return new OptionalForcingSchema(s); + } + public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) { if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) { throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null); @@ -122,7 +200,7 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema; return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()), - getKafkaConnectSchema(kvSchema.getValueSchema())) + /* Only the value side of the map schema is forced optional; the key schema is converted as-is. */ getOptionalKafkaConnectSchema(kvSchema.getValueSchema())) .build(); } org.apache.avro.Schema avroSchema = parseAvroSchema( diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 567562d338b98..e9d454ed2fd5a 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -51,6 +51,9 @@ import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -60,6 +63,7 @@ import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.util.MessageIdUtils; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; @@ -734,6 +738,56 @@ public void schemaKeyValueSchemaTest() throws Exception { Assert.assertEquals(key, 11); } + /** Sends a KeyValue(11, null) whose value schema is an AVRO record schema with one STRING field, then asserts the sink record's value is null and its key is 11. */ @Test + public void schemaKeyValueSchemaNullValueTest() throws Exception { + RecordSchemaBuilder builder = SchemaBuilder + .record("test"); + builder.field("test").type(SchemaType.STRING); + GenericSchema schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO)); + KeyValue kv = new KeyValue<>(11, null); + SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), 11, + "INT32", null, "STRUCT"); + 
Assert.assertNull(sinkRecord.value()); + int key = (int) sinkRecord.key(); + Assert.assertEquals(key, 11); + } + + /** Same null-value KeyValue, but with unwrapKeyValueIfAvailable set to "false" and a JSON value schema built from StructWithAnnotations; the sink record value is read as a Map and its key is asserted to be "key". */ @Test + public void schemaKeyValueSchemaNullValueNoUnwrapTest() throws Exception { + props.put("unwrapKeyValueIfAvailable", "false"); + JSONSchema jsonSchema = JSONSchema + .of(SchemaDefinition.builder() + .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class) + .withAlwaysAllowNull(true) + .build()); + KeyValue kv = new KeyValue<>(11, null); + Map expected = new HashMap(); + /* The expected map is keyed by the String "11", while the lookup below uses the int 11. */ expected.put("11", null); + SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, jsonSchema), "key", + "STRING", expected, "MAP"); + /* NOTE(review): Map.get returns null for an absent key as well as for a key mapped to null, so this assertion alone does not prove the entry exists - consider also checking containsKey. */ Assert.assertNull(((Map)sinkRecord.value()).get(11)); + String key =(String)sinkRecord.key(); + Assert.assertEquals(key, "key"); + } + + /** Variant of the no-unwrap null-value test using a generic AVRO record schema that carries the property op=test and one STRING field. */ @Test + public void schemaKeyValueSchemaNullValueNoUnwrapTestAvro() throws Exception { + props.put("unwrapKeyValueIfAvailable", "false"); + RecordSchemaBuilder builder = SchemaBuilder + .record("test"); + builder.property("op", "test"); + builder.field("test").type(SchemaType.STRING); + GenericSchema schema = GenericAvroSchema.of(builder.build(SchemaType.AVRO)); + KeyValue kv = new KeyValue<>(11, null); + Map expected = new HashMap(); + expected.put("11", null); + SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, schema), "key", + "STRING", expected, "MAP"); + /* NOTE(review): as in the JSON variant, a null result from get(11) is also what an absent key would produce. */ Assert.assertNull(((Map)sinkRecord.value()).get(11)); + String key =(String)sinkRecord.key(); + Assert.assertEquals(key, "key"); + } + @Test public void kafkaLogicalTypesTimestampTest() { Schema schema = new TestSchema(SchemaInfoImpl.builder()