Skip to content

Commit

Permalink
[fix][client] add support to TableView to read encrypted messages (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
rbarbey authored and tisonkun committed Dec 27, 2022
1 parent 136b694 commit 95c06b0
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
@Test(groups = "broker-impl")
public class TableViewTest extends MockedPulsarServiceBaseTest {

private static final String ECDSA_PUBLIC_KEY = "src/test/resources/certificate/public-key.client-ecdsa.pem";
private static final String ECDSA_PRIVATE_KEY = "src/test/resources/certificate/private-key.client-ecdsa.pem";

@BeforeClass
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -87,6 +90,10 @@ public static Object[] topicDomain() {
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, count, enableBatch, false);
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
Expand All @@ -100,6 +107,10 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
} else {
builder.enableBatching(false);
}
if (enableEncryption) {
builder.addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader("file:./" + ECDSA_PUBLIC_KEY);
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -343,4 +354,29 @@ public void accept(String s, String s2) {

assertEquals(mockAction.acceptedCount, 5);
}

@Test(timeOut = 30 * 1000)
public void testTableViewWithEncryptedMessages() throws Exception {
String topic = "persistent://public/default/tableview-encryption-test";
admin.topics().createPartitionedTopic(topic, 3);

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);

// TableView can read them using the private key
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
.topic(topic)
.autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
.defaultCryptoKeyReader("file:" + ECDSA_PRIVATE_KEY)
.create();
log.info("start tv size: {}", tv.size());
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
assertEquals(tv.size(), count);
});
assertEquals(tv.keySet(), keys);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,41 @@ public interface TableViewBuilder<T> {
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> subscriptionName(String subscriptionName);

/**
* Set the {@link CryptoKeyReader} to decrypt the message payloads.
*
* @param cryptoKeyReader CryptoKeyReader object
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

/**
* Set the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt message payloads.
*
* @param privateKey the private key that is always used to decrypt message payloads.
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> defaultCryptoKeyReader(String privateKey);

/**
* Set the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt message payloads.
*
* @param privateKeys the map of private key names and their URIs
* used to decrypt message payloads.
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

/**
* Set the {@link ConsumerCryptoFailureAction} to specify.
*
* @param action the action to take when the decoding fails
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
Expand Down Expand Up @@ -82,4 +85,28 @@ public TableViewBuilder<T> subscriptionName(String subscriptionName) {
conf.setSubscriptionName(StringUtils.trim(subscriptionName));
return this;
}

@Override
public TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}

@Override
public TableViewBuilder<T> defaultCryptoKeyReader(String privateKey) {
checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build());
}

@Override
public TableViewBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> privateKeys) {
checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build());
}

@Override
public TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.Serializable;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;

@Data
@NoArgsConstructor
Expand All @@ -32,6 +34,9 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private long autoUpdatePartitionsSeconds = 60;
private String topicCompactionStrategyClassName = null;

private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@Override
public TableViewConfigurationData clone() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -75,6 +76,14 @@ public class TableViewImpl<T> implements TableView<T> {
if (isPersistentTopic) {
readerBuilder.readCompacted(true);
}

CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader();
if (cryptoKeyReader != null) {
readerBuilder.cryptoKeyReader(cryptoKeyReader);
}

readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());

this.reader = readerBuilder.createAsync();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;

/**
* Unit tests of {@link TablewViewBuilderImpl}.
*/
public class TableViewBuilderImplTest {

private static final String TOPIC_NAME = "testTopicName";
private PulsarClientImpl client;
private TableViewBuilderImpl tableViewBuilderImpl;

@BeforeClass(alwaysRun = true)
public void setup() {
Reader reader = mock(Reader.class);
when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf());
client = mock(PulsarClientImpl.class);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
when(client.createReaderAsync(any(ReaderConfigurationData.class), any(Schema.class)))
.thenReturn(CompletableFuture.completedFuture(reader));
tableViewBuilderImpl = new TableViewBuilderImpl(client, Schema.BYTES);
}

@Test
public void testTableViewBuilderImpl() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.subscriptionName("testSubscriptionName")
.cryptoKeyReader(mock(CryptoKeyReader.class))
.cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
.create();

assertNotNull(tableView);
}

@Test
public void testTableViewBuilderImplWhenOnlyTopicNameIsSet() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.create();

assertNotNull(tableView);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenTopicIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenTopicIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic("").create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenAutoUpdatePartitionsIntervalIsSmallerThanOneSecond() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).autoUpdatePartitionsInterval(100, TimeUnit.MILLISECONDS).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenSubscriptionNameIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).subscriptionName(null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenSubscriptionNameIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).subscriptionName("").create();
}

@Test
public void testTableViewBuilderImplWithCryptoKeyReader() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.cryptoKeyReader(mock(CryptoKeyReader.class))
.create();

assertNotNull(tableView);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((String) null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader("").create();
}

@Test(expectedExceptions = NullPointerException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsNullMap() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((Map<String, String>) null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsEmptyMap() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader(new HashMap<String, String>()).create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.TableViewConfigurationData;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;

public class TableViewImplTest {

private PulsarClientImpl client;
private TableViewConfigurationData data;

@BeforeClass(alwaysRun = true)
public void setup() {
client = mock(PulsarClientImpl.class);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));

data = new TableViewConfigurationData();
data.setTopicName("testTopicName");
}

@Test
public void testTableViewImpl() {
data.setCryptoKeyReader(mock(CryptoKeyReader.class));
TableView tableView = new TableViewImpl(client, Schema.BYTES, data);

assertNotNull(tableView);
}
}

0 comments on commit 95c06b0

Please sign in to comment.