diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 0f141d09680e38..66a9f782e9f836 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -534,7 +534,7 @@ public void testAutoConsume(boolean batching) throws Exception { org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchemaInternal().getNativeSchema().get(); assertNotNull(avroSchema); - org.apache.avro.Schema avroSchema2 = (org.apache.avro.Schema) data.getSchema().get().getNativeSchema().get(); + org.apache.avro.Schema avroSchema2 = (org.apache.avro.Schema) data.getReaderSchema().get().getNativeSchema().get(); assertNotNull(avroSchema2); } @@ -605,7 +605,7 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception { assertNotNull(data.getSchemaVersion()); assertEquals(data.getValue().getKey().getField("i"), i * 100); assertEquals(data.getValue().getValue().getField("i"), i * 1000); - KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); assertNotNull(keyValueSchema.getKeySchema()); assertNotNull(keyValueSchema.getValueSchema()); } @@ -616,7 +616,7 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception { assertNotNull(data.getSchemaVersion()); assertEquals(data.getValue().getKey().i, i * 100); assertEquals(data.getValue().getValue().i, i * 1000); - KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); assertNotNull(keyValueSchema.getKeySchema()); assertNotNull(keyValueSchema.getValueSchema()); } @@ -627,7 +627,7 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception { assertNotNull(data.getSchemaVersion()); assertEquals(data.getValue().getKey().getField("i"), i * 100); assertEquals(data.getValue().getValue().i, i * 1000); - KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); assertNotNull(keyValueSchema.getKeySchema()); assertNotNull(keyValueSchema.getValueSchema()); } @@ -638,7 +638,7 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception { assertNotNull(data.getSchemaVersion()); assertEquals(data.getValue().getKey().i, i * 100); assertEquals(data.getValue().getValue().getField("i"), i * 1000); - KeyValueSchema keyValueSchema = (KeyValueSchema) data.getSchema().get(); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); assertNotNull(keyValueSchema.getKeySchema()); assertNotNull(keyValueSchema.getValueSchema()); } @@ -698,9 +698,9 @@ public void testGetNativeSchemaWithAutoConsumeWithMultiVersion() throws Exceptio v2DataProducer.send(new V2Data()); Message messageV1 = consumer.receive(); - Schema schemaV1 = messageV1.getSchema().get(); + Schema schemaV1 = messageV1.getReaderSchema().get(); Message messageV2 = consumer.receive(); - Schema schemaV2 = messageV2.getSchema().get(); + Schema schemaV2 = messageV2.getReaderSchema().get(); log.info("schemaV1 {} {}", schemaV1.getSchemaInfo(), schemaV1.getNativeSchema()); log.info("schemaV2 {} {}", schemaV2.getSchemaInfo(), schemaV2.getNativeSchema()); assertTrue(schemaV1.getSchemaInfo().getSchemaDefinition().contains("V1Data")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index 078a056697a0be..744a9504bc2944 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -169,7 +169,7 @@ public void testMultiTopicSetSchemaProvider() throws Exception { Schemas.PersonTwo personConsume = message.getValue(); assertEquals(personConsume.getName(), "Tom"); assertEquals(personConsume.getId(), 1); - Schema schema = message.getSchema().get(); + Schema schema = message.getReaderSchema().get(); log.info("the-schema {}", schema); assertEquals(personTwoSchema.getSchemaInfo(), schema.getSchemaInfo()); org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getNativeSchema().get(); @@ -179,7 +179,7 @@ public void testMultiTopicSetSchemaProvider() throws Exception { // verify that with AUTO_CONSUME we can access the original schema // and the Native AVRO schema Message message2 = consumer2.receive(); - Schema schema2 = message2.getSchema().get(); + Schema schema2 = message2.getReaderSchema().get(); log.info("the-schema {}", schema2); assertEquals(personTwoSchema.getSchemaInfo(), schema2.getSchemaInfo()); org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getNativeSchema().get(); @@ -269,7 +269,7 @@ public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception { Schemas.PersonTwo personConsume = message.getValue().getValue(); assertEquals(personConsume.getName(), "Tom"); assertEquals(personConsume.getId(), 1); - KeyValueSchema schema = (KeyValueSchema) message.getSchema().get(); + KeyValueSchema schema = (KeyValueSchema) message.getReaderSchema().get(); log.info("the-schema {}", schema); assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo()); org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get(); @@ -279,7 +279,7 @@ public void testMultiTopicSetSchemaProviderWithKeyValue() throws Exception { // verify that with AUTO_CONSUME we can access the original schema // and the Native AVRO schema Message message2 = consumer2.receive(); - KeyValueSchema schema2 = (KeyValueSchema) message2.getSchema().get(); + KeyValueSchema schema2 = (KeyValueSchema) message2.getReaderSchema().get(); log.info("the-schema {}", schema2); assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo()); org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get(); @@ -337,8 +337,8 @@ public void testJSONSchemaDeserialize() throws Exception { assertEquals(message.getValue().getField("address").getClass(), message1.getValue().getAddress().getClass()); - Schema schema = message.getSchema().get(); - Schema schema1 = message1.getSchema().get(); + Schema schema = message.getReaderSchema().get(); + Schema schema1 = message1.getReaderSchema().get(); log.info("schema {}", schema); log.info("schema1 {}", schema1); assertEquals(schema.getSchemaInfo(), schema1.getSchemaInfo()); @@ -387,8 +387,8 @@ public void testStringSchema() throws Exception { Message message = consumer.receive(); Message message2 = consumer2.receive(); - assertEquals(SchemaType.STRING, message.getSchema().get().getSchemaInfo().getType()); - assertEquals(SchemaType.STRING, message2.getSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.STRING, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.STRING, message2.getReaderSchema().get().getSchemaInfo().getType()); assertEquals("foo", message.getValue()); assertEquals(message2.getValue().getClass().getName(), "org.apache.pulsar.client.impl.schema.GenericObjectWrapper"); @@ -458,12 +458,12 @@ private void testUseAutoConsumeWithSchemalessTopic(SchemaType schema) throws Exc Message message = consumer.receive(); Message message2 = consumer2.receive(); if (schema == SchemaType.BYTES) { - assertEquals(schema, message.getSchema().get().getSchemaInfo().getType()); - assertEquals(schema, message2.getSchema().get().getSchemaInfo().getType()); + assertEquals(schema, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(schema, message2.getReaderSchema().get().getSchemaInfo().getType()); } else if (schema == SchemaType.NONE) { // schema NONE is always reported as BYTES - assertEquals(SchemaType.BYTES, message.getSchema().get().getSchemaInfo().getType()); - assertEquals(SchemaType.BYTES, message2.getSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.BYTES, message.getReaderSchema().get().getSchemaInfo().getType()); + assertEquals(SchemaType.BYTES, message2.getReaderSchema().get().getSchemaInfo().getType()); } else { fail(); } @@ -590,8 +590,8 @@ private void testKeyValueSchemaWithStructs(KeyValueEncodingType keyValueEncoding Message message2 = consumer2.receive(); assertEquals(message.getValue(), message2.getValue().getNativeObject()); - Schema schema = message.getSchema().get(); - Schema schemaFromGenericRecord = message.getSchema().get(); + Schema schema = message.getReaderSchema().get(); + Schema schemaFromGenericRecord = message.getReaderSchema().get(); KeyValueSchema keyValueSchema = (KeyValueSchema) schema; KeyValueSchema keyValueSchemaFromGenericRecord = (KeyValueSchema) schemaFromGenericRecord; assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo()); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java index f6ca2894bd9940..b456373a8feb84 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java @@ -217,7 +217,7 @@ public interface Message { * @return The schema used to decode the payload of message. * @see Schema#AUTO_CONSUME() */ - default Optional> getSchema() { + default Optional> getReaderSchema() { return Optional.empty(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index d64517fb5c09b6..c494c0ce1265a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -389,7 +389,7 @@ public Schema getSchemaInternal() { } @Override - public Optional> getSchema() { + public Optional> getReaderSchema() { ensureSchemaIsLoaded(); if (schema == null) { return Optional.empty(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 228d78eece5d38..e41d1b2fe19828 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -191,8 +191,8 @@ public Schema getSchemaInternal() { } @Override - public Optional> getSchema() { - return msg.getSchema(); + public Optional> getReaderSchema() { + return msg.getReaderSchema(); } @Override