Skip to content

Commit

Permalink
Unload bundle: close topic forcefully and enable bundle on ownership …
Browse files Browse the repository at this point in the history
…removal failure (#164)
  • Loading branch information
rdhabalia authored Jan 20, 2017
1 parent 9266687 commit 8818eea
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,15 +677,15 @@ private boolean isDestinationOwned(DestinationName fqdn) throws Exception {
}

public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
ownershipCache.removeOwnership(getFullBundle(nsName));
ownershipCache.removeOwnership(getFullBundle(nsName)).get();
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle);
ownershipCache.removeOwnership(nsBundle).get();
}

public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData));
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get();
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,26 @@ public void handleUnloadRequest(PulsarService pulsar) throws Exception {
int unloadedTopics = 0;
try {
LOG.info("Disabling ownership: {}", this.bundle);
pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.bundle);

// Handle unload of persistent topics
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, false);

// close topics forcefully
try {
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get();
} catch (Exception e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);
}
// delete ownership node on zk
try {
pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle).get();
} catch (Exception e) {
// Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.bundle, true);
throw new RuntimeException(String.format("Failed to delete ownership node %s", bundle.toString()),
e.getCause());
}
} catch (Exception e) {
LOG.error(String.format("failed to unload a namespace. ns=%s", bundle.toString()), e);
LOG.error("Failed to unload a namespace {}", bundle.toString(), e);
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkState;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,6 +38,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.client.util.FutureUtil;
Expand Down Expand Up @@ -141,26 +143,6 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
}
}

private class OwnedServiceUnitCacheRemovalListener implements RemovalListener<String, OwnedBundle> {

@Override
public void onRemoval(String key, OwnedBundle value, RemovalCause cause) {
LOG.info("Removing ownership for {}", key);
// Under the cache sync lock, removing the ZNode
// If succeeded, we guaranteed that the cache entry is removed together w/ ZNode

localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
LOG.info("Removed zk lock for service unit: {}", key);
} else {
LOG.warn("Failed to delete the namespace ephemeral node. key={}", key,
KeeperException.Code.get(rc));
}
}, null);
ownershipReadOnlyCache.invalidate(key);
}
}

/**
* Constructor of <code>OwnershipCache</code>
*
Expand All @@ -179,7 +161,6 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.sameThreadExecutor())
.removalListener(new OwnedServiceUnitCacheRemovalListener())
.buildAsync(new OwnedServiceUnitCacheLoader());
}

Expand Down Expand Up @@ -268,8 +249,22 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
* Method to remove the ownership of local broker on the <code>NamespaceBundle</code>, if owned
*
*/
public void removeOwnership(NamespaceBundle bundle) {
ownedBundlesCache.synchronous().invalidate(ServiceUnitZkUtils.path(bundle));
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
String key = ServiceUnitZkUtils.path(bundle);
localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) {
LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
ownedBundlesCache.synchronous().invalidate(key);
ownershipReadOnlyCache.invalidate(key);
result.complete(null);
} else {
LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key,
KeeperException.Code.get(rc));
result.completeExceptionally(KeeperException.create(rc));
}
}, null);
return result;
}

/**
Expand All @@ -278,22 +273,18 @@ public void removeOwnership(NamespaceBundle bundle) {
* @param bundles
* <code>NamespaceBundles</code> to remove from ownership cache
*/
public void removeOwnership(NamespaceBundles bundles) {
boolean hasError = false;
public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
List<CompletableFuture<Void>> allFutures = Lists.newArrayList();
for (NamespaceBundle bundle : bundles.getBundles()) {
if (getOwnedBundle(bundle) == null) {
// continue
continue;
}
try {
this.removeOwnership(bundle);
} catch (Exception e) {
LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e);
hasError = true;
}
allFutures.add(this.removeOwnership(bundle));
}
checkState(!hasError, "Not able to remove all owned bundles");
return FutureUtil.waitForAll(allFutures);
}


/**
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
Expand Down Expand Up @@ -330,17 +321,32 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
}
}

/**
* Disable bundle in local cache and on zk
*
* @param bundle
* @throws Exception
*/
public void disableOwnership(NamespaceBundle bundle) throws Exception {
String path = ServiceUnitZkUtils.path(bundle);

updateBundleState(bundle, false);
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
ownershipReadOnlyCache.invalidate(path);
}

/**
* Update bundle state in a local cache
*
* @param bundle
* @throws Exception
*/
public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws Exception {
String path = ServiceUnitZkUtils.path(bundle);
// Disable owned instance in local cache
CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
f.join().setActive(false);
f.join().setActive(isActive);
}

localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
ownershipReadOnlyCache.invalidate(path);
}

public NamespaceEphemeralData getSelfOwnerInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -39,6 +40,8 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.data.Stat;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -48,14 +51,18 @@
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

public class NamespaceServiceTest extends BrokerTestBase {

Expand Down Expand Up @@ -232,10 +239,45 @@ public void testremoveOwnershipNamespaceBundle() throws Exception {
NamespaceBundle bundle = bundles.getBundles().get(0);
assertNotNull(ownershipCache.tryAcquiringOwnership(bundle));
assertNotNull(ownershipCache.getOwnedBundle(bundle));
ownershipCache.removeOwnership(bundles);
ownershipCache.removeOwnership(bundles).get();
assertNull(ownershipCache.getOwnedBundle(bundle));
}

@Test
public void testUnloadNamespaceBundleFailure() throws Exception {

final String topicName = "persistent://my-property/use/my-ns/my-topic1";
ConsumerConfiguration conf = new ConsumerConfiguration();
Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf);
ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics();
Topic spyTopic = spy(topics.get(topicName).get());
topics.clear();
CompletableFuture<Topic> topicFuture = CompletableFuture.completedFuture(spyTopic);
// add mock topic
topics.put(topicName, topicFuture);
doAnswer(new Answer<CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(new RuntimeException("first time failed"));
return result;
}
}).when(spyTopic).close();
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
try {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
} catch (Exception e) {
// fail
fail(e.getMessage());
}
try {
pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null);
fail("it should fail as node is not present");
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
// ok
}
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void testRemoveOwnership() throws Exception {
// case 1: no one owns the namespace
assertFalse(cache.getOwnerAsync(bundle).get().isPresent());

cache.removeOwnership(bundle);
cache.removeOwnership(bundle).get();
assertTrue(cache.getOwnedBundles().isEmpty());

// case 2: this broker owns the namespace
Expand All @@ -267,6 +267,7 @@ public void testRemoveOwnership() throws Exception {
assertTrue(!data1.isDisabled());
assertTrue(cache.getOwnedBundles().size() == 1);
cache.removeOwnership(bundle);
Thread.sleep(500);
assertTrue(cache.getOwnedBundles().isEmpty());

Thread.sleep(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void testConfigChangeNegativeCases() throws Exception {
}

// restore the namespace state
ownerCache.removeOwnership(globalNsBundle);
ownerCache.removeOwnership(globalNsBundle).get();
ownerCache.tryAcquiringOwnership(globalNsBundle);
}

Expand Down

0 comments on commit 8818eea

Please sign in to comment.