Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] add support to TableView to read encrypted messages #19008

Merged
merged 9 commits into from
Dec 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
@Test(groups = "broker-impl")
public class TableViewTest extends MockedPulsarServiceBaseTest {

private static final String ECDSA_PUBLIC_KEY = "src/test/resources/certificate/public-key.client-ecdsa.pem";
private static final String ECDSA_PRIVATE_KEY = "src/test/resources/certificate/private-key.client-ecdsa.pem";

@BeforeClass
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -87,6 +90,10 @@ public static Object[] topicDomain() {
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, count, enableBatch, false);
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch, boolean enableEncryption) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
Expand All @@ -100,6 +107,10 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
} else {
builder.enableBatching(false);
}
if (enableEncryption) {
builder.addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader("file:./" + ECDSA_PUBLIC_KEY);
}
try (Producer<byte[]> producer = builder.create()) {
CompletableFuture<?> lastFuture = null;
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -343,4 +354,29 @@ public void accept(String s, String s2) {

assertEquals(mockAction.acceptedCount, 5);
}

@Test(timeOut = 30 * 1000)
public void testTableViewWithEncryptedMessages() throws Exception {
String topic = "persistent://public/default/tableview-encryption-test";
admin.topics().createPartitionedTopic(topic, 3);

// publish encrypted messages
int count = 20;
Set<String> keys = this.publishMessages(topic, count, false, true);

// TableView can read them using the private key
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
.topic(topic)
.autoUpdatePartitionsInterval(60, TimeUnit.SECONDS)
.defaultCryptoKeyReader("file:" + ECDSA_PRIVATE_KEY)
.create();
log.info("start tv size: {}", tv.size());
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
assertEquals(tv.size(), count);
});
assertEquals(tv.keySet(), keys);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,41 @@ public interface TableViewBuilder<T> {
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> subscriptionName(String subscriptionName);

/**
* Set the {@link CryptoKeyReader} to decrypt the message payloads.
*
* @param cryptoKeyReader CryptoKeyReader object
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

/**
* Set the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt message payloads.
*
* @param privateKey the private key that is always used to decrypt message payloads.
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> defaultCryptoKeyReader(String privateKey);

/**
* Set the default implementation of {@link CryptoKeyReader}.
*
* <p>Configure the key reader to be used to decrypt message payloads.
*
* @param privateKeys the map of private key names and their URIs
* used to decrypt message payloads.
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

/**
* Set the {@link ConsumerCryptoFailureAction} to specify.
*
* @param action the action to take when the decoding fails
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
Expand Down Expand Up @@ -82,4 +85,28 @@ public TableViewBuilder<T> subscriptionName(String subscriptionName) {
conf.setSubscriptionName(StringUtils.trim(subscriptionName));
return this;
}

@Override
public TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}

@Override
public TableViewBuilder<T> defaultCryptoKeyReader(String privateKey) {
checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build());
}

@Override
public TableViewBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> privateKeys) {
checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build());
}

@Override
public TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.Serializable;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;

@Data
@NoArgsConstructor
Expand All @@ -32,6 +34,9 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private long autoUpdatePartitionsSeconds = 60;
private String topicCompactionStrategyClassName = null;

private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@Override
public TableViewConfigurationData clone() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -75,6 +76,14 @@ public class TableViewImpl<T> implements TableView<T> {
if (isPersistentTopic) {
readerBuilder.readCompacted(true);
}

CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader();
if (cryptoKeyReader != null) {
readerBuilder.cryptoKeyReader(cryptoKeyReader);
}

readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());

this.reader = readerBuilder.createAsync();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;

/**
* Unit tests of {@link TablewViewBuilderImpl}.
*/
public class TableViewBuilderImplTest {

private static final String TOPIC_NAME = "testTopicName";
private PulsarClientImpl client;
private TableViewBuilderImpl tableViewBuilderImpl;

@BeforeClass(alwaysRun = true)
public void setup() {
Reader reader = mock(Reader.class);
when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf());
client = mock(PulsarClientImpl.class);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
when(client.createReaderAsync(any(ReaderConfigurationData.class), any(Schema.class)))
.thenReturn(CompletableFuture.completedFuture(reader));
tableViewBuilderImpl = new TableViewBuilderImpl(client, Schema.BYTES);
}

@Test
public void testTableViewBuilderImpl() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.subscriptionName("testSubscriptionName")
.cryptoKeyReader(mock(CryptoKeyReader.class))
.cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
.create();

assertNotNull(tableView);
}

@Test
public void testTableViewBuilderImplWhenOnlyTopicNameIsSet() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.create();

assertNotNull(tableView);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenTopicIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenTopicIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic("").create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenAutoUpdatePartitionsIntervalIsSmallerThanOneSecond() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).autoUpdatePartitionsInterval(100, TimeUnit.MILLISECONDS).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenSubscriptionNameIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).subscriptionName(null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewBuilderImplWhenSubscriptionNameIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).subscriptionName("").create();
}

@Test
public void testTableViewBuilderImplWithCryptoKeyReader() throws PulsarClientException {
TableView tableView = tableViewBuilderImpl.topic(TOPIC_NAME)
.cryptoKeyReader(mock(CryptoKeyReader.class))
.create();

assertNotNull(tableView);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsNullString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((String) null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsEmptyString() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader("").create();
}

@Test(expectedExceptions = NullPointerException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsNullMap() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((Map<String, String>) null).create();
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testTableViewImplWhenDefaultCryptoKeyReaderIsEmptyMap() throws PulsarClientException {
tableViewBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader(new HashMap<String, String>()).create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.impl.TableViewConfigurationData;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;

public class TableViewImplTest {

private PulsarClientImpl client;
private TableViewConfigurationData data;

@BeforeClass(alwaysRun = true)
public void setup() {
client = mock(PulsarClientImpl.class);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));

data = new TableViewConfigurationData();
data.setTopicName("testTopicName");
}

@Test
public void testTableViewImpl() {
data.setCryptoKeyReader(mock(CryptoKeyReader.class));
TableView tableView = new TableViewImpl(client, Schema.BYTES, data);

assertNotNull(tableView);
}
}