Skip to content

Commit

Permalink
Fix KeyValueSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 10, 2021
1 parent 2787101 commit b585bb2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -835,11 +835,10 @@ public void testAutoKeyValueConsumeGenericObject() throws Exception {
assertEquals(data.getValue().getField("i"), i * 1000);
assertEquals(data.getKey().getField("j"), i);
assertEquals(data.getValue().getField("j"), i * 20);
KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}

KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get();
assertNotNull(keyValueSchema.getKeySchema());
assertNotNull(keyValueSchema.getValueSchema());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,27 @@ public byte[] encode(KeyValue<K, V> message) {
}
}

public KeyValue<K, V> decode(ByteBuf byteBuf) {
return decode(byteBuf, null);
@Override
public KeyValue<K, V> decode(byte[] bytes) {
return decode(bytes, null);
}

public KeyValue<K, V> decode(ByteBuf bytes, byte[] schemaVersion) {
@Override
public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {
if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
}
byte[] array = ByteBufUtil.getBytes(bytes);
return KeyValue.decode(array, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
}

@Override
public KeyValue<K, V> decode(ByteBuf byteBuf) {
return decode(ByteBufUtil.getBytes(byteBuf));
}

@Override
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion);
}

public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
Expand Down

0 comments on commit b585bb2

Please sign in to comment.