From 8a7ce527bc15cec490b05536f9e87a987d2c282e Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 19 Jul 2024 12:48:10 +0300 Subject: [PATCH 1/2] Pulsar config refactoring Fixes #2631 --- .../messaging/pulsar/ConfigResolver.java | 113 ++++++++++++++++++ .../messaging/pulsar/PulsarConnector.java | 31 +---- .../pulsar/PulsarIncomingChannel.java | 53 +------- .../pulsar/PulsarOutgoingChannel.java | 15 +-- .../pulsar/batch/PulsarBatchReceiveTest.java | 36 +++++- 5 files changed, 154 insertions(+), 94 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java index 2cd43ddc31..b585467cb9 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java @@ -1,9 +1,11 @@ package io.smallrye.reactive.messaging.pulsar; import static io.smallrye.reactive.messaging.providers.helpers.CDIUtils.getInstanceById; +import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import jakarta.enterprise.context.ApplicationScoped; @@ -11,10 +13,18 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RedeliveryBackoff; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -30,6 +40,7 @@ import io.smallrye.reactive.messaging.ClientCustomizer; import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils; +import io.smallrye.reactive.messaging.providers.helpers.Validation; import io.vertx.core.json.JsonObject; /** @@ -110,6 +121,33 @@ public ClientBuilderImpl customize(ClientBuilderImpl builder, PulsarConnectorCom return (ClientBuilderImpl) ConfigUtils.customize(cc.config(), clientConfigCustomizers, builder); } + public ClientBuilderImpl configure(PulsarConnectorCommonConfiguration cc, ClientConfigurationData conf) + throws PulsarClientException { + setAuth(conf); + return customize(new ClientBuilderImpl(conf), cc); + } + + /** + * Sets the authentication object in the given configuration object using + * `authPluginClassName` and `authParams`/`authParamMap` attributes + * This use to be done by the PulsarClientImpl + * + * @param conf client configuration + * @throws PulsarClientException + */ + private void setAuth(ClientConfigurationData conf) throws PulsarClientException { + if (Validation.isBlank(conf.getAuthPluginClassName()) + || (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) { + return; + } + + if (!Validation.isBlank(conf.getAuthParams())) { + conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams())); + } else if (conf.getAuthParamMap() != null) { + conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap())); + } + } + /** * Extract the configuration map for building Pulsar consumer * @@ -129,6 +167,62 @@ public ConsumerBuilder customize(ConsumerBuilder builder, PulsarConnec return (ConsumerBuilder) ConfigUtils.customize(ic.config(), consumerConfigCustomizers, builder); } + public ConsumerBuilder configure(ConsumerBuilder builder, + PulsarConnectorIncomingConfiguration ic, + ConsumerConfigurationData conf) { + builder.loadConf(configToMap(conf)); + ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i))); + ic.getNegativeAckRedeliveryBackoff() + .ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s, ic.getChannel()))); + ic.getAckTimeoutRedeliveryBackoff() + .ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s, ic.getChannel()))); + if (conf.getConsumerEventListener() != null) { + builder.consumerEventListener(conf.getConsumerEventListener()); + } + if (conf.getPayloadProcessor() != null) { + builder.messagePayloadProcessor(conf.getPayloadProcessor()); + } + if (conf.getKeySharedPolicy() != null) { + builder.keySharedPolicy(conf.getKeySharedPolicy()); + } else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) { + builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); + } + if (conf.getCryptoKeyReader() != null) { + builder.cryptoKeyReader(conf.getCryptoKeyReader()); + } + if (conf.getMessageCrypto() != null) { + builder.messageCrypto(conf.getMessageCrypto()); + } + if (ic.getBatchReceive()) { + builder.batchReceivePolicy( + Objects.requireNonNullElse(conf.getBatchReceivePolicy(), BatchReceivePolicy.DEFAULT_POLICY)); + } + return customize(builder, ic); + } + + private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) { + return DeadLetterPolicy.builder() + .maxRedeliverCount(redeliverCount) + .deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null)) + .retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null)) + .initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null)) + .build(); + } + + private RedeliveryBackoff parseBackoff(String backoffString, String channel) { + String[] strings = backoffString.split(","); + try { + return MultiplierRedeliveryBackoff.builder() + .minDelayMs(Long.parseLong(strings[0])) + .maxDelayMs(Long.parseLong(strings[1])) + .multiplier(Double.parseDouble(strings[2])) + .build(); + } catch (Exception e) { + log.unableToParseRedeliveryBackoff(backoffString, channel); + return null; + } + } + /** * Extract the configuration map for building Pulsar producer * @@ -148,6 +242,25 @@ public ProducerBuilder customize(ProducerBuilder builder, PulsarConnec return (ProducerBuilder) ConfigUtils.customize(oc.config(), producerConfigCustomizers, builder); } + public ProducerBuilder configure(ProducerBuilder builder, + PulsarConnectorOutgoingConfiguration oc, + ProducerConfigurationData conf) { + builder.loadConf(configToMap(conf)); + if (conf.getCustomMessageRouter() != null) { + builder.messageRouter(conf.getCustomMessageRouter()); + } + if (conf.getBatcherBuilder() != null) { + builder.batcherBuilder(conf.getBatcherBuilder()); + } + if (conf.getCryptoKeyReader() != null) { + builder.cryptoKeyReader(conf.getCryptoKeyReader()); + } + for (String encryptionKey : conf.getEncryptionKeys()) { + builder.addEncryptionKey(encryptionKey); + } + return customize(builder, oc); + } + private Map mergeMap(Map defaultConfig, Map channelConfig) { Map map = new HashMap<>(defaultConfig); map.putAll(channelConfig); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java index 408ab39a29..2983a6ef1a 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java @@ -21,12 +21,10 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.eclipse.microprofile.config.Config; @@ -41,7 +39,6 @@ import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; -import io.smallrye.reactive.messaging.providers.helpers.Validation; import io.vertx.mutiny.core.Vertx; @ApplicationScoped @@ -166,36 +163,14 @@ public void terminate( private PulsarClientImpl createPulsarClient(PulsarConnectorCommonConfiguration cc, ClientConfigurationData configuration) { try { - setAuth(configuration); - log.createdClientWithConfig(configuration); - ClientBuilderImpl customized = configResolver.customize(new ClientBuilderImpl(configuration), cc); - return new PulsarClientImpl(customized.getClientConfigurationData(), vertx.nettyEventLoopGroup()); + ClientConfigurationData data = configResolver.configure(cc, configuration).getClientConfigurationData(); + log.createdClientWithConfig(data); + return new PulsarClientImpl(data, vertx.nettyEventLoopGroup()); } catch (PulsarClientException e) { throw ex.illegalStateUnableToBuildClient(e); } } - /** - * Sets the authentication object in the given configuration object using - * `authPluginClassName` and `authParams`/`authParamMap` attributes - * This use to be done by the PulsarClientImpl - * - * @param conf client configuration - * @throws PulsarClientException - */ - private void setAuth(ClientConfigurationData conf) throws PulsarClientException { - if (Validation.isBlank(conf.getAuthPluginClassName()) - || (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) { - return; - } - - if (!Validation.isBlank(conf.getAuthParams())) { - conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams())); - } else if (conf.getAuthParamMap() != null) { - conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap())); - } - } - public PulsarClient getClient(String channel) { return clientsByChannel.get(channel); } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index b61f88f7b6..4121f0865c 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -13,7 +13,6 @@ import jakarta.enterprise.inject.Instance; import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; @@ -62,7 +61,6 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, this.channel = ic.getChannel(); this.healthEnabled = ic.getHealthEnabled(); this.tracingEnabled = ic.getTracingEnabled(); - ConsumerBuilder builder = client.newConsumer(schema); ConsumerConfigurationData conf = configResolver.getConsumerConf(ic); if (conf.getSubscriptionName() == null) { String s = UUID.randomUUID().toString(); @@ -75,32 +73,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, if (conf.getConsumerName() == null) { conf.setConsumerName(channel); } - builder.loadConf(configResolver.configToMap(conf)); - ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i))); - ic.getNegativeAckRedeliveryBackoff().ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s))); - ic.getAckTimeoutRedeliveryBackoff().ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s))); - if (conf.getConsumerEventListener() != null) { - builder.consumerEventListener(conf.getConsumerEventListener()); - } - if (conf.getPayloadProcessor() != null) { - builder.messagePayloadProcessor(conf.getPayloadProcessor()); - } - if (conf.getKeySharedPolicy() != null) { - builder.keySharedPolicy(conf.getKeySharedPolicy()); - } else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) { - builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); - } - if (conf.getCryptoKeyReader() != null) { - builder.cryptoKeyReader(conf.getCryptoKeyReader()); - } - if (conf.getMessageCrypto() != null) { - builder.messageCrypto(conf.getMessageCrypto()); - } - if (ic.getBatchReceive() && conf.getBatchReceivePolicy() == null) { - builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY); - } - - this.consumer = configResolver.customize(builder, ic).subscribe(); + ConsumerBuilder builder = configResolver.configure(client.newConsumer(schema), ic, conf); + this.consumer = builder.subscribe(); log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf); this.ackHandler = ackHandlerFactory.create(consumer, ic); this.failureHandler = failureHandlerFactory.create(consumer, ic, this::reportFailure); @@ -196,29 +170,6 @@ private boolean isEndOfStream(PulsarClient client, Throwable throwable) { return false; } - private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) { - return DeadLetterPolicy.builder() - .maxRedeliverCount(redeliverCount) - .deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null)) - .retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null)) - .initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null)) - .build(); - } - - private RedeliveryBackoff parseBackoff(String backoffString) { - String[] strings = backoffString.split(","); - try { - return MultiplierRedeliveryBackoff.builder() - .minDelayMs(Long.parseLong(strings[0])) - .maxDelayMs(Long.parseLong(strings[1])) - .multiplier(Double.parseDouble(strings[2])) - .build(); - } catch (Exception e) { - log.unableToParseRedeliveryBackoff(backoffString, this.channel); - return null; - } - } - static boolean hasTopicConfig(ConsumerConfigurationData conf) { return conf.getTopicsPattern() != null || (conf.getTopicNames() != null && !conf.getTopicNames().isEmpty()); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java index 3948a6a16f..ba245ed2d3 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java @@ -57,19 +57,8 @@ public PulsarOutgoingChannel(PulsarClient client, Schema schema, PulsarConnec if (conf.getMaxPendingMessages() > 0 && conf.getMaxPendingMessagesAcrossPartitions() == 0) { conf.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessages()); } - Map producerConf = configResolver.configToMap(conf); - ProducerBuilder builder = client.newProducer(schema) - .loadConf(producerConf); - if (conf.getBatcherBuilder() != null) { - builder.batcherBuilder(conf.getBatcherBuilder()); - } - if (conf.getCryptoKeyReader() != null) { - builder.cryptoKeyReader(conf.getCryptoKeyReader()); - } - for (String encryptionKey : conf.getEncryptionKeys()) { - builder.addEncryptionKey(encryptionKey); - } - this.producer = configResolver.customize(builder, oc).create(); + ProducerBuilder builder = configResolver.configure(client.newProducer(schema), oc, conf); + this.producer = builder.create(); log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf); long requests = getRequests(oc, conf); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java index 1bebfae0ae..413bd23042 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java @@ -11,14 +11,18 @@ import java.util.concurrent.CopyOnWriteArrayList; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; @@ -45,7 +49,7 @@ static void insertMessages() throws PulsarClientException { } @Test - void testBatchRecieveAppUsingPulsarConnector() { + void testBatchReceiveAppUsingPulsarConnector() { // Run app ConsumingApp app = runApplication(config(), ConsumingApp.class); long start = System.currentTimeMillis(); @@ -58,6 +62,35 @@ void testBatchRecieveAppUsingPulsarConnector() { assertThat(app.getResults()).containsExactlyElementsOf(expected); } + @Test + void testBatchReceiveAppWithCustomConfig() { + addBeans(BatchConfig.class); + // Run app + ConsumingApp app = runApplication(config() + .with("mp.messaging.incoming.data.consumer-configuration", "batch-config"), ConsumingApp.class); + long start = System.currentTimeMillis(); + + // Check for consumed messages in app + await().atMost(Duration.ofSeconds(30)).until(() -> app.getResults().size() == NUMBER_OF_MESSAGES); + long end = System.currentTimeMillis(); + + System.out.println("Ack - Estimate: " + (end - start) + " ms"); + assertThat(app.getResults()).containsExactlyElementsOf(expected); + } + + @ApplicationScoped + public static class BatchConfig { + @Produces + @Identifier("batch-config") + public ConsumerConfigurationData configureBatchConsumer() { + var data = new ConsumerConfigurationData<>(); + data.setBatchReceivePolicy(BatchReceivePolicy.builder() + .maxNumMessages(10) + .build()); + return data; + } + } + MapBasedConfig config() { return new MapBasedConfig() .with("mp.messaging.incoming.data.connector", PulsarConnector.CONNECTOR_NAME) @@ -77,7 +110,6 @@ public static class ConsumingApp { @Incoming("data") public CompletionStage consume(PulsarIncomingBatchMessage message) { - // System.out.println(message.getIncomingMessages().size()); results.addAll(message.getPayload()); return message.ack(); } From 6ed59de6dc9b6698d498334bd51ac5b5a2d950a0 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 23 Jul 2024 15:44:18 +0300 Subject: [PATCH 2/2] Pulsar transactions batch eop fix Fixes #2245 --- .../transactions/PulsarTransactionsImpl.java | 23 +++++++++++-------- .../pulsar/base/PulsarContainer.java | 4 +++- .../ExactlyOnceProcessingBatchTest.java | 12 ++-------- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java index 2c8a4d7b8f..9aa93d2289 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java @@ -39,6 +39,11 @@ public PulsarTransactionsImpl(EmitterConfiguration config, long defaultBufferSiz this.pulsarClient = pulsarClientService.getClient(config.name()); } + private static boolean isConflict(Throwable throwable) { + return throwable instanceof PulsarClientException.TransactionConflictException + || throwable.getCause() instanceof PulsarClientException.TransactionConflictException; + } + @Override public Uni withTransaction(Function, Uni> work) { return new PulsarTransactionEmitter().execute(work); @@ -57,17 +62,14 @@ public Uni withTransaction(Message message, Function Uni withTransaction(Duration txnTimeout, Message message, Function, Uni> work) { return new PulsarTransactionEmitter(txnTimeout, - txn -> Uni.createFrom().completionStage(message.ack()), + txn -> Uni.createFrom().completionStage(message.ack()) + .onFailure(PulsarTransactionsImpl::isConflict).recoverWithUni(throwable -> VOID_UNI), PulsarTransactionsImpl::defaultAfterCommit, (txn, throwable) -> { - if (!(throwable.getCause() instanceof PulsarClientException.TransactionConflictException)) { - // If TransactionConflictException is not thrown, - // you need to redeliver or negativeAcknowledge this message, - // or else this message will not be received again. - return Uni.createFrom().completionStage(() -> message.nack(throwable)); - } else { - return VOID_UNI; - } + // If TransactionConflictException is not thrown, + // you need to redeliver or negativeAcknowledge this message, + // or else this message will not be received again. + return isConflict(throwable) ? VOID_UNI : Uni.createFrom().completionStage(message.nack(throwable)); }, PulsarTransactionsImpl::defaultAfterAbort) .execute(e -> { @@ -200,7 +202,8 @@ private Uni executeInTransaction(Function, Uni> wo .onCancellation().call(() -> abort(new RuntimeException("Transaction cancelled"))) // when there was no exception, // commit or rollback the transaction - .call(() -> abort ? abort(new RuntimeException("Transaction aborted")) : commit()) + .call(() -> abort ? abort(new RuntimeException("Transaction aborted")) + : commit().onFailure().call(throwable -> abort(throwable))) .onFailure().recoverWithUni(throwable -> afterAbort.apply(throwable)) .onItem().transformToUni(result -> afterCommit.apply(result)); } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java index ddcd334b06..c93858eaae 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java @@ -12,7 +12,7 @@ public class PulsarContainer extends GenericContainer { - public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.2.2"); + public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0"); public static final String STARTER_SCRIPT = "/run_pulsar.sh"; @@ -45,6 +45,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n"; command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n"; command += "export PULSAR_PREFIX_systemTopicEnabled=true\n"; + command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n"; + command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n"; command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"; copyFileToContainer( Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java index ad6bd73863..4589357679 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java @@ -25,8 +25,6 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; @@ -34,7 +32,6 @@ import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; import io.smallrye.reactive.messaging.pulsar.PulsarMessage; -import io.smallrye.reactive.messaging.pulsar.TestTags; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -104,8 +101,6 @@ Uni process(PulsarIncomingBatchMessage batch) { * There are still duplicate items delivered to the consumer batch after an transaction abort. */ @Test - @Tag(TestTags.FLAKY) - @Disabled void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, PulsarClientException { addBeans(ConsumerConfig.class); this.inTopic = UUID.randomUUID().toString(); @@ -137,10 +132,6 @@ void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, .topic(this.inTopic) .create(), numberOfRecords, (i, producer) -> producer.newMessage().sequenceId(i).value(i).key("k-" + i)); - await().untilAsserted(() -> assertThat(app.getProcessed()) - .containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList())) - .doesNotHaveDuplicates()); - await().untilAsserted(() -> assertThat(list) .containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList())) .doesNotHaveDuplicates()); @@ -166,7 +157,8 @@ private MapBasedConfig consumerConfig() { .with("mp.messaging.incoming.exactly-once-consumer.enableTransaction", true) .with("mp.messaging.incoming.exactly-once-consumer.negativeAckRedeliveryDelayMicros", 5000) .with("mp.messaging.incoming.exactly-once-consumer.schema", "INT32") - .with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true); + .with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true) + .with("mp.messaging.incoming.exactly-once-consumer.batchIndexAckEnable", true); } @ApplicationScoped