diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 3c93a6d39e2..2c8260c6efa 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -79,7 +79,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; -import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; @@ -106,6 +105,7 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,9 +127,9 @@ public class ClientContext implements AccumuloClient { private static final Logger log = LoggerFactory.getLogger(ClientContext.class); private final ClientInfo info; - private InstanceId instanceId; + private final Supplier instanceId; private final ZooReader zooReader; - private final ZooCache zooCache; + private final Supplier zooCache; private Credentials creds; private BatchWriterConfig batchWriterConfig; @@ -229,8 +229,38 @@ public ClientContext(SingletonReservation reservation, ClientInfo info, this.info = info; this.hadoopConf = info.getHadoopConf(); zooReader = new ZooReader(info.getZooKeepers(), info.getZooKeepersSessionTimeOut()); - zooCache = - new ZooCacheFactory().getZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut()); + + // Get the instanceID using ZooKeeper, not ZooCache as we will need + // the instanceID to create the ZooKeeper root path when creating + // ZooCache. If the instanceId cannot be found, this is a good + // time to find out. + final String instanceName = info.getInstanceName(); + final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + + instanceId = memoize(() -> { + try { + // Call getZooReader() instead of using the local variable because + // ServerContext overrides getZooReader() to return a connection + // that has been set with the secret. + byte[] data = getZooReader().getData(instanceNamePath); + if (data == null) { + throw new IllegalArgumentException("Instance name " + instanceName + + " does not exist in zookeeper. " + + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + final String instanceIdString = new String(data, UTF_8); + // verify that the instanceId found via the instanceName actually exists as an instance + if (getZooReader().getData(Constants.ZROOT + "/" + instanceIdString) == null) { + throw new IllegalArgumentException("Instance id " + instanceIdString + + (instanceName == null ? "" : " pointed to by the name " + instanceName) + + " does not exist in zookeeper"); + } + return InstanceId.of(instanceIdString); + } catch (KeeperException | InterruptedException e) { + throw new IllegalArgumentException("Unable to create client, instanceId not found", e); + } + }); + zooCache = memoize(() -> new ZooCache(ZooUtil.getRoot(getInstanceID()), getZooReader(), null)); this.serverConf = serverConf; timeoutSupplier = memoizeWithExpiration( () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS); @@ -484,26 +514,7 @@ public synchronized TCredentials rpcCreds() { * @return a UUID */ public InstanceId getInstanceID() { - if (instanceId == null) { - // lookup by name - final String instanceName = info.getInstanceName(); - String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; - byte[] data = zooCache.get(instanceNamePath); - if (data == null) { - throw new RuntimeException( - "Instance name " + instanceName + " does not exist in zookeeper. " - + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); - } - String instanceIdString = new String(data, UTF_8); - // verify that the instanceId found via the instanceName actually exists as an instance - if (zooCache.get(Constants.ZROOT + "/" + instanceIdString) == null) { - throw new RuntimeException("Instance id " + instanceIdString - + (instanceName == null ? "" : " pointed to by the name " + instanceName) - + " does not exist in zookeeper"); - } - instanceId = InstanceId.of(instanceIdString); - } - return instanceId; + return instanceId.get(); } public String getZooKeeperRoot() { @@ -541,7 +552,7 @@ public int getZooKeepersSessionTimeOut() { } public ZooCache getZooCache() { - return zooCache; + return zooCache.get(); } private TableZooHelper tableZooHelper; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 9b260d2ba05..7c22b7d8a87 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; @@ -60,7 +59,6 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.DeprecatedPropertyUtil; import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; @@ -467,22 +465,6 @@ public void waitForBalance() throws AccumuloException { } - /** - * Given a zooCache and instanceId, look up the instance name. - */ - public static String lookupInstanceName(ZooCache zooCache, InstanceId instanceId) { - checkArgument(zooCache != null, "zooCache is null"); - checkArgument(instanceId != null, "instanceId is null"); - for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { - var bytes = zooCache.get(Constants.ZROOT + Constants.ZINSTANCES + "/" + name); - InstanceId iid = InstanceId.of(new String(bytes, UTF_8)); - if (iid.equals(instanceId)) { - return name; - } - } - return null; - } - @Override public InstanceId getInstanceId() { return context.getInstanceID(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index 596037c1de1..73c2172c305 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -25,15 +25,19 @@ import java.util.ConcurrentModificationException; import java.util.List; import java.util.Optional; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Predicate; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.cache.Caches; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; @@ -44,26 +48,50 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + /** * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ public class ZooCache { private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - private final ZCacheWatcher watcher = new ZCacheWatcher(); + protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS, + Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, + Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, + Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET}; + + protected final TreeSet watchedPaths = new TreeSet<>(); + // visible for tests + protected final ZCacheWatcher watcher = new ZCacheWatcher(); private final Watcher externalWatcher; private static final AtomicLong nextCacheId = new AtomicLong(0); private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); - // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given + public static final Duration CACHE_DURATION = Duration.ofMinutes(30); + + // public and non-final because this is being set + // in tests to test the eviction + @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", + justification = "being set in tests for eviction test") + public static Ticker ticker = Ticker.systemTicker(); + + // Construct this here, otherwise end up with NPE in some cases + // when the Watcher tries to access nodeCache. Alternative would + // be to mark nodeCache as volatile. + private final Cache cache = + Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker) + .expireAfterAccess(CACHE_DURATION).build(); + + // The concurrent map returned by Caffeine will only allow one thread to run at a time for a given // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for // their compute functions. - private final ConcurrentMap nodeCache; + private final ConcurrentMap nodeCache = cache.asMap(); private final ZooReader zReader; @@ -114,21 +142,28 @@ private ZooKeeper getZooKeeper() { return zReader.getZooKeeper(); } - private class ZCacheWatcher implements Watcher { + public class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{}: {}", cacheId, event); + log.trace("Watcher Event {} {}: {}", cacheId, event.getType(), event); } switch (event.getType()) { - case NodeDataChanged: - case NodeChildrenChanged: case NodeCreated: + case NodeChildrenChanged: + case NodeDataChanged: case NodeDeleted: case ChildWatchRemoved: case DataWatchRemoved: - remove(event.getPath()); + // This code use to call remove(path), but that was when a Watcher was set + // on each node. With the Watcher being set at a higher level we need to remove + // the parent of the affected node and all of its children from the cache + // so that the parent and children node can be re-cached. If we only remove the + // affected node, then the cached children in the parent could be incorrect. + int lastSlash = event.getPath().lastIndexOf('/'); + String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash); + clear((path) -> path.startsWith(parent)); break; case None: switch (event.getState()) { @@ -172,35 +207,45 @@ public void process(WatchedEvent event) { * @param reader ZooKeeper reader * @param watcher watcher object */ - public ZooCache(ZooReader reader, Watcher watcher) { - this(reader, watcher, Duration.ofMinutes(3)); - } - - public ZooCache(ZooReader reader, Watcher watcher, Duration timeout) { + public ZooCache(String zooRoot, ZooReader reader, Watcher watcher) { this.zReader = reader; this.externalWatcher = watcher; - RemovalListener removalListerner = (path, zcNode, reason) -> { - try { - log.trace("{} removing watches for {} because {} accesses {}", cacheId, path, reason, - zcNode == null ? -1 : zcNode.getAccessCount()); - reader.getZooKeeper().removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, - false); - } catch (InterruptedException | KeeperException | RuntimeException e) { - log.warn("{} failed to remove watches on path {} in zookeeper", cacheId, path, e); - } - }; - // Must register the removal listener using evictionListener inorder for removal to be mutually - // exclusive with any other operations on the same path. This is important for watcher - // consistency, concurrently adding and removing watches for the same path would leave zoocache - // in a really bad state. The cache builder has another way to register a removal listener that - // is not mutually exclusive. - Cache cache = - Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) - .expireAfterAccess(timeout).evictionListener(removalListerner).build(); - nodeCache = cache.asMap(); + setupWatchers(zooRoot); log.trace("{} created new cache", cacheId, new Exception()); } + // Visible for testing + protected void setupWatchers(String zooRoot) { + try { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + zReader.getZooKeeper().addWatch(zPath, this.watcher, AddWatchMode.PERSISTENT_RECURSIVE); + log.trace("Added persistent recursive watcher at {}", zPath); + } + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Error setting up persistent recursive watcher", e); + } + } + + private boolean isWatchedPath(String path) { + // Check that the path is equal to, or a descendant of, a watched path + for (String watchedPath : watchedPaths) { + if (path.startsWith(watchedPath)) { + return true; + } + } + return false; + } + + // Use this instead of Preconditions.checkState(isWatchedPath, String) + // so that we are not creating String unnecessarily. + private void ensureWatched(String path) { + if (!isWatchedPath(path)) { + throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache"); + } + } + private abstract static class ZooRunnable { /** * Runs an operation against ZooKeeper. Retries are performed by the retry method when @@ -298,6 +343,7 @@ private ZcInterruptedException(InterruptedException e) { */ public List getChildren(final String zPath) { Preconditions.checkState(!closed); + ensureWatched(zPath); ZooRunnable> zr = new ZooRunnable<>() { @@ -324,12 +370,12 @@ public List run() throws KeeperException, InterruptedException { // That is ok because the compute() call on the map has a lock and processing the event // will block until compute() returns. After compute() returns the event processing // would clear the map entry. - Stat stat = zooKeeper.exists(zPath, watcher); + Stat stat = zooKeeper.exists(zPath, null); if (stat == null) { log.trace("{} getChildren saw that {} does not exists", cacheId, zPath); return ZcNode.NON_EXISTENT; } - List children = zooKeeper.getChildren(zPath, watcher); + List children = zooKeeper.getChildren(zPath, null); log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath); return new ZcNode(children, zcn); } catch (KeeperException.NoNodeException nne) { @@ -372,6 +418,7 @@ public byte[] get(final String zPath) { */ public byte[] get(final String zPath, final ZcStat status) { Preconditions.checkState(!closed); + ensureWatched(zPath); ZooRunnable zr = new ZooRunnable<>() { @Override @@ -403,7 +450,7 @@ public byte[] run() throws KeeperException, InterruptedException { */ try { final ZooKeeper zooKeeper = getZooKeeper(); - Stat stat = zooKeeper.exists(zPath, watcher); + Stat stat = zooKeeper.exists(zPath, null); if (stat == null) { if (log.isTraceEnabled()) { log.trace("{} zookeeper did not contain {}", cacheId, zPath); @@ -413,7 +460,7 @@ public byte[] run() throws KeeperException, InterruptedException { byte[] data = null; ZcStat zstat = null; try { - data = zooKeeper.getData(zPath, watcher, stat); + data = zooKeeper.getData(zPath, null, stat); zstat = new ZcStat(stat); } catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) { throw new ConcurrentModificationException(e1); @@ -459,16 +506,10 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) { } } - private void remove(String zPath) { - nodeCache.remove(zPath); - log.trace("{} removed {} from cache", cacheId, zPath); - updateCount.incrementAndGet(); - } - /** * Clears this cache. */ - public void clear() { + private void clear() { Preconditions.checkState(!closed); nodeCache.clear(); updateCount.incrementAndGet(); @@ -496,6 +537,7 @@ public long getUpdateCount() { */ @VisibleForTesting public boolean dataCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedData(); } @@ -508,6 +550,7 @@ public boolean dataCached(String zPath) { */ @VisibleForTesting public boolean childrenCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedChildren(); } @@ -518,20 +561,15 @@ public boolean childrenCached(String zPath) { public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); - Predicate pathPredicateToUse; - if (log.isTraceEnabled()) { - pathPredicateToUse = path -> { - boolean testResult = pathPredicate.test(path); - if (testResult) { - log.trace("{} removing {} from cache", cacheId, path); - } - return testResult; - }; - } else { - pathPredicateToUse = pathPredicate; - } - nodeCache.keySet().removeIf(pathPredicateToUse); - updateCount.incrementAndGet(); + Predicate pathPredicateWrapper = path -> { + boolean testResult = isWatchedPath(path) && pathPredicate.test(path); + if (testResult) { + updateCount.incrementAndGet(); + log.trace("{} removing {} from cache", cacheId, path); + } + return testResult; + }; + nodeCache.keySet().removeIf(pathPredicateWrapper); } /** @@ -540,10 +578,12 @@ public void clear(Predicate pathPredicate) { * @param zPath path of top node */ public void clear(String zPath) { + ensureWatched(zPath); clear(path -> path.startsWith(zPath)); } public Optional getLockData(ServiceLockPath path) { + ensureWatched(path.toString()); List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { return Optional.empty(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java deleted file mode 100644 index 2e1987af344..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate.zookeeper; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.singletons.SingletonManager; -import org.apache.accumulo.core.singletons.SingletonService; - -/** - * A factory for {@link ZooCache} instances. - *

- * Implementation note: We were using the instances map to track all the instances that have been - * created, so we could explicitly close them when the SingletonManager detected that the last - * legacy client (using Connector/ZooKeeperInstance) has gone away. This class may no longer be - * needed, since the legacy client code has been removed, so long as the ZooCache instances it is - * tracking are managed as resources within ClientContext or ServerContext, and explicitly closed - * when those are closed. - */ -public class ZooCacheFactory { - - private static final Map instances = new HashMap<>(); - private static boolean enabled = true; - - public ZooCacheFactory() {} - - private static boolean isEnabled() { - synchronized (instances) { - return enabled; - } - } - - private static void enable() { - synchronized (instances) { - enabled = true; - } - } - - private static void disable() { - synchronized (instances) { - try { - instances.values().forEach(ZooCache::close); - } finally { - instances.clear(); - enabled = false; - } - } - } - - static { - // important because of ZOOKEEPER-2368.. when zookeeper client is closed it does not generate an - // event! - SingletonManager.register(new SingletonService() { - - @Override - public synchronized boolean isEnabled() { - return ZooCacheFactory.isEnabled(); - } - - @Override - public synchronized void enable() { - ZooCacheFactory.enable(); - } - - @Override - public synchronized void disable() { - ZooCacheFactory.disable(); - } - }); - - } - - /** - * Gets a {@link ZooCache}. The same object may be returned for multiple calls with the same - * arguments. - * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout session timeout - * @return cache object - */ - public ZooCache getZooCache(String zooKeepers, int sessionTimeout) { - String key = zooKeepers + ":" + sessionTimeout; - synchronized (instances) { - if (!isEnabled()) { - throw new IllegalStateException("The Accumulo singleton for zookeeper caching is " - + "disabled. This is likely caused by all AccumuloClients being closed"); - } - return instances.computeIfAbsent(key, k -> getNewZooCache(zooKeepers, sessionTimeout)); - } - } - - /** - * Always return a new {@link ZooCache}. - * - * @param zooKeepers comma-separated list of ZooKeeper host[:port]s - * @param sessionTimeout session timeout - * @return a new instance - */ - public ZooCache getNewZooCache(String zooKeepers, int sessionTimeout) { - return new ZooCache(new ZooReader(zooKeepers, sessionTimeout), null); - } - - /** - * Resets the factory. All cached objects are flushed. - */ - void reset() { - synchronized (instances) { - instances.clear(); - } - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/util/tables/TableZooHelper.java b/core/src/main/java/org/apache/accumulo/core/util/tables/TableZooHelper.java index 66a41768d69..1ab1e1d708f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/tables/TableZooHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/util/tables/TableZooHelper.java @@ -173,7 +173,7 @@ public TableState getTableState(TableId tableId, boolean clearCachedState) { String statePath = context.getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId.canonical() + Constants.ZTABLE_STATE; if (clearCachedState) { - context.getZooCache().clear(context.getZooKeeperRoot() + statePath); + context.getZooCache().clear(statePath); instanceToMapCache.invalidateAll(); } ZooCache zc = context.getZooCache(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java deleted file mode 100644 index 7db2dd9df2e..00000000000 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheFactoryTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate.zookeeper; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertSame; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class ZooCacheFactoryTest { - private ZooCacheFactory zcf; - - @BeforeEach - public void setUp() { - zcf = new ZooCacheFactory(); - } - - @AfterEach - public void tearDown() { - zcf.reset(); - } - - @Test - public void testGetZooCache() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getZooCache(zks1, timeout1); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertSame(zc1, zc1a); - - String zks2 = "zk2"; - int timeout2 = 1000; - ZooCache zc2 = zcf.getZooCache(zks2, timeout2); - assertNotSame(zc1, zc2); - - String zks3 = "zk1"; - int timeout3 = 2000; - ZooCache zc3 = zcf.getZooCache(zks3, timeout3); - assertNotSame(zc1, zc3); - } - - @Test - public void testGetNewZooCache() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getNewZooCache(zks1, timeout1); - assertNotNull(zc1); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertNotSame(zc1, zc1a); - ZooCache zc1b = zcf.getNewZooCache(zks1, timeout1); - assertNotSame(zc1, zc1b); - assertNotSame(zc1a, zc1b); - } - - @Test - public void testReset() { - String zks1 = "zk1"; - int timeout1 = 1000; - ZooCache zc1 = zcf.getZooCache(zks1, timeout1); - zcf.reset(); - ZooCache zc1a = zcf.getZooCache(zks1, timeout1); - assertNotSame(zc1, zc1a); - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java index 3aced2e103e..43366ea73e7 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java @@ -19,12 +19,10 @@ package org.apache.accumulo.core.fate.zookeeper; import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.createStrictMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -35,36 +33,62 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ZooCacheTest { - private static final String ZPATH = "/some/path/in/zk"; + + private static class TestZooCache extends ZooCache { + + /** + * Test class that extends ZooCache to suppress the creation of the persistent recursive + * watchers that are created in the constructor and to provide access to the watcher. + */ + public TestZooCache(String zooRoot, ZooReader reader, Watcher watcher) { + super(zooRoot, reader, watcher); + } + + @Override + protected void setupWatchers(String zooRoot) { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + } + } + + public void executeWatcher(WatchedEvent event) { + // simulate ZooKeeper calling our Watcher + watcher.process(event); + } + + } + + private static final String instancePath = Constants.ZROOT + "/" + UUID.randomUUID().toString(); + private static final String root = instancePath + Constants.ZTSERVERS; + private static final String ZPATH = root + "/testPath"; private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; private static final List CHILDREN = java.util.Arrays.asList("huey", "dewey", "louie"); private ZooReader zr; private ZooKeeper zk; - private ZooCache zc; + private TestZooCache zc; @BeforeEach - public void setUp() { + public void setUp() throws KeeperException, InterruptedException { zr = createMock(ZooReader.class); zk = createStrictMock(ZooKeeper.class); - expect(zr.getZooKeeper()).andReturn(zk); - expectLastCall().anyTimes(); + expect(zr.getZooKeeper()).andReturn(zk).anyTimes(); replay(zr); - - zc = new ZooCache(zr, null); + zc = new TestZooCache(instancePath, zr, null); } @Test @@ -85,8 +109,8 @@ private void testGet(boolean fillStat) throws Exception { final long ephemeralOwner = 123456789L; Stat existsStat = new Stat(); existsStat.setEphemeralOwner(ephemeralOwner); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(DATA); replay(zk); assertFalse(zc.dataCached(ZPATH)); @@ -294,29 +318,25 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchData(initialData); - w.process(event); + watchData(initialData); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.dataCached(ZPATH)); } - private Watcher watchData(byte[] initialData) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchData(byte[] initialData) throws Exception { Stat existsStat = new Stat(); if (initialData != null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))) - .andReturn(initialData); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(initialData); } else { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } replay(zk); zc.get(ZPATH); assertTrue(zc.dataCached(ZPATH)); - - return cw.getValue(); } @Test @@ -408,11 +428,11 @@ private void testGetBoth(boolean getDataFirst) throws Exception { private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception { WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchData(DATA); + watchData(DATA); assertTrue(zc.dataCached(ZPATH)); - w.process(event); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertFalse(zc.dataCached(ZPATH)); } @@ -442,27 +462,24 @@ private void testWatchChildrenNode(List initialChildren, WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zr, exw); + zc = new TestZooCache(instancePath, zr, exw); - Watcher w = watchChildren(initialChildren); - w.process(event); + watchChildren(initialChildren); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.childrenCached(ZPATH)); } - private Watcher watchChildren(List initialChildren) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchChildren(List initialChildren) throws Exception { if (initialChildren == null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } else { Stat existsStat = new Stat(); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getChildren(eq(ZPATH), capture(cw))).andReturn(initialChildren); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getChildren(eq(ZPATH), eq(null))).andReturn(initialChildren); } replay(zk); zc.getChildren(ZPATH); assertTrue(zc.childrenCached(ZPATH)); - - return cw.getValue(); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java index aa7d2149dde..ee269a868e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.server; +import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; @@ -29,13 +30,11 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientInfo; import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.InstanceOperationsImpl; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonManager.Mode; @@ -44,6 +43,7 @@ import org.apache.accumulo.server.security.SystemCredentials; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.KeeperException; public class ServerInfo implements ClientInfo { @@ -54,7 +54,6 @@ public class ServerInfo implements ClientInfo { private final String zooKeepers; private final int zooKeepersSessionTimeOut; private final VolumeManager volumeManager; - private final ZooCache zooCache; private final ServerDirs serverDirs; private final Credentials credentials; @@ -71,22 +70,26 @@ public class ServerInfo implements ClientInfo { } catch (IOException e) { throw new UncheckedIOException(e); } - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); - String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; - byte[] iidb = zooCache.get(instanceNamePath); - if (iidb == null) { - throw new IllegalStateException( - "Instance name " + instanceName + " does not exist in zookeeper. " - + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); - } - instanceID = InstanceId.of(new String(iidb, UTF_8)); - if (zooCache.get(ZooUtil.getRoot(instanceID)) == null) { - if (instanceName == null) { + final ZooReader zooReader = new ZooReader(this.zooKeepers, this.zooKeepersSessionTimeOut); + final String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + try { + byte[] iidb = zooReader.getData(instanceNamePath); + if (iidb == null) { throw new IllegalStateException( - "Instance id " + instanceID + " does not exist in zookeeper"); + "Instance name " + instanceName + " does not exist in zookeeper. " + + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + instanceID = InstanceId.of(new String(iidb, UTF_8)); + if (zooReader.getData(ZooUtil.getRoot(instanceID)) == null) { + if (instanceName == null) { + throw new IllegalStateException( + "Instance id " + instanceID + " does not exist in zookeeper"); + } + throw new IllegalStateException("Instance id " + instanceID + " pointed to by the name " + + instanceName + " does not exist in zookeeper"); } - throw new IllegalStateException("Instance id " + instanceID + " pointed to by the name " - + instanceName + " does not exist in zookeeper"); + } catch (KeeperException | InterruptedException e) { + throw new IllegalArgumentException("Unabled to create client, instanceId not found", e); } serverDirs = new ServerDirs(siteConfig, hadoopConf); credentials = SystemCredentials.get(instanceID, siteConfig); @@ -106,9 +109,14 @@ public class ServerInfo implements ClientInfo { instanceID = VolumeManager.getInstanceIDFromHdfs(instanceIdPath, hadoopConf); zooKeepers = config.get(Property.INSTANCE_ZK_HOST); zooKeepersSessionTimeOut = (int) config.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); - instanceName = InstanceOperationsImpl.lookupInstanceName(zooCache, instanceID); credentials = SystemCredentials.get(instanceID, siteConfig); + final ZooReader zooReader = new ZooReader(this.zooKeepers, this.zooKeepersSessionTimeOut); + try { + instanceName = lookupInstanceName(zooReader, instanceID); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Unable to lookup instanceName for instanceID: " + instanceID, + e); + } } ServerInfo(SiteConfiguration config, String instanceName, InstanceId instanceID) { @@ -123,12 +131,25 @@ public class ServerInfo implements ClientInfo { this.instanceID = instanceID; zooKeepers = config.get(Property.INSTANCE_ZK_HOST); zooKeepersSessionTimeOut = (int) config.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut); this.instanceName = instanceName; serverDirs = new ServerDirs(siteConfig, hadoopConf); credentials = SystemCredentials.get(instanceID, siteConfig); } + private String lookupInstanceName(ZooReader zr, InstanceId instanceId) + throws KeeperException, InterruptedException { + checkArgument(zr != null, "zooReader is null"); + checkArgument(instanceId != null, "instanceId is null"); + for (String name : zr.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { + var bytes = zr.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name); + InstanceId iid = InstanceId.of(new String(bytes, UTF_8)); + if (iid.equals(instanceId)) { + return name; + } + } + return null; + } + public SiteConfiguration getSiteConfiguration() { return siteConfig; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index b6e51b412f7..51d94833510 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; @@ -77,7 +76,6 @@ public interface Listener { private final Listener cback; private final ServerContext context; - private ZooCache zooCache; public class TServerConnection { private final HostAndPort address; @@ -213,13 +211,6 @@ public LiveTServerSet(ServerContext context, Listener cback) { this.context = context; } - public synchronized ZooCache getZooCache() { - if (zooCache == null) { - zooCache = new ZooCache(context.getZooReader(), this); - } - return zooCache; - } - public synchronized void startListeningForTabletServerChanges() { scanServers(); @@ -267,7 +258,8 @@ private synchronized void checkServer(final Set updates, final TServerInfo info = current.get(tserverPath.getServer()); ZcStat stat = new ZcStat(); - Optional sld = ServiceLock.getLockData(getZooCache(), tserverPath, stat); + Optional sld = + ServiceLock.getLockData(this.context.getZooCache(), tserverPath, stat); if (sld.isEmpty()) { if (info != null) { @@ -482,7 +474,7 @@ public synchronized void remove(TServerInstance server) { log.error("FATAL: {}", msg, e); Halt.halt(msg, -1); } - getZooCache().clear(slp.toString()); + this.context.getZooCache().clear(slp.toString()); } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index e1fd176bae0..b12929a7160 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -41,7 +41,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.manager.thrift.FateOperation; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; @@ -74,7 +73,6 @@ public class SecurityOperation { private final PermissionHandler permHandle; private final boolean isKerberos; private final Supplier rootUserName; - private final ZooCache zooCache; private final String zkUserPath; protected final ServerContext context; @@ -105,8 +103,8 @@ protected SecurityOperation(ServerContext context, Authorizor author, Authentica PermissionHandler pm) { this.context = context; zkUserPath = context.zkUserPath(); - zooCache = new ZooCache(context.getZooReader(), null); - rootUserName = Suppliers.memoize(() -> new String(zooCache.get(zkUserPath), UTF_8)); + rootUserName = + Suppliers.memoize(() -> new String(context.getZooCache().get(zkUserPath), UTF_8)); authorizor = author; authenticator = authent; permHandle = pm; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java index 7aa23061c54..ece2ca9471b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.clientImpl.DelegationTokenImpl; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -51,7 +50,6 @@ public class KerberosAuthenticator implements Authenticator { Set.of(KerberosToken.class.getName(), SystemToken.class.getName()); private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator(); - private ZooCache zooCache; private ServerContext context; private String zkUserPath; private UserImpersonation impersonation; @@ -59,7 +57,6 @@ public class KerberosAuthenticator implements Authenticator { @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); impersonation = new UserImpersonation(context.getConfiguration()); zkAuthenticator.initialize(context); zkUserPath = context.zkUserPath(); @@ -71,12 +68,9 @@ public boolean validSecurityHandlers() { } private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], - NodeExistsPolicy.FAIL); - } + this.context.getZooCache().clear(zkUserPath + "/" + principal); + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL); } @Override @@ -84,22 +78,20 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } - - // prep parent node of users with root username - // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value - byte[] principalData = principal.getBytes(UTF_8); - zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); - - // Create the root user in ZK using base64 encoded name (since the name is included in the - // znode) - createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); } + + // prep parent node of users with root username + // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value + byte[] principalData = principal.getBytes(UTF_8); + zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); + + // Create the root user in ZK using base64 encoded name (since the name is included in the + // znode) + createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); } catch (KeeperException | InterruptedException e) { log.error("Failed to initialize security", e); throw new IllegalStateException(e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java index 10b7c4b880d..d67831c48fc 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,12 +43,10 @@ public final class ZKAuthenticator implements Authenticator { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); zkUserPath = context.zkUserPath(); } @@ -58,18 +55,16 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); + } - // prep parent node of users with root username - zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); + // prep parent node of users with root username + zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); - constructUser(principal, ZKSecurityTool.createPass(token)); - } + constructUser(principal, ZKSecurityTool.createPass(token)); } catch (KeeperException | AccumuloException | InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -82,16 +77,14 @@ public void initializeSecurity(String principal, byte[] token) { */ private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + user, pass, NodeExistsPolicy.FAIL); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.putPrivatePersistentData(zkUserPath + "/" + user, pass, NodeExistsPolicy.FAIL); } @Override public Set listUsers() { - return new TreeSet<>(zooCache.getChildren(zkUserPath)); + return new TreeSet<>(this.context.getZooCache().getChildren(zkUserPath)); } @Override @@ -120,11 +113,8 @@ public void createUser(String principal, AuthenticationToken token) @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooReaderWriter().recursiveDelete(zkUserPath + "/" + user, - NodeMissingPolicy.FAIL); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); + context.getZooReaderWriter().recursiveDelete(zkUserPath + "/" + user, NodeMissingPolicy.FAIL); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -146,11 +136,9 @@ public void changePassword(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; if (userExists(principal)) { try { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + principal); - context.getZooReaderWriter().putPrivatePersistentData(zkUserPath + "/" + principal, - ZKSecurityTool.createPass(pt.getPassword()), NodeExistsPolicy.OVERWRITE); - } + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + principal)); + context.getZooReaderWriter().putPrivatePersistentData(zkUserPath + "/" + principal, + ZKSecurityTool.createPass(pt.getPassword()), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e); @@ -169,7 +157,7 @@ public void changePassword(String principal, AuthenticationToken token) @Override public boolean userExists(String user) { - return zooCache.get(zkUserPath + "/" + user) != null; + return this.context.getZooCache().get(zkUserPath + "/" + user) != null; } @Override @@ -186,11 +174,11 @@ public boolean authenticateUser(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; byte[] zkData; String zpath = zkUserPath + "/" + principal; - zkData = zooCache.get(zpath); + zkData = this.context.getZooCache().get(zpath); boolean result = authenticateUser(principal, pt, zkData); if (!result) { - zooCache.clear(zpath); - zkData = zooCache.get(zpath); + this.context.getZooCache().clear(zpath); + zkData = this.context.getZooCache().get(zpath); result = authenticateUser(principal, pt, zkData); } return result; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java index 8c0691d86c0..0814fec9a4c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,18 +43,16 @@ public class ZKAuthorizor implements Authorizor { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooReader(), null); zkUserPath = context.zkUserPath(); } @Override public Authorizations getCachedUserAuthorizations(String user) { - byte[] authsBytes = zooCache.get(zkUserPath + "/" + user + ZKUserAuths); + byte[] authsBytes = this.context.getZooCache().get(zkUserPath + "/" + user + ZKUserAuths); if (authsBytes != null) { return ZKSecurityTool.convertAuthorizations(authsBytes); } @@ -105,11 +102,9 @@ public void initUser(String user) throws AccumuloSecurityException { @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - ZooReaderWriter zoo = context.getZooReaderWriter(); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + ZooReaderWriter zoo = context.getZooReaderWriter(); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); + this.context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -127,11 +122,9 @@ public void dropUser(String user) throws AccumuloSecurityException { public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooReaderWriter().putPersistentData(zkUserPath + "/" + user + ZKUserAuths, - ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); - } + this.context.getZooCache().clear(zkUserPath + "/" + user + ZKUserAuths); + context.getZooReaderWriter().putPersistentData(zkUserPath + "/" + user + ZKUserAuths, + ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java index f5b3768a854..c45760e449d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -53,18 +52,18 @@ public class ZKPermHandler implements PermissionHandler { private static final Logger log = LoggerFactory.getLogger(ZKPermHandler.class); + private ServerContext ctx; private ZooReaderWriter zoo; private String zkUserPath; private String ZKTablePath; private String ZKNamespacePath; - private ZooCache zooCache; private final String ZKUserSysPerms = "/System"; private final String ZKUserTablePerms = "/Tables"; private final String ZKUserNamespacePerms = "/Namespaces"; @Override public void initialize(ServerContext context) { - zooCache = new ZooCache(context.getZooReader(), null); + ctx = context; zoo = context.getZooReaderWriter(); zkUserPath = context.zkUserPath(); ZKTablePath = context.getZooKeeperRoot() + Constants.ZTABLES; @@ -113,7 +112,8 @@ public boolean hasTablePermission(String user, String table, TablePermission per @Override public boolean hasCachedTablePermission(String user, String table, TablePermission permission) { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); if (serializedPerms != null) { return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission); } @@ -164,8 +164,8 @@ public boolean hasNamespacePermission(String user, String namespace, @Override public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) { - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); if (serializedPerms != null) { return ZKSecurityTool.convertNamespacePermissions(serializedPerms).contains(permission); } @@ -176,7 +176,7 @@ public boolean hasCachedNamespacePermission(String user, String namespace, public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { try { - byte[] permBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] permBytes = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); Set perms; if (permBytes == null) { perms = new TreeSet<>(); @@ -185,11 +185,9 @@ public void grantSystemPermission(String user, SystemPermission permission) } if (perms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(perms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache().clear(zkUserPath + "/" + user + ZKUserSysPerms); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, + ZKSecurityTool.convertSystemPermissions(perms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -204,7 +202,8 @@ public void grantSystemPermission(String user, SystemPermission permission) public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { Set tablePerms; - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); if (serializedPerms != null) { tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); } else { @@ -213,11 +212,9 @@ public void grantTablePermission(String user, String table, TablePermission perm try { if (tablePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache().clear(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -232,8 +229,8 @@ public void grantTablePermission(String user, String table, TablePermission perm public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { Set namespacePerms; - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); if (serializedPerms != null) { namespacePerms = ZKSecurityTool.convertNamespacePermissions(serializedPerms); } else { @@ -242,12 +239,10 @@ public void grantNamespacePermission(String user, String namespace, try { if (namespacePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(namespacePerms), - NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache() + .clear(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + ZKSecurityTool.convertNamespacePermissions(namespacePerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -261,7 +256,7 @@ public void grantNamespacePermission(String user, String namespace, @Override public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { - byte[] sysPermBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] sysPermBytes = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); // User had no system permission, nothing to revoke. if (sysPermBytes == null) { @@ -272,11 +267,10 @@ public void revokeSystemPermission(String user, SystemPermission permission) try { if (sysPerms.remove(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(sysPerms), NodeExistsPolicy.OVERWRITE); - } + this.ctx.getZooCache() + .clear((path) -> path.startsWith(zkUserPath + "/" + user + ZKUserSysPerms)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, + ZKSecurityTool.convertSystemPermissions(sysPerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -290,7 +284,8 @@ public void revokeSystemPermission(String user, SystemPermission permission) @Override public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); // User had no table permission, nothing to revoke. if (serializedPerms == null) { @@ -300,7 +295,8 @@ public void revokeTablePermission(String user, String table, TablePermission per Set tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); try { if (tablePerms.remove(permission)) { - zooCache.clear(); + this.ctx.getZooCache().clear( + (path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); if (tablePerms.isEmpty()) { zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP); @@ -321,8 +317,8 @@ public void revokeTablePermission(String user, String table, TablePermission per @Override public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = this.ctx.getZooCache() + .get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); // User had no namespace permission, nothing to revoke. if (serializedPerms == null) { @@ -333,7 +329,8 @@ public void revokeNamespacePermission(String user, String namespace, ZKSecurityTool.convertNamespacePermissions(serializedPerms); try { if (namespacePerms.remove(permission)) { - zooCache.clear(); + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); if (namespacePerms.isEmpty()) { zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP); @@ -355,12 +352,11 @@ public void revokeNamespacePermission(String user, String namespace, @Override public void cleanTablePermissions(String table) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - NodeMissingPolicy.SKIP); - } + for (String user : this.ctx.getZooCache().getChildren(zkUserPath)) { + this.ctx.getZooCache().clear( + (path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -374,12 +370,11 @@ public void cleanTablePermissions(String table) throws AccumuloSecurityException @Override public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - NodeMissingPolicy.SKIP); - } + for (String user : this.ctx.getZooCache().getChildren(zkUserPath)) { + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -455,11 +450,10 @@ public void initUser(String user) throws AccumuloSecurityException { */ private void createTablePerm(String user, TableId table, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL); - } + this.ctx.getZooCache() + .clear((path) -> path.startsWith(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, + ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL); } /** @@ -468,22 +462,19 @@ private void createTablePerm(String user, TableId table, Set pe */ private void createNamespacePerm(String user, NamespaceId namespace, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL); - } + this.ctx.getZooCache().clear((path) -> path + .startsWith(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace)); + zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL); } @Override public void cleanUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); + this.ctx.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -523,7 +514,7 @@ public boolean hasSystemPermission(String user, SystemPermission permission) { @Override public boolean hasCachedSystemPermission(String user, SystemPermission permission) { - byte[] perms = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] perms = this.ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); if (perms == null) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java index 86e33cc8f2a..3f15997b33f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java @@ -122,7 +122,7 @@ public TableManager(ServerContext context) { zkRoot = context.getZooKeeperRoot(); instanceID = context.getInstanceID(); zoo = context.getZooReaderWriter(); - zooStateCache = new ZooCache(zoo, new TableStateWatcher()); + zooStateCache = new ZooCache(zkRoot, zoo, new TableStateWatcher()); updateTableStateCache(); } @@ -256,6 +256,7 @@ public void process(WatchedEvent event) { log.trace("{}", event); } final String zPath = event.getPath(); + final EventType zType = event.getType(); String tablesPrefix = zkRoot + Constants.ZTABLES; @@ -270,9 +271,14 @@ public void process(WatchedEvent event) { } } if (tableId == null) { - log.warn("Unknown path in {}", event); + // not a path we care about + log.trace("Unknown path in {}", event); return; } + } else { + // not a path we care about + log.trace("Event fired for path {}, not something we care about", event); + return; } switch (zType) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index af98fd0d201..0450ade990c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -89,7 +89,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean System.out.println("INFO : Using ZooKeepers " + keepers); ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS); - ZooCache cache = new ZooCache(rdr, null); + ZooCache cache = new ZooCache(Constants.ZROOT, rdr, null); TreeMap instanceNames = getInstanceNames(rdr, printErrors); diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java index 6621feb93ff..1446c3f64fc 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.security.Authorizations; @@ -145,15 +146,17 @@ public void testUserAuthentication() throws Exception { var instanceId = InstanceId.of("example"); ServerContext context = MockServerContext.getWithZK(instanceId, "", 30_000); ZooReaderWriter zr = createMock(ZooReaderWriter.class); + ZooCache zc = createMock(ZooCache.class); expect(context.getZooReader()).andReturn(zr).anyTimes(); + expect(context.getZooCache()).andReturn(zc).anyTimes(); ZooKeeper zk = createMock(ZooKeeper.class); expect(zk.getChildren(anyObject(), anyObject())).andReturn(Arrays.asList(principal)).anyTimes(); expect(zk.exists(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), anyObject(Watcher.class))).andReturn(new Stat()).anyTimes(); expect(zr.getZooKeeper()).andReturn(zk).anyTimes(); - expect(zk.getData(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), - anyObject(), anyObject())).andReturn(newHash).once(); - replay(context, zr, zk); + expect(zc.get(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal))) + .andReturn(newHash); + replay(context, zr, zk, zc); // creating authenticator ZKAuthenticator auth = new ZKAuthenticator(); @@ -162,6 +165,6 @@ public void testUserAuthentication() throws Exception { PasswordToken token = new PasswordToken(rawPass.clone()); // verifying that if the new type of hash is stored in zk authentication works as expected assertTrue(auth.authenticateUser(principal, token)); - verify(context, zr, zk); + verify(context, zr, zk, zc); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 7270d8018f4..d2132762313 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -65,7 +64,6 @@ public class RecoveryManager { private final Cache existenceCache; private final ScheduledExecutorService executor; private final Manager manager; - private final ZooCache zooCache; public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { this.manager = manager; @@ -76,7 +74,6 @@ public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); - zooCache = new ZooCache(manager.getContext().getZooReader(), null); try { List workIDs = new DistributedWorkQueue(manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY, @@ -182,7 +179,7 @@ public boolean recoverLogs(KeyExtent extent, Collection walogs) throws sortQueued = sortsQueued.contains(sortId); } - if (sortQueued && zooCache.get( + if (sortQueued && this.manager.getContext().getZooCache().get( manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) { synchronized (this) { sortsQueued.remove(sortId); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 021ee3dbde9..93e2835130f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -64,7 +64,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -204,8 +203,6 @@ private TabletMetadataLoader(Ample ample) { private ScanServerMetrics scanServerMetrics; private BlockCacheMetrics blockCacheMetrics; - private final ZooCache managerLockCache; - public ScanServer(ConfigOpts opts, String[] args) { super("sserver", opts, ServerContext::new, args); @@ -216,8 +213,6 @@ public ScanServer(ConfigOpts opts, String[] args) { this.resourceManager = new TabletServerResourceManager(context, this); - this.managerLockCache = new ZooCache(context.getZooReader(), null); - var readWriteLock = new ReentrantReadWriteLock(); reservationsReadLock = readWriteLock.readLock(); reservationsWriteLock = readWriteLock.writeLock(); @@ -1138,11 +1133,6 @@ public ServiceLock getLock() { return scanServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - @Override public BlockCacheConfiguration getBlockCacheConfiguration(AccumuloConfiguration acuConf) { return BlockCacheConfiguration.forScanServer(acuConf); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 0b5c2466473..aa8d9247f34 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockPaths; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -910,11 +911,12 @@ static void checkPermission(SecurityOperation security, ServerContext context, new ZooUtil.LockID(context.getServerPaths().createManagerPath().toString(), lock); try { - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { // maybe the cache is out of date and a new manager holds the // lock? - server.getManagerLockCache().clear(); - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + server.getContext().getZooCache().clear( + (path) -> path.equals(ServiceLockPaths.parse(Optional.empty(), lid.path).toString())); + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { log.warn("Got {} message from a manager that does not hold the current lock {}", request, lock); throw new RuntimeException("bad manager lock"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 8e5047bb40d..9760341a995 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -20,7 +20,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.scan.ScanServerInfo; @@ -58,7 +57,5 @@ public interface TabletHostingServer { ServiceLock getLock(); - ZooCache getManagerLockCache(); - BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index a7282c0688a..54121a3646d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -74,7 +74,6 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -160,8 +159,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private static final Logger log = LoggerFactory.getLogger(TabletServer.class); private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = TimeUnit.HOURS.toMillis(1); - final ZooCache managerLockCache; - final TabletServerLogger logger; private TabletServerMetrics metrics; @@ -232,7 +229,6 @@ protected TabletServer(ConfigOpts opts, Function serverContextFactory, String[] args) { super("tserver", opts, serverContextFactory, args); context = super.getContext(); - this.managerLockCache = new ZooCache(context.getZooReader(), null); final AccumuloConfiguration aconf = getConfiguration(); log.info("Version " + Constants.VERSION); log.info("Instance " + getInstanceID()); @@ -478,11 +474,6 @@ public ServiceLock getLock() { return tabletServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - private void announceExistence() { ZooReaderWriter zoo = getContext().getZooReaderWriter(); try { diff --git a/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java index ce0e21c4689..0b5ef9d119b 100644 --- a/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -96,7 +96,6 @@ public void alterConfig() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build(); ClientContext context = (ClientContext) client) { ZooCache zcache = context.getZooCache(); - zcache.clear(); var path = context.getServerPaths().createGarbageCollectorPath(); Optional gcLockData; do { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index be17ed550fb..ac98bbe1fe0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -46,7 +46,7 @@ public static void main(String[] args) throws Exception { File myfile = new File(reportDir + "/" + UUID.randomUUID()); myfile.deleteOnExit(); - ZooCache zc = new ZooCache(new ZooReader(keepers, 30000), null); + ZooCache zc = new ZooCache("/", new ZooReader(keepers, 30000), null); while (true) { if (myfile.exists() && !myfile.delete()) { diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java index 645b7c70d2f..e374fb3fb9f 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java @@ -20,17 +20,17 @@ import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; -import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.test.util.Wait; @@ -41,19 +41,46 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.github.benmanes.caffeine.cache.Ticker; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + @Tag(ZOOKEEPER_TESTING_SERVER) public class ZooCacheIT { + public static class ZooCacheTicker implements Ticker { + + private int advanceCounter = 0; + + @Override + public long read() { + return System.nanoTime() + (advanceCounter * ZooCache.CACHE_DURATION.toNanos()); + } + + public void advance() { + advanceCounter++; + } + + public void reset() { + advanceCounter = 0; + } + + } + private ZooKeeperTestingServer szk = null; private ZooReaderWriter zk = null; + private ZooCacheTicker ticker = new ZooCacheTicker(); @TempDir private File tempDir; @BeforeEach + @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", + justification = "setting ticker in test for eviction test") public void setup() throws Exception { szk = new ZooKeeperTestingServer(tempDir); zk = szk.getZooReaderWriter(); + ZooCache.ticker = ticker; } @AfterEach @@ -71,125 +98,133 @@ public void testGetChildren() throws Exception { watchesRemoved.add(event.getPath()); } }; - ZooCache zooCache = new ZooCache(zk, watcher, Duration.ofSeconds(3)); - zk.mkdirs("/test2"); - zk.mkdirs("/test3/c1"); - zk.mkdirs("/test3/c2"); + final String root = Constants.ZROOT + UUID.randomUUID().toString(); + + ZooCache zooCache = new ZooCache(root, zk, watcher); + + final String base = root + Constants.ZTSERVERS; + + zk.mkdirs(base + "/test2"); + zk.mkdirs(base + "/test3/c1"); + zk.mkdirs(base + "/test3/c2"); // cache non-existence of /test1 and existence of /test2 and /test3 long uc1 = zooCache.getUpdateCount(); - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); long uc2 = zooCache.getUpdateCount(); assertTrue(uc1 < uc2); - assertEquals(List.of(), zooCache.getChildren("/test2")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); long uc3 = zooCache.getUpdateCount(); assertTrue(uc2 < uc3); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); long uc4 = zooCache.getUpdateCount(); assertTrue(uc3 < uc4); // The cache should be stable now and new accesses should not change the update count - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); // once getChildren discovers that a node does not exists, then get data will also know this - assertNull(zooCache.get("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertNull(zooCache.get(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc4, zooCache.getUpdateCount()); // Had cached non-existence of "/test1", should get a notification that it was created - zk.mkdirs("/test1"); + zk.mkdirs(base + "/test1"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test1"); + var children = zooCache.getChildren(base + "/test1"); return children != null && children.isEmpty(); }); long uc5 = zooCache.getUpdateCount(); assertTrue(uc4 < uc5); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc5, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc5b = zooCache.getUpdateCount(); + assertTrue(uc5 < uc5b); // add a child to /test3, should get a notification of the change - zk.mkdirs("/test3/c3"); + zk.mkdirs(base + "/test3/c3"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); + System.out.println("children: " + children); return children != null && children.size() == 3; }); long uc6 = zooCache.getUpdateCount(); - assertTrue(uc5 < uc6); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc5b < uc6); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc6, zooCache.getUpdateCount()); // remove a child from /test3 - zk.delete("/test3/c2"); + zk.delete(base + "/test3/c2"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); return children != null && children.size() == 2; }); long uc7 = zooCache.getUpdateCount(); assertTrue(uc6 < uc7); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc7, zooCache.getUpdateCount()); // remove /test2, should start caching that it does not exist - zk.delete("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") == null); + zk.delete(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") == null); long uc8 = zooCache.getUpdateCount(); assertTrue(uc7 < uc8); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertNull(zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc8, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertNull(zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc8b = zooCache.getUpdateCount(); + assertTrue(uc8 < uc8b); // add /test2 back, should update - zk.mkdirs("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") != null); + zk.mkdirs(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") != null); long uc9 = zooCache.getUpdateCount(); - assertTrue(uc8 < uc9); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc9, zooCache.getUpdateCount()); + assertTrue(uc8b < uc9); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc9b = zooCache.getUpdateCount(); + assertTrue(uc9 < uc9b); // make multiple changes. the cache should see all of these - zk.delete("/test1"); - zk.mkdirs("/test2/ca"); - zk.delete("/test3/c1"); - zk.mkdirs("/test3/c4"); - zk.delete("/test3/c4"); - zk.mkdirs("/test3/c5"); + zk.delete(base + "/test1"); + zk.mkdirs(base + "/test2/ca"); + zk.delete(base + "/test3/c1"); + zk.mkdirs(base + "/test3/c4"); + zk.delete(base + "/test3/c4"); + zk.mkdirs(base + "/test3/c5"); Wait.waitFor(() -> { - var children1 = zooCache.getChildren("/test1"); - var children2 = zooCache.getChildren("/test2"); - var children3 = zooCache.getChildren("/test3"); + var children1 = zooCache.getChildren(base + "/test1"); + var children2 = zooCache.getChildren(base + "/test2"); + var children3 = zooCache.getChildren(base + "/test3"); return children1 == null && children2 != null && children2.size() == 1 && children3 != null && Set.copyOf(children3).equals(Set.of("c3", "c5")); }); long uc10 = zooCache.getUpdateCount(); - assertTrue(uc9 < uc10); - assertNull(zooCache.getChildren("/test1")); - assertEquals(List.of("ca"), zooCache.getChildren("/test2")); - assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc9b < uc10); + assertNull(zooCache.getChildren(base + "/test1")); + assertEquals(List.of("ca"), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc10, zooCache.getUpdateCount()); // wait for the cache to evict and clear watches + ticker.advance(); Wait.waitFor(() -> { // the cache will not run its eviction handler unless accessed, so access something that is // not expected to be evicted - zooCache.getChildren("/test4"); - return watchesRemoved.equals(Set.of("/test1", "/test2", "/test3")); + zooCache.getChildren(base + "/test4"); + return zooCache.childrenCached(base + "/test1") == false + && zooCache.childrenCached(base + "/test2") == false + && zooCache.childrenCached(base + "/test3") == false; }); - - assertFalse(zooCache.childrenCached("/test1")); - assertFalse(zooCache.childrenCached("/test2")); - assertFalse(zooCache.childrenCached("/test3")); } }