Skip to content

Commit

Permalink
Allow a crypto failure action to be set when building a TableView
Browse files Browse the repository at this point in the history
- by default, this action is set to ConsumerCryptoFailureAction.FAIL
- added to the reader when the TableView is being built
  • Loading branch information
rbarbey committed Dec 20, 2022
1 parent 1ef2a3d commit ece6706
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,12 @@ public interface TableViewBuilder<T> {
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

/**
* Set the {@link ConsumerCryptoFailureAction} to specify.
*
* @param action the action to take when the decoding fails
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -102,4 +103,10 @@ public TableViewBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> p
checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build());
}

@Override
public TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;

import lombok.Data;
Expand All @@ -35,6 +36,8 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private long autoUpdatePartitionsSeconds = 60;

private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

@Override
public TableViewConfigurationData clone() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class TableViewImpl<T> implements TableView<T> {
readerBuilder.cryptoKeyReader(cryptoKeyReader);
}

readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());

this.reader = readerBuilder.createAsync();
}

Expand Down

0 comments on commit ece6706

Please sign in to comment.