Skip to content

Commit

Permalink
Merge pull request #11 from OSGP/feature/FDP-1737-consumer-record-for…
Browse files Browse the repository at this point in the history
…-verifying

FDP-1737: verify signing on ConsumerRecord instead of on ProducerRecord
  • Loading branch information
smvdheijden authored Feb 8, 2024
2 parents 46b1181 + 6c478eb commit 84999b1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.regex.Pattern;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

Expand Down Expand Up @@ -250,24 +251,24 @@ public boolean verify(final SignableMessageWrapper<?> message) {
}

/**
* Verifies the signature of the provided {@code producerRecord}.
* Verifies the signature of the provided {@code consumerRecord}.
*
* @param producerRecord the record to be verified
* @return {@code true} if the signature of the given {@code producerRecord} was verified; {@code false}
* @param consumerRecord the record to be verified
* @return {@code true} if the signature of the given {@code consumerRecord} was verified; {@code false}
* if not.
* @throws IllegalStateException if this message signer has a private key needed for signing
* messages, but does not have the public key for signature verification.
* @throws UncheckedIOException if determining the bytes throws an IOException.
* @throws UncheckedSecurityException if the signature verification process throws a
* SignatureException.
*/
public boolean verify(final ProducerRecord<String, ? extends SpecificRecordBase> producerRecord) {
public boolean verify(final ConsumerRecord<String, ? extends SpecificRecordBase> consumerRecord) {
if (!this.canVerifyMessageSignatures()) {
throw new IllegalStateException(
"This MessageSigner is not configured for verification, it can only be used for signing");
}

final Header header = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE);
final Header header = consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE);
if(header == null) {
throw new IllegalStateException(
"This ProducerRecord does not contain a signature header");
Expand All @@ -278,9 +279,9 @@ public boolean verify(final ProducerRecord<String, ? extends SpecificRecordBase>
}

try {
producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE);
consumerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE);
synchronized (this.verificationSignature) {
final SpecificRecordBase specificRecordBase = producerRecord.value();
final SpecificRecordBase specificRecordBase = consumerRecord.value();
return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(specificRecordBase));
}
} catch (final SignatureException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -111,9 +112,9 @@ void verifiesMessagesWithValidSignature() {

@Test
void verifiesRecordsWithValidSignature() {
final ProducerRecord<String, Message> producerRecord = this.properlySignedRecord();
final ConsumerRecord<String, Message> signedRecord = this.properlySignedRecord();

final boolean signatureWasVerified = this.messageSigner.verify(producerRecord);
final boolean signatureWasVerified = this.messageSigner.verify(signedRecord);

assertThat(signatureWasVerified).isTrue();
}
Expand All @@ -129,11 +130,11 @@ void doesNotVerifyMessagesWithoutSignature() {

@Test
void doesNotVerifyRecordsWithoutSignature() {
final ProducerRecord<String, Message> producerRecord = this.producerRecord();
final String expectedMessage = "This ProducerRecord does not contain a signature header";
final ConsumerRecord<String, Message> consumerRecord = this.consumerRecord();

final Exception exception = assertThrows(IllegalStateException.class, () ->
this.messageSigner.verify(producerRecord)
this.messageSigner.verify(consumerRecord)
);
final String actualMessage = exception.getMessage();

Expand Down Expand Up @@ -212,7 +213,8 @@ void recordHeaderSigningWorksWithKeysFromPemEncodedResources() {

final ProducerRecord<String, Message> producerRecord = this.producerRecord();
messageSignerWithKeysFromResources.sign(producerRecord);
final boolean signatureWasVerified = messageSignerWithKeysFromResources.verify(producerRecord);
final ConsumerRecord<String, Message> consumerRecord = this.producerRecordToConsumerRecord(producerRecord);
final boolean signatureWasVerified = messageSignerWithKeysFromResources.verify(consumerRecord);

assertThat(signatureWasVerified).isTrue();
}
Expand Down Expand Up @@ -242,10 +244,16 @@ private TestableWrapper properlySignedMessage() {
return messageWrapper;
}

private ProducerRecord<String, Message> properlySignedRecord() {
private ConsumerRecord<String, Message> properlySignedRecord() {
final ProducerRecord<String, Message> producerRecord = this.producerRecord();
this.messageSigner.sign(producerRecord);
return producerRecord;
return this.producerRecordToConsumerRecord(producerRecord);
}

private <K, V> ConsumerRecord<K, V> producerRecordToConsumerRecord(final ProducerRecord<K, V> producerRecord) {
final ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(producerRecord.topic(), 0, 123L, producerRecord.key(), producerRecord.value());
producerRecord.headers().forEach(header -> consumerRecord.headers().add(header));
return consumerRecord;
}

private byte[] randomSignature() {
Expand All @@ -264,9 +272,15 @@ private byte[] bytes(final ByteBuffer byteBuffer) {
}

private ProducerRecord<String, Message> producerRecord() {
final Message message = new Message("super special message");
return new ProducerRecord<>("topic", this.message());
}

private ConsumerRecord<String, Message> consumerRecord() {
return new ConsumerRecord<>("topic", 0, 123L, null, this.message());
}

return new ProducerRecord<>("topic", message);
private Message message() {
return new Message("super special message");
}

static class Message extends SpecificRecordBase {
Expand Down

0 comments on commit 84999b1

Please sign in to comment.