diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 8cd166aa5f0547..76b8ff4fbdac31 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -129,4 +129,12 @@ public interface TableViewBuilder { * @return the {@link TableViewBuilder} builder instance */ TableViewBuilder defaultCryptoKeyReader(Map privateKeys); + + /** + * Set the {@link ConsumerCryptoFailureAction} to specify. + * + * @param action the action to take when the decoding fails + * @return the {@link TableViewBuilder} builder instance + */ + TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index 545633b734a488..e0a47a70b1c8bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -102,4 +103,10 @@ public TableViewBuilder defaultCryptoKeyReader(@NonNull Map p checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); } + + @Override + public TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) { + conf.setCryptoFailureAction(action); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java index 8027bc96fd6f22..f43774959882f6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java @@ -20,6 +20,7 @@ import java.io.Serializable; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; import lombok.Data; @@ -35,6 +36,8 @@ public class TableViewConfigurationData implements Serializable, Cloneable { private long autoUpdatePartitionsSeconds = 60; private CryptoKeyReader cryptoKeyReader = null; + private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + @Override public TableViewConfigurationData clone() { try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 1f1bd9f64600f7..639f142ca4ab3a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -80,6 +80,8 @@ public class TableViewImpl implements TableView { readerBuilder.cryptoKeyReader(cryptoKeyReader); } + readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); + this.reader = readerBuilder.createAsync(); }