diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt index 590374d..595fe5d 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt @@ -7,6 +7,7 @@ import com.gxf.utilities.kafka.avro.AvroEncoder import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper import java.io.IOException import java.io.UncheckedIOException +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.security.* import java.security.spec.X509EncodedKeySpec @@ -83,7 +84,7 @@ class MessageSigner(properties: MessageSigningProperties) { ): ProducerRecord { if (this.signingEnabled) { val signature = this.signature(producerRecord) - producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature) + producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature.array()) } return producerRecord } @@ -101,15 +102,15 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedIOException if determining the bytes for the message throws an IOException. * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ - private fun signature(message: SignableMessageWrapper<*>): ByteArray { + private fun signature(message: SignableMessageWrapper<*>): ByteBuffer { check(this.canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } val oldSignature = message.getSignature() message.setSignature(null) - val byteArray = this.toByteArray(message) + val byteBuffer = this.toByteBuffer(message) try { - return signature(byteArray) + return signature(byteBuffer) } catch (e: SignatureException) { throw UncheckedSecurityException("Unable to sign message", e) } finally { @@ -130,16 +131,16 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedIOException if determining the bytes throws an IOException. * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ - private fun signature(producerRecord: ProducerRecord): ByteArray { + private fun signature(producerRecord: ProducerRecord): ByteBuffer { check(this.canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } val oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE) val specificRecordBase = producerRecord.value() - val byteArray = this.toByteArray(specificRecordBase) + val byteBuffer = this.toByteBuffer(specificRecordBase) try { - return signature(byteArray) + return signature(byteBuffer) } catch (e: SignatureException) { throw UncheckedSecurityException("Unable to sign message", e) } finally { @@ -149,16 +150,16 @@ class MessageSigner(properties: MessageSigningProperties) { } } - private fun signature(byteArray: ByteArray): ByteArray { - val messageBytes: ByteArray = + private fun signature(byteBuffer: ByteBuffer): ByteBuffer { + val messageBytes: ByteBuffer = if (this.stripAvroHeader) { - this.stripAvroHeader(byteArray) + this.stripAvroHeader(byteBuffer) } else { - byteArray + byteBuffer } val signingSignature = signatureInstance(signatureAlgorithm, signatureProvider, signingKey!!) signingSignature.update(messageBytes) - return signingSignature.sign() + return ByteBuffer.wrap(signingSignature.sign()) } fun canVerifyMessageSignatures(): Boolean { @@ -179,14 +180,14 @@ class MessageSigner(properties: MessageSigningProperties) { val messageSignature = message.getSignature() - if (messageSignature == null || messageSignature.isEmpty()) { + if (messageSignature == null) { logger.error("This message does not contain a signature") return false } try { message.setSignature(null) - return this.verifySignatureBytes(messageSignature, this.toByteArray(message)) + return this.verifySignatureBytes(messageSignature, this.toByteBuffer(message)) } catch (e: Exception) { logger.error("Unable to verify message signature", e) return false @@ -221,7 +222,7 @@ class MessageSigner(properties: MessageSigningProperties) { try { val specificRecordBase: SpecificRecordBase = consumerRecord.value() - return this.verifySignatureBytes(signatureBytes, this.toByteArray(specificRecordBase)) + return this.verifySignatureBytes(ByteBuffer.wrap(signatureBytes), this.toByteBuffer(specificRecordBase)) } catch (e: Exception) { logger.error("Unable to verify message signature", e) return false @@ -229,42 +230,42 @@ class MessageSigner(properties: MessageSigningProperties) { } @Throws(SignatureException::class) - private fun verifySignatureBytes(signatureBytes: ByteArray, messageByteArray: ByteArray): Boolean { - val messageBytes: ByteArray = + private fun verifySignatureBytes(signatureBytes: ByteBuffer, messageByteBuffer: ByteBuffer): Boolean { + val messageBytes: ByteBuffer = if (this.stripAvroHeader) { - this.stripAvroHeader(messageByteArray) + this.stripAvroHeader(messageByteBuffer) } else { - messageByteArray + messageByteBuffer } val verificationSignature = signatureInstance(signatureAlgorithm, signatureProvider, verificationKey!!) verificationSignature.update(messageBytes) - return verificationSignature.verify(signatureBytes) + return verificationSignature.verify(signatureBytes.array()) } - private fun hasAvroHeader(bytes: ByteArray): Boolean { - return (bytes.size >= AVRO_HEADER_LENGTH) && + private fun hasAvroHeader(bytes: ByteBuffer): Boolean { + return (bytes.array().size >= AVRO_HEADER_LENGTH) && ((bytes[0].toInt() and 0xFF) == 0xC3) && ((bytes[1].toInt() and 0xFF) == 0x01) } - private fun stripAvroHeader(bytes: ByteArray): ByteArray { + private fun stripAvroHeader(bytes: ByteBuffer): ByteBuffer { if (this.hasAvroHeader(bytes)) { - return Arrays.copyOfRange(bytes, AVRO_HEADER_LENGTH, bytes.size) + return ByteBuffer.wrap(Arrays.copyOfRange(bytes.array(), AVRO_HEADER_LENGTH, bytes.array().size)) } return bytes } - private fun toByteArray(message: SignableMessageWrapper<*>): ByteArray { + private fun toByteBuffer(message: SignableMessageWrapper<*>): ByteBuffer { try { - return message.toByteArray() + return message.toByteBuffer() } catch (e: IOException) { throw UncheckedIOException("Unable to determine bytes for message", e) } } - private fun toByteArray(message: SpecificRecordBase): ByteArray { + private fun toByteBuffer(message: SpecificRecordBase): ByteBuffer { try { - return AvroEncoder.encode(message) + return ByteBuffer.wrap(AvroEncoder.encode(message)) } catch (e: IOException) { throw UncheckedIOException("Unable to determine bytes for message", e) } diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt index 7033ae7..2a7af0c 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt @@ -4,6 +4,7 @@ package com.gxf.utilities.kafka.message.wrapper import java.io.IOException +import java.nio.ByteBuffer /** * Wrapper for signable messages. Because these messages are generated from Avro schemas, they can't be changed. This @@ -11,12 +12,12 @@ import java.io.IOException */ abstract class SignableMessageWrapper(val message: T) { - /** @return ByteArray of the whole message */ - @Throws(IOException::class) abstract fun toByteArray(): ByteArray + /** @return ByteBuffer of the whole message */ + @Throws(IOException::class) abstract fun toByteBuffer(): ByteBuffer - /** @return ByteArray of the signature in the message */ - abstract fun getSignature(): ByteArray? + /** @return ByteBuffer of the signature in the message */ + abstract fun getSignature(): ByteBuffer? - /** @param signature The signature in ByteArray form to be set on the message */ - abstract fun setSignature(signature: ByteArray?) + /** @param signature The signature in ByteBuffer form to be set on the message */ + abstract fun setSignature(signature: ByteBuffer?) } diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt index 8027201..c34122b 100644 --- a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt @@ -4,6 +4,7 @@ package com.gxf.utilities.kafka.message.signing import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.security.SecureRandom import java.util.Random @@ -68,10 +69,10 @@ class MessageSignerTest { fun signsRecordHeaderReplacingSignature() { val randomSignature = this.randomSignature() val record = this.producerRecord() - record.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature) + record.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) val actualSignatureBefore = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() - assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature) + assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature.array()) messageSigner.signUsingHeader(record) @@ -129,7 +130,7 @@ class MessageSignerTest { fun doesNotVerifyRecordsWithIncorrectSignature() { val consumerRecord = this.consumerRecord() val randomSignature = this.randomSignature() - consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature) + consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) val validSignature = messageSigner.verifyUsingHeader(consumerRecord) @@ -160,7 +161,7 @@ class MessageSignerTest { return TestableWrapper() } - private fun messageWrapper(signature: ByteArray): TestableWrapper { + private fun messageWrapper(signature: ByteBuffer): TestableWrapper { val testableWrapper = TestableWrapper() testableWrapper.setSignature(signature) return testableWrapper @@ -185,14 +186,14 @@ class MessageSignerTest { return consumerRecord } - private fun randomSignature(): ByteArray { + private fun randomSignature(): ByteBuffer { val random: Random = SecureRandom() val keySize = 2048 val signature = ByteArray(keySize / 8) random.nextBytes(signature) - return signature + return ByteBuffer.wrap(signature) } private fun producerRecord(): ProducerRecord { @@ -225,17 +226,17 @@ class MessageSignerTest { } private class TestableWrapper : SignableMessageWrapper("Some test message") { - private var signature: ByteArray? = null + private var signature: ByteBuffer? = null - override fun toByteArray(): ByteArray { - return message.toByteArray(StandardCharsets.UTF_8) + override fun toByteBuffer(): ByteBuffer { + return ByteBuffer.wrap(message.toByteArray(StandardCharsets.UTF_8)) } - override fun getSignature(): ByteArray? { + override fun getSignature(): ByteBuffer? { return this.signature } - override fun setSignature(signature: ByteArray?) { + override fun setSignature(signature: ByteBuffer?) { this.signature = signature } }