From b585bb2fd743993f38284e251b3f2ba9119be1e6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 10 May 2021 08:51:55 +0200 Subject: [PATCH] Fix KeyValueSchema --- .../pulsar/client/api/SimpleSchemaTest.java | 7 +++---- .../client/impl/schema/KeyValueSchema.java | 21 ++++++++++++++----- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 746c196f6acf01..c54149c9912246 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -835,11 +835,10 @@ public void testAutoKeyValueConsumeGenericObject() throws Exception { assertEquals(data.getValue().getField("i"), i * 1000); assertEquals(data.getKey().getField("j"), i); assertEquals(data.getValue().getField("j"), i * 20); + KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); + assertNotNull(keyValueSchema.getKeySchema()); + assertNotNull(keyValueSchema.getValueSchema()); } - - KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get(); - assertNotNull(keyValueSchema.getKeySchema()); - assertNotNull(keyValueSchema.getValueSchema()); } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index ce1fe668de8996..fe1c49f3eeb34c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -142,16 +142,27 @@ public byte[] encode(KeyValue message) { } } - public KeyValue decode(ByteBuf byteBuf) { - return decode(byteBuf, null); + @Override + public KeyValue decode(byte[] bytes) { + return decode(bytes, null); } - public KeyValue decode(ByteBuf bytes, byte[] schemaVersion) { + @Override + public KeyValue decode(byte[] bytes, byte[] schemaVersion) { if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) { throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type"); } - byte[] array = ByteBufUtil.getBytes(bytes); - return KeyValue.decode(array, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion)); + return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion)); + } + + @Override + public KeyValue decode(ByteBuf byteBuf) { + return decode(ByteBufUtil.getBytes(byteBuf)); + } + + @Override + public T decode(ByteBuf byteBuf, byte[] schemaVersion) { + return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion); } public KeyValue decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {