Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 3, 2021
1 parent 791a560 commit 93a7c2d
Showing 1 changed file with 107 additions and 2 deletions.
109 changes: 107 additions & 2 deletions pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,106 @@ public void testMultiTopicSetSchemaProvider() throws Exception {
consumer.close();
}

@Test
public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicOne = "test-multi-version-schema-one";
final String topicTwo = "test-multi-version-schema-two";
final String fqtnOne = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicOne
).toString();

final String fqtnTwo = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicTwo
).toString();


admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

admin.topics().createPartitionedTopic(fqtnOne, 3);
admin.topics().createPartitionedTopic(fqtnTwo, 3);

Schema<Schemas.PersonOne> schemaOne = Schema.AVRO(
SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonOne.class).build());
admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaOne).getSchemaInfo());

Schema<Schemas.PersonTwo> schemaTwo = Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build());
admin.schemas().createSchema(fqtnOne, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());

Schema<Schemas.PersonTwo> personTwoSchema = Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build());
admin.schemas().createSchema(fqtnTwo, Schema.KeyValue(Schema.STRING, schemaTwo).getSchemaInfo());

Producer<KeyValue<String, Schemas.PersonTwo>> producer = pulsarClient.newProducer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build())))
.topic(fqtnOne)
.create();

Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
personTwo.setId(1);
personTwo.setName("Tom");


Consumer<KeyValue<String, Schemas.PersonTwo>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build())))
.subscriptionName("test")
.topic(fqtnOne, fqtnTwo)
.subscribe();

Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test2")
.topic(fqtnOne, fqtnTwo)
.subscribe();

producer.send(new KeyValue<>("foo", personTwo));

Message<KeyValue<String, Schemas.PersonTwo>> message = consumer.receive();
assertEquals("foo", message.getValue().getKey());
Schemas.PersonTwo personConsume = message.getValue().getValue();
assertEquals(personConsume.getName(), "Tom");
assertEquals(personConsume.getId(), 1);
KeyValueSchema schema = (KeyValueSchema) message.getSchema().get();
log.info("the-schema {}", schema);
assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo());
org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema);
assertNotNull(nativeSchema);

// verify that with AUTO_CONSUME we can access the original schema
// and the Native AVRO schema
Message<?> message2 = consumer2.receive();
KeyValueSchema schema2 = (KeyValueSchema) message2.getSchema().get();
log.info("the-schema {}", schema2);
assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo());
org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
log.info("nativeSchema-schema {}", nativeSchema2);
assertNotNull(nativeSchema2);

producer.close();
consumer.close();
}

@Test
public void testJSONSchemaDeserialize() throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down Expand Up @@ -357,8 +457,13 @@ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exc

Message<byte[]> message = consumer.receive();
Message<GenericRecord> message2 = consumer2.receive();
assertFalse(message.getSchema().isPresent());
assertFalse(message2.getSchema().isPresent());
if (schema == SchemaType.BYTES) {
assertEquals(schema, message.getSchema().get().getSchemaInfo().getType());
assertEquals(schema, message2.getSchema().get().getSchemaInfo().getType());
} else {
assertFalse(message.getSchema().isPresent());
assertEquals(schema, message2.getSchema().get().getSchemaInfo().getType());
}

assertEquals("foo".getBytes(StandardCharsets.UTF_8), message.getValue());
assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper");
Expand Down

0 comments on commit 93a7c2d

Please sign in to comment.