Skip to content

Commit

Permalink
[improve][test] add integration test for reading encrypted messages i…
Browse files Browse the repository at this point in the history
…n TableView
  • Loading branch information
rbarbey committed Dec 21, 2022
1 parent 7d9b868 commit e15314c
Showing 1 changed file with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
public class TableViewTest extends MockedPulsarServiceBaseTest {

private static final String ECDSA_PUBLIC_KEY = "src/test/resources/certificate/public-key.client-ecdsa.pem";
private static final String ECDSA_PRIVATE_KEY = "src/test/resources/certificate/private-key.client-ecdsa.pem";

@BeforeClass
@Override
Expand Down Expand Up @@ -305,4 +306,29 @@ public void testAck(boolean partitionedTopic) throws Exception {


}

@Test(timeOut = 30 * 1000)
public void testTableViewWithEncryptedMessages() throws Exception {
String topic = "persistent://public/default/tableview-encryption-test";
admin.topics().createPartitionedTopic(topic, 3);

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);

// TableView can read them using the private key
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
.topic(topic)
.autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
.defaultCryptoKeyReader("file:" + ECDSA_PRIVATE_KEY)
.create();
log.info("start tv size: {}", tv.size());
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
assertEquals(tv.size(), count);
});
assertEquals(tv.keySet(), keys);
}
}

0 comments on commit e15314c

Please sign in to comment.