From 79bbfce7bd7b8b9b33884b3a643dc4e2a2398080 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Mon, 9 Aug 2021 09:49:13 +0800 Subject: [PATCH] [Transaction] Fix delete sub then delete pending ack. (#11023) Fix delete sub then delete pending ack managedledger. (cherry picked from commit a50fe878773a76ca974d1af6ea8b994a5df7f81a) --- .../service/persistent/PersistentTopic.java | 97 +++++++++++++------ .../broker/service/PersistentTopicTest.java | 6 ++ .../pulsar/broker/service/ServerCnxTest.java | 13 ++- .../buffer/TransactionBufferClientTest.java | 5 +- .../pendingack/PendingAckPersistentTest.java | 76 ++++++++++++++- 5 files changed, 161 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 62a07017a7f32..2f9a91da972af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -107,6 +108,7 @@ import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; @@ -147,6 +149,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; @@ -949,7 +952,32 @@ public CompletableFuture createSubscription(String subscriptionNam @Override public CompletableFuture unsubscribe(String subscriptionName) { CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore + .getTransactionPendingAckStoreSuffix(topic, + Codec.encode(subscriptionName))).getPersistenceNamingEncoding(), + new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + asyncDeleteCursor(subscriptionName, unsubscribeFuture); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + if (exception instanceof MetadataNotFoundException) { + asyncDeleteCursor(subscriptionName, unsubscribeFuture); + return; + } + + unsubscribeFuture.completeExceptionally(exception); + log.error("[{}][{}] Error deleting subscription pending ack store", + topic, subscriptionName, exception); + } + }, null); + + return unsubscribeFuture; + } + private void asyncDeleteCursor(String subscriptionName, CompletableFuture unsubscribeFuture) { ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { @@ -964,13 +992,12 @@ public void deleteCursorComplete(Object ctx) { @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Error deleting cursor for subscription", topic, subscriptionName, exception); + log.debug("[{}][{}] Error deleting cursor for subscription", + topic, subscriptionName, exception); } unsubscribeFuture.completeExceptionally(new PersistenceException(exception)); } }, null); - - return unsubscribeFuture; } void removeSubscription(String subscriptionName) { @@ -1077,32 +1104,46 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, unfenceTopicToResume(); deleteFuture.completeExceptionally(ex); } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(topic); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic)); - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } - - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - if (exception.getCause() instanceof KeeperException.NoNodeException) { - log.info("[{}] Topic is already deleted {}", topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally(new PersistenceException(exception)); - } + List> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(topic); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + brokerService.pulsar().getTopicPoliciesService() + .clean(TopicName.get(topic)); + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", topic, exception); + deleteFuture.completeExceptionally(new PersistenceException(exception)); + } + } + }, null); } - }, null); + }); } }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 4a804b201e215..b53404a898e91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -178,6 +178,12 @@ public void setup() throws Exception { mlFactoryMock = mock(ManagedLedgerFactory.class); doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); + doAnswer(invocation -> { + DeleteLedgerCallback deleteLedgerCallback = invocation.getArgument(1); + deleteLedgerCallback.deleteLedgerComplete(null); + return null; + }).when(mlFactoryMock).asyncDelete(any(), any(), any()); + ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); doReturn(createMockBookKeeper(executor)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b2d1e54dcd74e..2e2f0f20b0894 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -115,6 +115,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -759,14 +760,16 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { // Create producer second time clientCommand = Commands.newSubscribe(successTopicName, // - successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, + successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /* avoid reseting cursor */); channel.writeInbound(clientCommand); - Object response = getResponse(); - assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); - CommandError error = (CommandError) response; - assertEquals(error.getError(), ServerError.ServiceNotReady); + Awaitility.await().untilAsserted(() -> { + Object response = getResponse(); + assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); + CommandError error = (CommandError) response; + assertEquals(error.getError(), ServerError.ConsumerBusy); + }); channel.finish(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index 5d088bbacd5a6..60815807af827 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl; import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; import org.apache.pulsar.client.api.transaction.TransactionBufferClientException; @@ -90,9 +91,11 @@ protected void afterSetup() throws Exception { pulsarAdmins[0].tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster"))); pulsarAdmins[0].namespaces().createNamespace(namespace, 10); pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); + String subName = "test"; + pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(), subName, MessageId.latest); pulsarClient.newConsumer() .topic(partitionedTopicName.getPartitionedTopicName()) - .subscriptionName("test").subscribe(); + .subscriptionName(subName).subscribe(); tbClient = TransactionBufferClientImpl.create( ((PulsarClientImpl) pulsarClient), new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer"))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 191e40276f4a4..461991124cee6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.transaction.pendingack; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; @@ -45,9 +46,11 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -59,7 +62,11 @@ @Slf4j public class PendingAckPersistentTest extends TransactionTestBase { - private final static String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay"; + private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay"; + + private static final String NAMESPACE = "public/txn"; + + private static final int NUM_PARTITIONS = 16; @BeforeMethod public void setup() throws Exception { @@ -75,7 +82,7 @@ public void setup() throws Exception { admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); admin.tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace("public/txn", 10); + admin.namespaces().createNamespace(NAMESPACE, 10); admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC); pulsarClient = PulsarClient.builder() @@ -298,4 +305,69 @@ public void cumulativePendingAckReplayTest() throws Exception { .until(() -> ((PositionImpl) managedCursor.getMarkDeletedPosition()) .compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1); } + + @Test + private void testDeleteSubThenDeletePendingAckManagedLedger() throws Exception { + + String subName = "test-delete"; + + String topic = TopicName.get(TopicDomain.persistent.toString(), + NamespaceName.get(NAMESPACE), "test-delete").toString(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + consumer.close(); + + admin.topics().deleteSubscription(topic, subName); + + List topics = admin.namespaces().getTopics(NAMESPACE); + + TopicStats topicStats = admin.topics().getStats(topic, false); + + assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName))); + + assertTrue(topics.contains(topic)); + } + + @Test + private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception { + + String subName1 = "test-delete"; + String subName2 = "test-delete"; + + String topic = TopicName.get(TopicDomain.persistent.toString(), + NamespaceName.get(NAMESPACE), "test-delete").toString(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName1) + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + consumer1.close(); + + @Cleanup + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName2) + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + consumer2.close(); + + admin.topics().delete(topic); + + List topics = admin.namespaces().getTopics(NAMESPACE); + + assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName1))); + assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2))); + assertFalse(topics.contains(topic)); + } }