Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] Refresh ZooKeeper-data cache in background to avoid d… #8304

Merged
merged 1 commit into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected void setup() throws Exception {
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);

pulsar = new PulsarService(config);
pulsar.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
Expand All @@ -40,13 +39,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
Expand Down Expand Up @@ -83,19 +81,20 @@ public static interface CacheUpdater<T> {

public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";

protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final AsyncLoadingCache<String, Pair<Entry<Object, Stat>, Long>> dataCache;
protected final AsyncLoadingCache<String, Set<String>> childrenCache;
protected final AsyncLoadingCache<String, Boolean> existsCache;
private final OrderedExecutor executor;
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300; //5 minutes
private final int cacheExpirySeconds;

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, DEFAULT_CACHE_EXPIRY_SECONDS);
}

public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
Expand All @@ -105,10 +104,10 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime
this.executor = executor;
this.zkSession.set(zkSession);
this.shouldShutdownExecutor = false;
this.cacheExpirySeconds = cacheExpirySeconds;

this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder()
Expand Down Expand Up @@ -346,10 +345,12 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
checkNotNull(path);
checkNotNull(deserializer);

CompletableFuture<Optional<Entry<T, Stat>>> future = new CompletableFuture<>();
// refresh zk-cache entry in background if it's already expired
checkAndRefreshExpiredEntry(path, deserializer);
CompletableFuture<Optional<Entry<T,Stat>>> future = new CompletableFuture<>();
dataCache.get(path, (p, executor) -> {
// Return a future for the z-node to be fetched from ZK
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
CompletableFuture<Pair<Entry<Object, Stat>, Long>> zkFuture = new CompletableFuture<>();

// Broker doesn't restart on global-zk session lost: so handling unexpected exception
try {
Expand All @@ -358,8 +359,8 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
executor.execute(
() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
executor.execute(() -> zkFuture.complete(ImmutablePair
.of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
} catch (Exception e) {
executor.execute(() -> zkFuture.completeExceptionally(e));
}
Expand All @@ -378,7 +379,7 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return zkFuture;
}).thenAccept(result -> {
if (result != null) {
future.complete(Optional.of((Entry<T, Stat>) result));
future.complete(Optional.of((Entry<T, Stat>) result.getLeft()));
} else {
future.complete(Optional.empty());
}
Expand All @@ -390,6 +391,30 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
return future;
}

private <T> void checkAndRefreshExpiredEntry(String path, final Deserializer<T> deserializer) {
CompletableFuture<Pair<Entry<Object, Stat>, Long>> result = dataCache.getIfPresent(path);
if (result != null && result.isDone()) {
Pair<Entry<Object, Stat>, Long> entryPair = result.getNow(null);
if (entryPair != null && entryPair.getRight() != null) {
if ((System.nanoTime() - entryPair.getRight()) > TimeUnit.SECONDS.toMillis(cacheExpirySeconds)) {
this.zkSession.get().getData(path, this, (rc, path1, ctx, content, stat) -> {
if (rc != Code.OK.intValue()) {
log.warn("Failed to refresh zookeeper-cache for {} due to {}", path, rc);
return;
}
try {
T obj = deserializer.deserialize(path, content);
dataCache.put(path, CompletableFuture.completedFuture(ImmutablePair
.of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
} catch (Exception e) {
log.warn("Failed to refresh zookeeper-cache for {}", path, e);
}
}, null);
}
}
}
}

/**
* Simple ZooKeeperChildrenCache use this method to invalidate cache entry on watch event w/o automatic re-loading
*
Expand Down Expand Up @@ -465,9 +490,9 @@ public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watc

@SuppressWarnings("unchecked")
public <T> T getDataIfPresent(String path) {
CompletableFuture<Map.Entry<Object, Stat>> f = dataCache.getIfPresent(path);
CompletableFuture<Pair<Entry<Object, Stat>, Long>> f = dataCache.getIfPresent(path);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
return (T) f.join().getKey();
return (T) f.join().getLeft().getKey();
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -563,6 +564,63 @@ public String deserialize(String key, byte[] content) throws Exception {
scheduledExecutor.shutdown();
}

/**
* Test to verify {@link ZooKeeperCache} renews cache data after expiry time in background.
*
* @throws Exception
*/
@Test
public void testZKRefreshExpiredEntry() throws Exception {
int cacheExpiryTimeSec = 1;
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));

String path = "/test";
String val1 = "test-1";
String val2 = "test-2";
zkClient.create(path, val1.getBytes(), null, null);

// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
ZooKeeperCache zkCacheService = new ZooKeeperCacheTest("test", zkClient, 30, executor, cacheExpiryTimeSec);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
return new String(content);
}
};

// try to do get on the path which will time-out and async-cache will have non-completed Future
assertEquals(zkCache.get(path).get(), val1);

zkClient.setData(path, val2.getBytes(), -1);

retryStrategically((test) -> {
try {
return zkCache.get(path).get().equalsIgnoreCase(val2);
} catch (Exception e) {
log.warn("Failed to get date for path {}", path);
}
return false;
}, 5, 1000);

assertEquals(zkCache.get(path).get(), val2);

executor.shutdown();
zkExecutor.shutdown();
scheduledExecutor.shutdown();
}

static class ZooKeeperCacheTest extends ZooKeeperCache {

public ZooKeeperCacheTest(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
OrderedExecutor executor, int cacheExpirySeconds) {
super(cacheName, zkSession, zkOperationTimeoutSeconds, executor, cacheExpirySeconds);
}

}

private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
Expand Down