Skip to content

Commit

Permalink
[pulsar-broker] Refresh ZooKeeper-data cache in background to avoid d…
Browse files Browse the repository at this point in the history
…eadlock and blocking IO on ZK thread

fix test
  • Loading branch information
rdhabalia committed Oct 23, 2020
1 parent 7f9b7cf commit 1379947
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected void setup() throws Exception {
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);

pulsar = new PulsarService(config);
pulsar.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,13 +39,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
Expand Down Expand Up @@ -83,19 +81,20 @@ public static interface CacheUpdater<T> {

public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";

protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final AsyncLoadingCache<String, Pair<Entry<Object, Stat>, Long>> dataCache;
protected final AsyncLoadingCache<String, Set<String>> childrenCache;
protected final AsyncLoadingCache<String, Boolean> existsCache;
private final OrderedExecutor executor;
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300; //5 minutes
private final int cacheExpirySeconds;

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, DEFAULT_CACHE_EXPIRY_SECONDS);
}

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
Expand All @@ -105,10 +104,10 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime
this.executor = executor;
this.zkSession.set(zkSession);
this.shouldShutdownExecutor = false;
this.cacheExpirySeconds = cacheExpirySeconds;

this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder()
Expand Down Expand Up @@ -346,10 +345,12 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
checkNotNull(path);
checkNotNull(deserializer);

CompletableFuture<Optional<Entry<T, Stat>>> future = new CompletableFuture<>();
// refresh zk-cache entry in background if it's already expired
checkAndRefreshExpiredEntry(path, deserializer);
CompletableFuture<Optional<Entry<T,Stat>>> future = new CompletableFuture<>();
dataCache.get(path, (p, executor) -> {
// Return a future for the z-node to be fetched from ZK
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
CompletableFuture<Pair<Entry<Object, Stat>, Long>> zkFuture = new CompletableFuture<>();

// The broker doesn't restart when the global-zk session is lost, so handle unexpected exceptions here
try {
Expand All @@ -358,8 +359,8 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
executor.execute(
() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
executor.execute(() -> zkFuture.complete(ImmutablePair
.of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
} catch (Exception e) {
executor.execute(() -> zkFuture.completeExceptionally(e));
}
Expand All @@ -378,7 +379,7 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return zkFuture;
}).thenAccept(result -> {
if (result != null) {
future.complete(Optional.of((Entry<T, Stat>) result));
future.complete(Optional.of((Entry<T, Stat>) result.getLeft()));
} else {
future.complete(Optional.empty());
}
Expand All @@ -390,6 +391,30 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return future;
}

/**
 * Issues a best-effort asynchronous re-read of {@code path} from ZooKeeper when the cached entry is older than
 * {@code cacheExpirySeconds}.
 *
 * <p>This method does not remove the existing entry from {@code dataCache}; the entry is only overwritten from
 * the ZooKeeper callback once the new content has been read and deserialized successfully. A failed read or a
 * failed deserialization is logged and otherwise ignored.
 *
 * @param path         z-node path whose cached entry should be checked
 * @param deserializer converts the raw z-node content into the object stored in the cache
 */
private <T> void checkAndRefreshExpiredEntry(String path, final Deserializer<T> deserializer) {
    CompletableFuture<Pair<Entry<Object, Stat>, Long>> result = dataCache.getIfPresent(path);
    // Skip entries that are absent, still loading, or failed/cancelled: isDone() is also true for a future
    // that completed exceptionally, and getNow() would rethrow that failure to the caller.
    if (result == null || !result.isDone() || result.isCompletedExceptionally()) {
        return;
    }

    Pair<Entry<Object, Stat>, Long> entryPair = result.getNow(null);
    if (entryPair == null || entryPair.getRight() == null) {
        return;
    }

    // The stored timestamp comes from System.nanoTime(), so the expiry has to be expressed in nanoseconds too.
    long ageNanos = System.nanoTime() - entryPair.getRight();
    if (ageNanos <= TimeUnit.SECONDS.toNanos(cacheExpirySeconds)) {
        return;
    }

    this.zkSession.get().getData(path, this, (rc, path1, ctx, content, stat) -> {
        if (rc != Code.OK.intValue()) {
            log.warn("Failed to refresh zookeeper-cache for {} due to {}", path, rc);
            return;
        }
        try {
            T obj = deserializer.deserialize(path, content);
            // Replace the entry with a fresh value and a fresh load timestamp.
            dataCache.put(path, CompletableFuture.completedFuture(ImmutablePair
                    .of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
        } catch (Exception e) {
            log.warn("Failed to refresh zookeeper-cache for {}", path, e);
        }
    }, null);
}

/**
* Simple ZooKeeperChildrenCache use this method to invalidate cache entry on watch event w/o automatic re-loading
*
Expand Down Expand Up @@ -465,9 +490,9 @@ public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watc

@SuppressWarnings("unchecked")
public <T> T getDataIfPresent(String path) {
CompletableFuture<Map.Entry<Object, Stat>> f = dataCache.getIfPresent(path);
CompletableFuture<Pair<Entry<Object, Stat>, Long>> f = dataCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
return (T) f.join().getKey();
return (T) f.join().getLeft().getKey();
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -563,6 +564,63 @@ public String deserialize(String key, byte[] content) throws Exception {
scheduledExecutor.shutdown();
}

/**
 * Verifies that a {@link ZooKeeperCache} created with a short expiry time serves the updated z-node value
 * after the underlying data has been changed.
 *
 * @throws Exception if a ZooKeeper or cache operation fails
 */
@Test
public void testZKRefreshExpiredEntry() throws Exception {
    int cacheExpiryTimeSec = 1;
    OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();

    try {
        String path = "/test";
        String val1 = "test-1";
        String val2 = "test-2";
        zkClient.create(path, val1.getBytes(), null, null);

        // use a 1-second cache expiry so the entry goes stale within the retry window below
        ZooKeeperCache zkCacheService = new ZooKeeperCacheTest("test", zkClient, 30, executor, cacheExpiryTimeSec);
        ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
            @Override
            public String deserialize(String key, byte[] content) throws Exception {
                return new String(content);
            }
        };

        // the first read loads the initial value into the cache
        assertEquals(zkCache.get(path).get(), val1);

        // change the z-node; the cache is expected to serve the new value once it has been refreshed
        zkClient.setData(path, val2.getBytes(), -1);

        retryStrategically((test) -> {
            try {
                return val2.equals(zkCache.get(path).get());
            } catch (Exception e) {
                log.warn("Failed to get data for path {}", path, e);
            }
            return false;
        }, 5, 1000);

        assertEquals(zkCache.get(path).get(), val2);
    } finally {
        // release the executor threads even when an assertion above fails
        executor.shutdown();
    }
}

/**
 * Minimal {@link ZooKeeperCache} subclass used by the tests to construct a cache with an explicit
 * cache-expiry time.
 */
static class ZooKeeperCacheTest extends ZooKeeperCache {

    /**
     * Forwards every argument unchanged to the {@link ZooKeeperCache} constructor.
     */
    public ZooKeeperCacheTest(
            final String cacheName,
            final ZooKeeper zkSession,
            final int zkOperationTimeoutSeconds,
            final OrderedExecutor executor,
            final int cacheExpirySeconds) {
        super(cacheName, zkSession, zkOperationTimeoutSeconds, executor, cacheExpirySeconds);
    }
}

private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
Expand Down

0 comments on commit 1379947

Please sign in to comment.