From 0b54aaf8f8909df85843ed8d2fcad75c437e2f75 Mon Sep 17 00:00:00 2001 From: daojun Date: Sat, 10 Dec 2022 08:01:58 +0800 Subject: [PATCH 1/2] fix PendingAckHandleImpl when pendingAckStoreProvider.checkInitializedBefore failed --- .../pendingack/impl/PendingAckHandleImpl.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 07a0c0c8b57fc..e55d0ed739bc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -153,13 +153,19 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { this.pendingAckStoreProvider = this.persistentSubscription.getTopic() .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider(); - pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> { - if (init) { - initPendingAckStore(); - } else { - completeHandleFuture(); - } - }); + + pendingAckStoreProvider.checkInitializedBefore(persistentSubscription) + .thenAccept(init -> { + if (init) { + initPendingAckStore(); + } else { + completeHandleFuture(); + } + }) + .exceptionally(t -> { + exceptionHandleFuture(t); + return null; + }); } private void initPendingAckStore() { From 955e1fef4bb1fa2ed186b5a63d65794c99b9dbc0 Mon Sep 17 00:00:00 2001 From: daojun Date: Sat, 10 Dec 2022 08:29:08 +0800 Subject: [PATCH 2/2] fix PendingAckHandleImpl when pendingAckStoreProvider.checkInitializedBefore failed --- .../broker/transaction/pendingack/impl/PendingAckHandleImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index e55d0ed739bc7..35d75f1faee77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -163,6 +163,7 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { } }) .exceptionally(t -> { + changeToErrorState(); exceptionHandleFuture(t); return null; });