diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index b4f3c37b6d9cb..8698eabbe9d67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -268,8 +268,8 @@ public CompletableFuture> getWebServiceUrlAsync(ServiceUnitId suNa * If the service unit is not owned, return an empty optional */ public Optional getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception { - return getWebServiceUrlAsync(suName, options).get( - pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); + return getWebServiceUrlAsync(suName, options) + .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); } private CompletableFuture> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java index 754031f359974..4f93057eeca37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java @@ -132,6 +132,8 @@ public CompletableFuture handleUnloadRequest(PulsarService pulsar, long ti } else { unloadedTopics.set(numUnloadedTopics); } + // clean up topics that failed to unload from the broker ownership cache + pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle); return null; }) .thenCompose(v -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 5d614af5bc6eb..6cfdf38b75ca3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1569,6 +1569,15 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit return FutureUtil.waitForAll(closeFutures).thenApply(v -> closeFutures.size()); } + public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { + topics.forEach((name, topicFuture) -> { + TopicName topicName = TopicName.get(name); + if (serviceUnit.includes(topicName)) { + pulsar.getBrokerService().removeTopicFromCache(topicName.toString()); + } + }); + } + public AuthorizationService getAuthorizationService() { return authorizationService; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java new file mode 100644 index 0000000000000..9961f3ac88390 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + */ +public class PersistentTopicTest extends BrokerTestBase { + @BeforeMethod + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + /** + * Test validates that broker cleans up topic which failed to unload while bundle unloading. + * + * @throws Exception + */ + @Test + public void testCleanFailedUnloadTopic() throws Exception { + final String topicName = "persistent://prop/ns-abc/failedUnload"; + + // 1. producer connect + Producer producer = pulsarClient.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + + ManagedLedger ml = topicRef.ledger; + LedgerHandle ledger = mock(LedgerHandle.class); + Field handleField = ml.getClass().getDeclaredField("currentLedger"); + handleField.setAccessible(true); + handleField.set(ml, ledger); + doNothing().when(ledger).asyncClose(any(), any()); + + NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); + pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 5, TimeUnit.SECONDS).get(); + + retryStrategically((test) -> !pulsar.getBrokerService().getTopicReference(topicName).isPresent(), 5, 500); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + producer.close(); + } +}