Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-85 Message.getSchema #14

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ public boolean eligible(Message message) {
@Override
public boolean eligible(Message message) {
return SchemaType.STRING.equals(
((MessageImpl)message).getSchema().getSchemaInfo().getType());
((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType());
}
};
BaseInterceptor interceptor3 = new BaseInterceptor("int3") {
@Override
public boolean eligible(Message message) {
return SchemaType.INT32.equals(
((MessageImpl)message).getSchema().getSchemaInfo().getType());
((MessageImpl)message).getSchemaInternal().getSchemaInfo().getType());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
Expand Down Expand Up @@ -64,6 +67,7 @@
import java.util.concurrent.TimeUnit;

@Test(groups = "broker-api")
@Slf4j
public class SimpleSchemaTest extends ProducerConsumerBase {

@DataProvider(name = "batchingModes")
Expand Down Expand Up @@ -527,8 +531,11 @@ public void testAutoConsume(boolean batching) throws Exception {
assertEquals(data.getValue().getField("i"), i);
MessageImpl impl = (MessageImpl) data;

org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get();
org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchemaInternal().getNativeSchema().get();
assertNotNull(avroSchema);

org.apache.avro.Schema avroSchema2 = (org.apache.avro.Schema) data.getSchema().get().getNativeSchema().get();
assertNotNull(avroSchema2);
}

}
Expand Down Expand Up @@ -598,6 +605,9 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}

// verify c2
Expand All @@ -606,6 +616,9 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}

// verify c3
Expand All @@ -614,6 +627,9 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}

// verify c4
Expand All @@ -622,6 +638,9 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().getField("i"), i * 1000);
KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}
}
}
Expand Down Expand Up @@ -653,6 +672,46 @@ public void testGetSchemaByVersion() throws PulsarClientException, PulsarAdminEx
Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
}

@Test
public void testGetNativeSchemaWithAutoConsumeWithMultiVersion() throws Exception {
final String topic = "persistent://my-property/my-ns/testGetSchemaWithMultiVersion";

@Cleanup
Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test")
.topic(topic)
.subscribe();

@Cleanup
Producer<V1Data> v1DataProducer = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic)
.create();

@Cleanup
Producer<V2Data> v2DataProducer = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
.topic(topic)
.create();

Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);

v1DataProducer.send(new V1Data());
v2DataProducer.send(new V2Data());

Message<?> messageV1 = consumer.receive();
Schema<?> schemaV1 = messageV1.getSchema().get();
Message<?> messageV2 = consumer.receive();
Schema<?> schemaV2 = messageV2.getSchema().get();
log.info("schemaV1 {} {}", schemaV1.getSchemaInfo(), schemaV1.getNativeSchema());
log.info("schemaV2 {} {}", schemaV2.getSchemaInfo(), schemaV2.getNativeSchema());
assertTrue(schemaV1.getSchemaInfo().getSchemaDefinition().contains("V1Data"));
assertTrue(schemaV2.getSchemaInfo().getSchemaDefinition().contains("V2Data"));
org.apache.avro.Schema avroSchemaV1 = (org.apache.avro.Schema) schemaV1.getNativeSchema().get();
org.apache.avro.Schema avroSchemaV2 = (org.apache.avro.Schema) schemaV2.getNativeSchema().get();
assertNotEquals(avroSchemaV1.toString(false), avroSchemaV2.toString(false));
assertTrue(avroSchemaV1.toString(false).contains("V1Data"));
assertTrue(avroSchemaV2.toString(false).contains("V2Data"));
}

@Test(dataProvider = "topicDomain")
public void testAutoCreatedSchema(String domain) throws Exception {
final String topic1 = domain + "my-property/my-ns/testAutoCreatedSchema-1";
Expand Down
172 changes: 160 additions & 12 deletions pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -131,10 +132,11 @@ public void testMultiTopicSetSchemaProvider() throws Exception {
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());

admin.schemas().createSchema(fqtnTwo, Schema.AVRO(
Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
withPojo(Schemas.PersonTwo.class).build());
admin.schemas().createSchema(fqtnTwo, personTwoSchema.getSchemaInfo());

Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
Expand All @@ -156,18 +158,140 @@ public void testMultiTopicSetSchemaProvider() throws Exception {
.topic(fqtnOne, fqtnTwo)
.subscribe();

Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test2")
.topic(fqtnOne, fqtnTwo)
.subscribe();

producer.send(personTwo);

Schemas.PersonTwo personConsume = consumer.receive().getValue();
Message<Schemas.PersonTwo> message = consumer.receive();
Schemas.PersonTwo personConsume = message.getValue();
assertEquals(personConsume.getName(), "Tom");
assertEquals(personConsume.getId(), 1);
Schema<?> schema = message.getSchema().get();
log.info("the-schema {}", schema);
assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo());
org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema);
assertNotNull(nativeSchema);

// verify that with AUTO_CONSUME we can access the original schema
// and the Native AVRO schema
Message<?> message2 = consumer2.receive();
Schema<?> schema2 = message2.getSchema().get();
log.info("the-schema {}", schema2);
assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo());
org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema2);
assertNotNull(nativeSchema2);

producer.close();
consumer.close();
}

@Test
public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicOne = "test-multi-version-schema-one";
final String topicTwo = "test-multi-version-schema-two";
final String fqtnOne = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicOne
).toString();

final String fqtnTwo = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicTwo
).toString();


admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

admin.topics().createPartitionedTopic(fqtnOne, 3);
admin.topics().createPartitionedTopic(fqtnTwo, 3);

Schema<Schemas.PersonOne> schemaOne = Schema.AVRO(
SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonOne.class).build());
admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaOne).getSchemaInfo());

Schema<Schemas.PersonTwo> schemaTwo = Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build());
admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());

Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build());
admin.schemas().createSchema(fqtnTwo, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());

Producer<KeyValue<String, Schemas.PersonTwo>> producer = pulsarClient.newProducer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build())))
.topic(fqtnOne)
.create();

Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
personTwo.setId(1);
personTwo.setName("Tom");


Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build())))
.subscriptionName("test")
.topic(fqtnOne, fqtnTwo)
.subscribe();

Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test2")
.topic(fqtnOne, fqtnTwo)
.subscribe();

producer.send(new KeyValue<>("foo", personTwo));

Message<KeyValue<String, Schemas.PersonTwo>> message = consumer.receive();
assertEquals("foo", message.getValue().getKey());
Schemas.PersonTwo personConsume = message.getValue().getValue();
assertEquals(personConsume.getName(), "Tom");
assertEquals(personConsume.getId(), 1);
KeyValueSchema schema = (KeyValueSchema) message.getSchema().get();
log.info("the-schema {}", schema);
assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo());
org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema);
assertNotNull(nativeSchema);

// verify that with AUTO_CONSUME we can access the original schema
// and the Native AVRO schema
Message<?> message2 = consumer2.receive();
KeyValueSchema schema2 = (KeyValueSchema) message2.getSchema().get();
log.info("the-schema {}", schema2);
assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo());
org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema2);
assertNotNull(nativeSchema2);

producer.close();
consumer.close();
}

@Test
public void testBytesSchemaDeserialize() throws Exception {
public void testJSONSchemaDeserialize() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-bytes-schema";
Expand Down Expand Up @@ -213,6 +337,12 @@ public void testBytesSchemaDeserialize() throws Exception {
assertEquals(message.getValue().getField("address").getClass(),
message1.getValue().getAddress().getClass());

Schema<?> schema = message.getSchema().get();
Schema<?> schema1 = message1.getSchema().get();
log.info("schema {}", schema);
log.info("schema1 {}", schema1);
assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo());

producer.close();
consumer.close();
consumer1.close();
Expand Down Expand Up @@ -256,12 +386,14 @@ public void testStringSchema() throws Exception {
producer.send("foo");

Message<String> message = consumer.receive();
Message<GenericRecord> message3 = consumer2.receive();
Message<GenericRecord> message2 = consumer2.receive();
assertEquals(SchemaType.STRING, message.getSchema().get().getSchemaInfo().getType());
assertEquals(SchemaType.STRING, message2.getSchema().get().getSchemaInfo().getType());

assertEquals("foo", message.getValue());
assertEquals(message3.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
assertEquals(SchemaType.STRING, message3.getValue().getSchemaType());
assertEquals("foo", message3.getValue().getNativeObject());
assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
assertEquals(SchemaType.STRING, message2.getValue().getSchemaType());
assertEquals("foo", message2.getValue().getNativeObject());

producer.close();
consumer.close();
Expand Down Expand Up @@ -324,12 +456,22 @@ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exc
producer.send("foo".getBytes(StandardCharsets.UTF_8));

Message<byte[]> message = consumer.receive();
Message<GenericRecord> message3 = consumer2.receive();
Message<GenericRecord> message2 = consumer2.receive();
if (schema == SchemaType.BYTES) {
assertEquals(schema, message.getSchema().get().getSchemaInfo().getType());
assertEquals(schema, message2.getSchema().get().getSchemaInfo().getType());
} else if (schema == SchemaType.NONE) {
// schema NONE is always reported as BYTES
assertEquals(SchemaType.BYTES, message.getSchema().get().getSchemaInfo().getType());
assertEquals(SchemaType.BYTES, message2.getSchema().get().getSchemaInfo().getType());
} else {
fail();
}

assertEquals("foo".getBytes(StandardCharsets.UTF_8), message.getValue());
assertEquals(message3.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
assertEquals(SchemaType.BYTES, message3.getValue().getSchemaType());
assertEquals("foo".getBytes(StandardCharsets.UTF_8), message3.getValue().getNativeObject());
assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
assertEquals(SchemaType.BYTES, message2.getValue().getSchemaType());
assertEquals("foo".getBytes(StandardCharsets.UTF_8), message2.getValue().getNativeObject());

producer.close();
consumer.close();
Expand Down Expand Up @@ -448,6 +590,12 @@ private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncoding
Message<GenericRecord> message2 = consumer2.receive();
assertEquals(message.getValue(), message2.getValue().getNativeObject());

Schema<?> schema = message.getSchema().get();
Schema<?> schemaFromGenericRecord = message.getSchema().get();
KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
KeyValueSchema keyValueSchemaFromGenericRecord = (KeyValueSchema) schemaFromGenericRecord;
assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo());

if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
// with "SEPARATED encoding the routing key is the key of the KeyValue
assertNotNull(message.getKeyBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,19 @@ public interface Message<T> {
*/
byte[] getSchemaVersion();

/**
* Get the schema associated to the message.
* Please note that this schema is usually equal to the Schema you passed
* during the construction of the Consumer or the Reader.
* But if you are consuming the topic using the GenericObject interface
* this method will return the schema associated with the message.
* @return The schema used to decode the payload of message.
* @see Schema#AUTO_CONSUME()
*/
default Optional<Schema<?>> getSchema() {
return Optional.empty();
}

/**
* Check whether the message is replicated from other cluster.
*
Expand Down
Loading