From 6ed59de6dc9b6698d498334bd51ac5b5a2d950a0 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 23 Jul 2024 15:44:18 +0300 Subject: [PATCH] Pulsar transactions batch eop fix Fixes #2245 --- .../transactions/PulsarTransactionsImpl.java | 23 +++++++++++-------- .../pulsar/base/PulsarContainer.java | 4 +++- .../ExactlyOnceProcessingBatchTest.java | 12 ++-------- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java index 2c8a4d7b8f..9aa93d2289 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.java @@ -39,6 +39,11 @@ public PulsarTransactionsImpl(EmitterConfiguration config, long defaultBufferSiz this.pulsarClient = pulsarClientService.getClient(config.name()); } + private static boolean isConflict(Throwable throwable) { + return throwable instanceof PulsarClientException.TransactionConflictException + || throwable.getCause() instanceof PulsarClientException.TransactionConflictException; + } + @Override public Uni withTransaction(Function, Uni> work) { return new PulsarTransactionEmitter().execute(work); @@ -57,17 +62,14 @@ public Uni withTransaction(Message message, Function Uni withTransaction(Duration txnTimeout, Message message, Function, Uni> work) { return new PulsarTransactionEmitter(txnTimeout, - txn -> Uni.createFrom().completionStage(message.ack()), + txn -> Uni.createFrom().completionStage(message.ack()) + .onFailure(PulsarTransactionsImpl::isConflict).recoverWithUni(throwable -> VOID_UNI), PulsarTransactionsImpl::defaultAfterCommit, (txn, throwable) -> { - if (!(throwable.getCause() instanceof PulsarClientException.TransactionConflictException)) { - // If TransactionConflictException is not thrown, - // you need to redeliver or negativeAcknowledge this message, - // or else this message will not be received again. - return Uni.createFrom().completionStage(() -> message.nack(throwable)); - } else { - return VOID_UNI; - } + // If TransactionConflictException is not thrown, + // you need to redeliver or negativeAcknowledge this message, + // or else this message will not be received again. + return isConflict(throwable) ? VOID_UNI : Uni.createFrom().completionStage(message.nack(throwable)); }, PulsarTransactionsImpl::defaultAfterAbort) .execute(e -> { @@ -200,7 +202,8 @@ private Uni executeInTransaction(Function, Uni> wo .onCancellation().call(() -> abort(new RuntimeException("Transaction cancelled"))) // when there was no exception, // commit or rollback the transaction - .call(() -> abort ? abort(new RuntimeException("Transaction aborted")) : commit()) + .call(() -> abort ? abort(new RuntimeException("Transaction aborted")) + : commit().onFailure().call(throwable -> abort(throwable))) .onFailure().recoverWithUni(throwable -> afterAbort.apply(throwable)) .onItem().transformToUni(result -> afterCommit.apply(result)); } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java index ddcd334b06..c93858eaae 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/PulsarContainer.java @@ -12,7 +12,7 @@ public class PulsarContainer extends GenericContainer { - public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.2.2"); + public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0"); public static final String STARTER_SCRIPT = "/run_pulsar.sh"; @@ -45,6 +45,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n"; command += "export PULSAR_PREFIX_transactionCoordinatorEnabled=true\n"; command += "export PULSAR_PREFIX_systemTopicEnabled=true\n"; + command += "export PULSAR_PREFIX_brokerDeduplicationEnabled=true\n"; + command += "export PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true\n"; command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"; copyFileToContainer( Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java index ad6bd73863..4589357679 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/transactions/ExactlyOnceProcessingBatchTest.java @@ -25,8 +25,6 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; @@ -34,7 +32,6 @@ import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; import io.smallrye.reactive.messaging.pulsar.PulsarMessage; -import io.smallrye.reactive.messaging.pulsar.TestTags; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -104,8 +101,6 @@ Uni process(PulsarIncomingBatchMessage batch) { * There are still duplicate items delivered to the consumer batch after an transaction abort. */ @Test - @Tag(TestTags.FLAKY) - @Disabled void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, PulsarClientException { addBeans(ConsumerConfig.class); this.inTopic = UUID.randomUUID().toString(); @@ -137,10 +132,6 @@ void testExactlyOnceProcessorWithProcessingError() throws PulsarAdminException, .topic(this.inTopic) .create(), numberOfRecords, (i, producer) -> producer.newMessage().sequenceId(i).value(i).key("k-" + i)); - await().untilAsserted(() -> assertThat(app.getProcessed()) - .containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList())) - .doesNotHaveDuplicates()); - await().untilAsserted(() -> assertThat(list) .containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList())) .doesNotHaveDuplicates()); @@ -166,7 +157,8 @@ private MapBasedConfig consumerConfig() { .with("mp.messaging.incoming.exactly-once-consumer.enableTransaction", true) .with("mp.messaging.incoming.exactly-once-consumer.negativeAckRedeliveryDelayMicros", 5000) .with("mp.messaging.incoming.exactly-once-consumer.schema", "INT32") - .with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true); + .with("mp.messaging.incoming.exactly-once-consumer.batchReceive", true) + .with("mp.messaging.incoming.exactly-once-consumer.batchIndexAckEnable", true); } @ApplicationScoped