diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java index 1949ded919a15..31c77090d6304 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java @@ -677,15 +677,15 @@ private boolean isDestinationOwned(DestinationName fqdn) throws Exception { } public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception { - ownershipCache.removeOwnership(getFullBundle(nsName)); + ownershipCache.removeOwnership(getFullBundle(nsName)).get(); } public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception { - ownershipCache.removeOwnership(nsBundle); + ownershipCache.removeOwnership(nsBundle).get(); } public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception { - ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)); + ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(); } public NamespaceBundleFactory getNamespaceBundleFactory() { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java index c59a0915760ff..ab0664fc29d8a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedBundle.java @@ -104,13 +104,26 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception { int unloadedTopics = 0; try { LOG.info("Disabling ownership: {}", this.bundle); - pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.bundle); - - // Handle unload of persistent topics - unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(); - pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle); + pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, false); + + // close topics forcefully + try { + unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(); + } catch (Exception e) { + // ignore topic-close failure to unload bundle + LOG.error("Failed to close topics under namespace {}", bundle.toString(), e); + } + // delete ownership node on zk + try { + pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle).get(); + } catch (Exception e) { + // Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics + pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true); + throw new RuntimeException(String.format("Failed to delete ownership node %s", bundle.toString()), + e.getCause()); + } } catch (Exception e) { - LOG.error(String.format("failed to unload a namespace. ns=%s", bundle.toString()), e); + LOG.error("Failed to unload a namespace {}", bundle.toString(), e); throw new RuntimeException(e); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java index 015c0d66fa894..f2e4a359ae070 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkState; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,6 +38,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.client.util.FutureUtil; @@ -141,26 +143,6 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe } } - private class OwnedServiceUnitCacheRemovalListener implements RemovalListener { - - @Override - public void onRemoval(String key, OwnedBundle value, RemovalCause cause) { - LOG.info("Removing ownership for {}", key); - // Under the cache sync lock, removing the ZNode - // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode - - localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { - if (rc == KeeperException.Code.OK.intValue()) { - LOG.info("Removed zk lock for service unit: {}", key); - } else { - LOG.warn("Failed to delete the namespace ephemeral node. key={}", key, - KeeperException.Code.get(rc)); - } - }, null); - ownershipReadOnlyCache.invalidate(key); - } - } - /** * Constructor of OwnershipCache * @@ -179,7 +161,6 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache(); // ownedBundlesCache contains all namespaces that are owned by the local broker this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor()) - .removalListener(new OwnedServiceUnitCacheRemovalListener()) .buildAsync(new OwnedServiceUnitCacheLoader()); } @@ -268,8 +249,22 @@ public CompletableFuture tryAcquiringOwnership(Namespace * Method to remove the ownership of local broker on the NamespaceBundle, if owned * */ - public void removeOwnership(NamespaceBundle bundle) { - ownedBundlesCache.synchronous().invalidate(ServiceUnitZkUtils.path(bundle)); + public CompletableFuture removeOwnership(NamespaceBundle bundle) { + CompletableFuture result = new CompletableFuture<>(); + String key = ServiceUnitZkUtils.path(bundle); + localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { + if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) { + LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc)); + ownedBundlesCache.synchronous().invalidate(key); + ownershipReadOnlyCache.invalidate(key); + result.complete(null); + } else { + LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, + KeeperException.Code.get(rc)); + result.completeExceptionally(KeeperException.create(rc)); + } + }, null); + return result; } /** @@ -278,22 +273,18 @@ public void removeOwnership(NamespaceBundle bundle) { * @param bundles * NamespaceBundles to remove from ownership cache */ - public void removeOwnership(NamespaceBundles bundles) { - boolean hasError = false; + public CompletableFuture removeOwnership(NamespaceBundles bundles) { + List> allFutures = Lists.newArrayList(); for (NamespaceBundle bundle : bundles.getBundles()) { if (getOwnedBundle(bundle) == null) { // continue continue; } - try { - this.removeOwnership(bundle); - } catch (Exception e) { - LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e); - hasError = true; - } + allFutures.add(this.removeOwnership(bundle)); } - checkState(!hasError, "Not able to remove all owned bundles"); + return FutureUtil.waitForAll(allFutures); } + /** * Method to access the map of all ServiceUnit objects owned by the local broker @@ -330,17 +321,32 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) { } } + /** + * Disable bundle in local cache and on zk + * + * @param bundle + * @throws Exception + */ public void disableOwnership(NamespaceBundle bundle) throws Exception { String path = ServiceUnitZkUtils.path(bundle); - + updateBundleState(bundle, false); + localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1); + ownershipReadOnlyCache.invalidate(path); + } + + /** + * Update bundle state in a local cache + * + * @param bundle + * @throws Exception + */ + public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws Exception { + String path = ServiceUnitZkUtils.path(bundle); // Disable owned instance in local cache CompletableFuture f = ownedBundlesCache.getIfPresent(path); if (f != null && f.isDone() && !f.isCompletedExceptionally()) { - f.join().setActive(false); + f.join().setActive(isActive); } - - localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1); - ownershipReadOnlyCache.invalidate(path); } public NamespaceEphemeralData getSelfOwnerInfo() { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java index 89d9243baf569..ea4647659506f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java @@ -18,6 +18,7 @@ import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -39,6 +40,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.data.Stat; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -48,7 +51,10 @@ import com.google.common.collect.Lists; import com.google.common.hash.Hashing; import com.yahoo.pulsar.broker.service.BrokerTestBase; +import com.yahoo.pulsar.broker.service.Topic; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; +import com.yahoo.pulsar.client.api.Consumer; +import com.yahoo.pulsar.client.api.ConsumerConfiguration; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceBundleFactory; @@ -56,6 +62,7 @@ import com.yahoo.pulsar.common.naming.NamespaceName; import com.yahoo.pulsar.common.policies.data.Policies; import com.yahoo.pulsar.common.util.ObjectMapperFactory; +import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; public class NamespaceServiceTest extends BrokerTestBase { @@ -232,10 +239,45 @@ public void testremoveOwnershipNamespaceBundle() throws Exception { NamespaceBundle bundle = bundles.getBundles().get(0); assertNotNull(ownershipCache.tryAcquiringOwnership(bundle)); assertNotNull(ownershipCache.getOwnedBundle(bundle)); - ownershipCache.removeOwnership(bundles); + ownershipCache.removeOwnership(bundles).get(); assertNull(ownershipCache.getOwnedBundle(bundle)); } + @Test + public void testUnloadNamespaceBundleFailure() throws Exception { + + final String topicName = "persistent://my-property/use/my-ns/my-topic1"; + ConsumerConfiguration conf = new ConsumerConfiguration(); + Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf); + ConcurrentOpenHashMap> topics = pulsar.getBrokerService().getTopics(); + Topic spyTopic = spy(topics.get(topicName).get()); + topics.clear(); + CompletableFuture topicFuture = CompletableFuture.completedFuture(spyTopic); + // add mock topic + topics.put(topicName, topicFuture); + doAnswer(new Answer>() { + @Override + public CompletableFuture answer(InvocationOnMock invocation) throws Throwable { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(new RuntimeException("first time failed")); + return result; + } + }).when(spyTopic).close(); + NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName)); + try { + pulsar.getNamespaceService().unloadNamespaceBundle(bundle); + } catch (Exception e) { + // fail + fail(e.getMessage()); + } + try { + pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null); + fail("it should fail as node is not present"); + } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { + // ok + } + } + @SuppressWarnings("unchecked") private Pair> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java index 601dbc357f9fe..f146c1a6f47dd 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java @@ -258,7 +258,7 @@ public void testRemoveOwnership() throws Exception { // case 1: no one owns the namespace assertFalse(cache.getOwnerAsync(bundle).get().isPresent()); - cache.removeOwnership(bundle); + cache.removeOwnership(bundle).get(); assertTrue(cache.getOwnedBundles().isEmpty()); // case 2: this broker owns the namespace @@ -267,6 +267,7 @@ public void testRemoveOwnership() throws Exception { assertTrue(!data1.isDisabled()); assertTrue(cache.getOwnedBundles().size() == 1); cache.removeOwnership(bundle); + Thread.sleep(500); assertTrue(cache.getOwnedBundles().isEmpty()); Thread.sleep(500); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java index ab2816c6ecec2..2749ff61ee1b6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java @@ -236,7 +236,7 @@ public void testConfigChangeNegativeCases() throws Exception { } // restore the namespace state - ownerCache.removeOwnership(globalNsBundle); + ownerCache.removeOwnership(globalNsBundle).get(); ownerCache.tryAcquiringOwnership(globalNsBundle); }