From 13799472e702504155d33c5cda2e08573cc108a0 Mon Sep 17 00:00:00 2001 From: rajan Date: Mon, 19 Oct 2020 20:51:33 -0700 Subject: [PATCH] [pulsar-broker] Refresh ZooKeeper-data cache in background to avoid deadlock and blocking IO on ZK thread fix test --- .../broker/service/BkEnsemblesTestBase.java | 1 + .../pulsar/zookeeper/ZooKeeperCache.java | 55 +++++++++++++----- .../pulsar/zookeeper/ZookeeperCacheTest.java | 58 +++++++++++++++++++ 3 files changed, 99 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index a4bfe052ef63f4..0616b55960c2c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -77,6 +77,7 @@ protected void setup() throws Exception { config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAdvertisedAddress("127.0.0.1"); config.setAllowAutoTopicCreationType("non-partitioned"); + config.setZooKeeperOperationTimeoutSeconds(1); pulsar = new PulsarService(config); pulsar.start(); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index d05a2d2fe766e2..a39533be0c72a4 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Collections; -import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; @@ -40,13 +39,12 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.stats.CacheMetricsCollector; -import org.apache.zookeeper.AsyncCallback.ChildrenCallback; -import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -83,19 +81,20 @@ public static interface CacheUpdater { public static final String ZK_CACHE_INSTANCE = "zk_cache_instance"; - protected final AsyncLoadingCache> dataCache; + protected final AsyncLoadingCache, Long>> dataCache; protected final AsyncLoadingCache> childrenCache; protected final AsyncLoadingCache existsCache; private final OrderedExecutor executor; private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build(); private boolean shouldShutdownExecutor; private final int zkOperationTimeoutSeconds; - private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes + private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300; //5 minutes + private final int cacheExpirySeconds; protected AtomicReference zkSession = new AtomicReference(null); public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) { - this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS); + this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, DEFAULT_CACHE_EXPIRY_SECONDS); } public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, @@ -105,10 +104,10 @@ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTime this.executor = executor; this.zkSession.set(zkSession); this.shouldShutdownExecutor = false; + this.cacheExpirySeconds = cacheExpirySeconds; this.dataCache = Caffeine.newBuilder() .recordStats() - .expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS) .buildAsync((key, executor1) -> null); this.childrenCache = Caffeine.newBuilder() @@ -346,10 +345,12 @@ public CompletableFuture>> getDataAsync(final String checkNotNull(path); checkNotNull(deserializer); - CompletableFuture>> future = new CompletableFuture<>(); + // refresh zk-cache entry in background if it's already expired + checkAndRefreshExpiredEntry(path, deserializer); + CompletableFuture>> future = new CompletableFuture<>(); dataCache.get(path, (p, executor) -> { // Return a future for the z-node to be fetched from ZK - CompletableFuture> zkFuture = new CompletableFuture<>(); + CompletableFuture, Long>> zkFuture = new CompletableFuture<>(); // Broker doesn't restart on global-zk session lost: so handling unexpected exception try { @@ -358,8 +359,8 @@ public CompletableFuture>> getDataAsync(final String try { T obj = deserializer.deserialize(path, content); // avoid using the zk-client thread to process the result - executor.execute( - () -> zkFuture.complete(new SimpleImmutableEntry(obj, stat))); + executor.execute(() -> zkFuture.complete(ImmutablePair + .of(new SimpleImmutableEntry(obj, stat), System.nanoTime()))); } catch (Exception e) { executor.execute(() -> zkFuture.completeExceptionally(e)); } @@ -378,7 +379,7 @@ public CompletableFuture>> getDataAsync(final String return zkFuture; }).thenAccept(result -> { if (result != null) { - future.complete(Optional.of((Entry) result)); + future.complete(Optional.of((Entry) result.getLeft())); } else { future.complete(Optional.empty()); } @@ -390,6 +391,30 @@ public CompletableFuture>> getDataAsync(final String return future; } + private void checkAndRefreshExpiredEntry(String path, final Deserializer deserializer) { + CompletableFuture, Long>> result = dataCache.getIfPresent(path); + if (result != null && result.isDone()) { + Pair, Long> entryPair = result.getNow(null); + if (entryPair != null && entryPair.getRight() != null) { + if ((System.nanoTime() - entryPair.getRight()) > TimeUnit.SECONDS.toMillis(cacheExpirySeconds)) { + this.zkSession.get().getData(path, this, (rc, path1, ctx, content, stat) -> { + if (rc != Code.OK.intValue()) { + log.warn("Failed to refresh zookeeper-cache for {} due to {}", path, rc); + return; + } + try { + T obj = deserializer.deserialize(path, content); + dataCache.put(path, CompletableFuture.completedFuture(ImmutablePair + .of(new SimpleImmutableEntry(obj, stat), System.nanoTime()))); + } catch (Exception e) { + log.warn("Failed to refresh zookeeper-cache for {}", path, e); + } + }, null); + } + } + } + } + /** * Simple ZooKeeperChildrenCache use this method to invalidate cache entry on watch event w/o automatic re-loading * @@ -465,9 +490,9 @@ public CompletableFuture> getChildrenAsync(String path, Watcher watc @SuppressWarnings("unchecked") public T getDataIfPresent(String path) { - CompletableFuture> f = dataCache.getIfPresent(path); + CompletableFuture, Long>> f = dataCache.getIfPresent(path); if (f != null && f.isDone() && !f.isCompletedExceptionally()) { - return (T) f.join().getKey(); + return (T) f.join().getLeft().getKey(); } else { return null; } diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index b3db3d9742f4a5..d8bd2c92d021db 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.KeeperException.Code; @@ -563,6 +564,63 @@ public String deserialize(String key, byte[] content) throws Exception { scheduledExecutor.shutdown(); } + /** + * Test to verify {@link ZooKeeperCache} renews cache data after expiry time in background. + * + * @throws Exception + */ + @Test + public void testZKRefreshExpiredEntry() throws Exception { + int cacheExpiryTimeSec = 1; + OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build(); + ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); + ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); + + String path = "/test"; + String val1 = "test-1"; + String val2 = "test-2"; + zkClient.create(path, val1.getBytes(), null, null); + + // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle + // callback-result process + ZooKeeperCache zkCacheService = new ZooKeeperCacheTest("test", zkClient, 30, executor, cacheExpiryTimeSec); + ZooKeeperDataCache zkCache = new ZooKeeperDataCache(zkCacheService) { + @Override + public String deserialize(String key, byte[] content) throws Exception { + return new String(content); + } + }; + + // try to do get on the path which will time-out and async-cache will have non-completed Future + assertEquals(zkCache.get(path).get(), val1); + + zkClient.setData(path, val2.getBytes(), -1); + + retryStrategically((test) -> { + try { + return zkCache.get(path).get().equalsIgnoreCase(val2); + } catch (Exception e) { + log.warn("Failed to get date for path {}", path); + } + return false; + }, 5, 1000); + + assertEquals(zkCache.get(path).get(), val2); + + executor.shutdown(); + zkExecutor.shutdown(); + scheduledExecutor.shutdown(); + } + + static class ZooKeeperCacheTest extends ZooKeeperCache { + + public ZooKeeperCacheTest(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, + OrderedExecutor executor, int cacheExpirySeconds) { + super(cacheName, zkSession, zkOperationTimeoutSeconds, executor, cacheExpirySeconds); + } + + } + private static void retryStrategically(Predicate predicate, int retryCount, long intSleepTimeInMillis) throws Exception { for (int i = 0; i < retryCount; i++) {