From a64dbf5b06ca1a73dc2ad6c7d31e679e312082aa Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Fri, 5 May 2023 15:58:42 +0800 Subject: [PATCH] ZOOKEEPER-4466: Support different watch modes on same path (#1859) Signed-off-by: Kezhu Wang Co-authored-by: tison --- .../org/apache/zookeeper/server/DataTree.java | 5 +- .../zookeeper/server/watch/IWatchManager.java | 9 -- .../zookeeper/server/watch/WatchManager.java | 115 ++++++++++-------- .../zookeeper/server/watch/WatchStats.java | 89 ++++++++++++++ .../zookeeper/server/watch/WatcherMode.java | 2 +- .../server/watch/WatcherModeManager.java | 96 --------------- .../server/watch/RecursiveWatchQtyTest.java | 50 ++------ .../test/PersistentRecursiveWatcherTest.java | 48 ++++++++ 8 files changed, 219 insertions(+), 195 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java delete mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index a6f60539089..603cb0b3803 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -675,7 +675,9 @@ public String getMaxPrefixWithQuota(String path) { public void addWatch(String basePath, Watcher watcher, int mode) { WatcherMode watcherMode = WatcherMode.fromZooDef(mode); dataWatches.addWatch(basePath, watcher, watcherMode); - childWatches.addWatch(basePath, watcher, watcherMode); + if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) { + childWatches.addWatch(basePath, watcher, watcherMode); + } } public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { @@ -1511,7 +1513,6 @@ public void setWatches(long relativeZxid, List dataWatches, List this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT); } for (String path : persistentRecursiveWatches) { - this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805a0..4eea5eca03a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -144,13 +144,4 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * */ void dumpWatches(PrintWriter pwriter, boolean byPath); - - /** - * Return the current number of recursive watchers - * - * @return qty - */ - default int getRecursiveWatchQty() { - return 0; - } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059b2..c85c3d84639 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.watch; import java.io.PrintWriter; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager { private final Map> watchTable = new HashMap<>(); - private final Map> watch2Paths = new HashMap<>(); + private final Map> watch2Paths = new HashMap<>(); - private final WatcherModeManager watcherModeManager = new WatcherModeManager(); + private int recursiveWatchQty = 0; @Override public synchronized int size() { @@ -84,25 +85,34 @@ public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode w } list.add(watcher); - Set paths = watch2Paths.get(watcher); + Map paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here - paths = new HashSet<>(); + paths = new HashMap<>(); watch2Paths.put(watcher, paths); } - watcherModeManager.setWatcherMode(watcher, path, watcherMode); + WatchStats stats = paths.getOrDefault(path, WatchStats.NONE); + WatchStats newStats = stats.addMode(watcherMode); - return paths.add(path); + if (newStats != stats) { + paths.put(path, newStats); + if (watcherMode.isRecursive()) { + ++recursiveWatchQty; + } + return true; + } + + return false; } @Override public synchronized void removeWatcher(Watcher watcher) { - Set paths = watch2Paths.remove(watcher); + Map paths = watch2Paths.remove(watcher); if (paths == null) { return; } - for (String p : paths) { + for (String p : paths.keySet()) { Set list = watchTable.get(p); if (list != null) { list.remove(watcher); @@ -110,7 +120,11 @@ public synchronized void removeWatcher(Watcher watcher) { watchTable.remove(p); } } - watcherModeManager.removeWatcher(watcher, p); + } + for (WatchStats stats : paths.values()) { + if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + --recursiveWatchQty; + } } } @@ -123,8 +137,8 @@ public WatcherOrBitSet triggerWatch(String path, EventType type) { public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); Set watchers = new HashSet<>(); - PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { + PathParentIterator pathParentIterator = getPathParentIterator(path); for (String localPath : pathParentIterator.asIterable()) { Set thisWatchers = watchTable.get(localPath); if (thisWatchers == null || thisWatchers.isEmpty()) { @@ -133,20 +147,23 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet Iterator iterator = thisWatchers.iterator(); while (iterator.hasNext()) { Watcher watcher = iterator.next(); - WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath); - if (watcherMode.isRecursive()) { - if (type != EventType.NodeChildrenChanged) { - watchers.add(watcher); - } - } else if (!pathParentIterator.atParentPath()) { + Map paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap()); + WatchStats stats = paths.get(localPath); + if (stats == null) { + LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath); + continue; + } + if (!pathParentIterator.atParentPath()) { watchers.add(watcher); - if (!watcherMode.isPersistent()) { + WatchStats newStats = stats.removeMode(WatcherMode.STANDARD); + if (newStats == WatchStats.NONE) { iterator.remove(); - Set paths = watch2Paths.get(watcher); - if (paths != null) { - paths.remove(localPath); - } + paths.remove(localPath); + } else if (newStats != stats) { + paths.put(localPath, newStats); } + } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + watchers.add(watcher); } } if (thisWatchers.isEmpty()) { @@ -199,7 +216,7 @@ public synchronized String toString() { sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n"); int total = 0; - for (Set paths : watch2Paths.values()) { + for (Map paths : watch2Paths.values()) { total += paths.size(); } sb.append("Total watches:").append(total); @@ -219,10 +236,10 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) { } } } else { - for (Entry> e : watch2Paths.entrySet()) { + for (Entry> e : watch2Paths.entrySet()) { pwriter.print("0x"); pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId())); - for (String path : e.getValue()) { + for (String path : e.getValue().keySet()) { pwriter.print("\t"); pwriter.println(path); } @@ -232,31 +249,28 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) { @Override public synchronized boolean containsWatcher(String path, Watcher watcher) { - WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path); - PathParentIterator pathParentIterator = getPathParentIterator(path); - for (String localPath : pathParentIterator.asIterable()) { - Set watchers = watchTable.get(localPath); - if (!pathParentIterator.atParentPath()) { - if (watchers != null) { - return true; // at the leaf node, all watcher types match - } - } - if (watcherMode.isRecursive()) { - return true; - } - } - return false; + Set list = watchTable.get(path); + return list != null && list.contains(watcher); } @Override public synchronized boolean removeWatcher(String path, Watcher watcher) { - Set paths = watch2Paths.get(watcher); - if (paths == null || !paths.remove(path)) { + Map paths = watch2Paths.get(watcher); + if (paths == null) { return false; } + WatchStats stats = paths.remove(path); + if (stats == null) { + return false; + } + if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + --recursiveWatchQty; + } + Set list = watchTable.get(path); if (list == null || !list.remove(watcher)) { + LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher); return false; } @@ -264,17 +278,20 @@ public synchronized boolean removeWatcher(String path, Watcher watcher) { watchTable.remove(path); } - watcherModeManager.removeWatcher(watcher, path); - return true; } + // VisibleForTesting + Map> getWatch2Paths() { + return watch2Paths; + } + @Override public synchronized WatchesReport getWatches() { Map> id2paths = new HashMap<>(); - for (Entry> e : watch2Paths.entrySet()) { + for (Entry> e : watch2Paths.entrySet()) { Long id = ((ServerCnxn) e.getKey()).getSessionId(); - Set paths = new HashSet<>(e.getValue()); + Set paths = new HashSet<>(e.getValue().keySet()); id2paths.put(id, paths); } return new WatchesReport(id2paths); @@ -296,7 +313,7 @@ public synchronized WatchesPathReport getWatchesByPath() { @Override public synchronized WatchesSummary getWatchesSummary() { int totalWatches = 0; - for (Set paths : watch2Paths.values()) { + for (Map paths : watch2Paths.values()) { totalWatches += paths.size(); } return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches); @@ -305,13 +322,13 @@ public synchronized WatchesSummary getWatchesSummary() { @Override public void shutdown() { /* do nothing */ } - @Override - public int getRecursiveWatchQty() { - return watcherModeManager.getRecursiveQty(); + // VisibleForTesting + synchronized int getRecursiveWatchQty() { + return recursiveWatchQty; } private PathParentIterator getPathParentIterator(String path) { - if (watcherModeManager.getRecursiveQty() == 0) { + if (getRecursiveWatchQty() == 0) { return PathParentIterator.forPathOnly(path); } return PathParentIterator.forAll(path); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java new file mode 100644 index 00000000000..fd0c0259edd --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchStats.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +/** + * Statistics for multiple different watches on one node. + */ +public final class WatchStats { + private static final WatchStats[] WATCH_STATS = new WatchStats[] { + new WatchStats(0), // NONE + new WatchStats(1), // STANDARD + new WatchStats(2), // PERSISTENT + new WatchStats(3), // STANDARD + PERSISTENT + new WatchStats(4), // PERSISTENT_RECURSIVE + new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE + new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE + new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE + }; + + /** + * Stats that have no watchers attached. + * + *

This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}. + */ + public static final WatchStats NONE = WATCH_STATS[0]; + + private final int flags; + + private WatchStats(int flags) { + this.flags = flags; + } + + private static int modeToFlag(WatcherMode mode) { + return 1 << mode.ordinal(); + } + + /** + * Compute stats after given mode attached to node. + * + * @param mode watcher mode + * @return a new stats if given mode is not attached to this node before, otherwise old stats + */ + public WatchStats addMode(WatcherMode mode) { + int flags = this.flags | modeToFlag(mode); + return WATCH_STATS[flags]; + } + + /** + * Compute stats after given mode removed from node. + * + * @param mode watcher mode + * @return null if given mode is the last attached mode, otherwise a new stats + */ + public WatchStats removeMode(WatcherMode mode) { + int mask = ~modeToFlag(mode); + int flags = this.flags & mask; + if (flags == 0) { + return NONE; + } + return WATCH_STATS[flags]; + } + + /** + * Check whether given mode is attached to this node. + * + * @param mode watcher mode + * @return true if given mode is attached to this node. + */ + public boolean hasMode(WatcherMode mode) { + int flags = modeToFlag(mode); + return (this.flags & flags) != 0; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java index b8a1dda7408..e05ba900e51 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java @@ -23,7 +23,7 @@ public enum WatcherMode { STANDARD(false, false), PERSISTENT(true, false), - PERSISTENT_RECURSIVE(true, true) + PERSISTENT_RECURSIVE(true, true), ; public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java deleted file mode 100644 index c1a8225f8ae..00000000000 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.watch; - -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.zookeeper.Watcher; - -class WatcherModeManager { - private final Map watcherModes = new ConcurrentHashMap<>(); - private final AtomicInteger recursiveQty = new AtomicInteger(0); - - private static class Key { - private final Watcher watcher; - private final String path; - - Key(Watcher watcher, String path) { - this.watcher = watcher; - this.path = path; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Key key = (Key) o; - return watcher.equals(key.watcher) && path.equals(key.path); - } - - @Override - public int hashCode() { - return Objects.hash(watcher, path); - } - } - - // VisibleForTesting - Map getWatcherModes() { - return watcherModes; - } - - void setWatcherMode(Watcher watcher, String path, WatcherMode mode) { - if (mode == WatcherMode.DEFAULT_WATCHER_MODE) { - removeWatcher(watcher, path); - } else { - adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode), mode); - } - } - - WatcherMode getWatcherMode(Watcher watcher, String path) { - return watcherModes.getOrDefault(new Key(watcher, path), WatcherMode.DEFAULT_WATCHER_MODE); - } - - void removeWatcher(Watcher watcher, String path) { - adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)), WatcherMode.DEFAULT_WATCHER_MODE); - } - - int getRecursiveQty() { - return recursiveQty.get(); - } - - // recursiveQty is an optimization to avoid having to walk the map every time this value is needed - private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) { - if (oldMode == null) { - oldMode = WatcherMode.DEFAULT_WATCHER_MODE; - } - if (oldMode.isRecursive() != newMode.isRecursive()) { - if (newMode.isRecursive()) { - recursiveQty.incrementAndGet(); - } else { - recursiveQty.decrementAndGet(); - } - } - } -} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java index 78b13bb3348..0582ddafc84 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java @@ -51,28 +51,6 @@ public void setup() { watchManager = new WatchManager(); } - @Test - public void testRecursiveQty() { - WatcherModeManager manager = new WatcherModeManager(); - DummyWatcher watcher = new DummyWatcher(); - manager.setWatcherMode(watcher, "/a", WatcherMode.DEFAULT_WATCHER_MODE); - assertEquals(0, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE); - assertEquals(1, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE); - assertEquals(2, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE); - assertEquals(2, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT); - assertEquals(1, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE); - assertEquals(2, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a/b", WatcherMode.DEFAULT_WATCHER_MODE); - assertEquals(1, manager.getRecursiveQty()); - manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT); - assertEquals(0, manager.getRecursiveQty()); - } - @Test public void testAddRemove() { Watcher watcher1 = new DummyWatcher(); @@ -125,7 +103,7 @@ public void testSameWatcherMultiPath() { } @Test - public void testChangeType() { + public void testDifferentWatchModes() { Watcher watcher = new DummyWatcher(); watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT); @@ -133,15 +111,14 @@ public void testChangeType() { watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE); assertEquals(1, watchManager.getRecursiveWatchQty()); watchManager.addWatch("/a", watcher, WatcherMode.STANDARD); - assertEquals(0, watchManager.getRecursiveWatchQty()); + assertEquals(1, watchManager.getRecursiveWatchQty()); assertTrue(watchManager.removeWatcher("/a", watcher)); assertEquals(0, watchManager.getRecursiveWatchQty()); } @Test - public void testRecursiveQtyConcurrency() { - ThreadLocalRandom random = ThreadLocalRandom.current(); - WatcherModeManager manager = new WatcherModeManager(); + public void testRecursiveQtyConcurrency() throws Exception { + WatchManager manager = new WatchManager(); ExecutorService threadPool = Executors.newFixedThreadPool(clientQty); List> tasks = null; CountDownLatch completedLatch = new CountDownLatch(clientQty); @@ -149,11 +126,7 @@ public void testRecursiveQtyConcurrency() { tasks = IntStream.range(0, clientQty) .mapToObj(__ -> threadPool.submit(() -> iterate(manager, completedLatch))) .collect(Collectors.toList()); - try { - completedLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + completedLatch.await(); } finally { if (tasks != null) { tasks.forEach(t -> t.cancel(true)); @@ -161,14 +134,15 @@ public void testRecursiveQtyConcurrency() { threadPool.shutdownNow(); } - int expectedRecursiveQty = (int) manager.getWatcherModes().values() + int expectedRecursiveQty = (int) manager.getWatch2Paths().values() .stream() - .filter(mode -> mode == WatcherMode.PERSISTENT_RECURSIVE) + .flatMap(paths -> paths.values().stream()) + .filter(stats -> stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) .count(); - assertEquals(expectedRecursiveQty, manager.getRecursiveQty()); + assertEquals(expectedRecursiveQty, manager.getRecursiveWatchQty()); } - private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) { + private void iterate(WatchManager manager, CountDownLatch completedLatch) { ThreadLocalRandom random = ThreadLocalRandom.current(); try { for (int i = 0; i < iterations; ++i) { @@ -176,9 +150,9 @@ private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) boolean doSet = random.nextInt(100) > 33; // 2/3 will be sets if (doSet) { WatcherMode mode = WatcherMode.values()[random.nextInt(WatcherMode.values().length)]; - manager.setWatcherMode(new DummyWatcher(), path, mode); + manager.addWatch(path, new DummyWatcher(), mode); } else { - manager.removeWatcher(new DummyWatcher(), path); + manager.removeWatcher(path, new DummyWatcher()); } int sleepMillis = random.nextInt(2); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index e74ee2fd683..077af3c456c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -18,6 +18,7 @@ package org.apache.zookeeper.test; +import static org.apache.zookeeper.AddWatchMode.PERSISTENT; import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -171,6 +172,53 @@ public void testMultiClient() } } + @Test + public void testSamePathWithDifferentWatchModes() throws Exception { + try (ZooKeeper zk = createClient()) { + BlockingQueue dataEvents = new LinkedBlockingQueue<>(); + BlockingQueue childEvents = new LinkedBlockingQueue<>(); + BlockingQueue persistentEvents = new LinkedBlockingQueue<>(); + BlockingQueue recursiveEvents = new LinkedBlockingQueue<>(); + + zk.addWatch("/a", persistentEvents::add, PERSISTENT); + zk.addWatch("/a", recursiveEvents::add, PERSISTENT_RECURSIVE); + zk.exists("/a", dataEvents::add); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEvent(dataEvents, Watcher.Event.EventType.NodeCreated, "/a"); + assertEvent(persistentEvents, Watcher.Event.EventType.NodeCreated, "/a"); + assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a"); + + zk.getData("/a", dataEvents::add, null); + zk.setData("/a", new byte[0], -1); + assertEvent(dataEvents, Watcher.Event.EventType.NodeDataChanged, "/a"); + assertEvent(persistentEvents, Watcher.Event.EventType.NodeDataChanged, "/a"); + assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDataChanged, "/a"); + + zk.getChildren("/a", childEvents::add); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(recursiveEvents, Watcher.Event.EventType.NodeCreated, "/a/b"); + + zk.getChildren("/a", childEvents::add); + zk.delete("/a/b", -1); + assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(persistentEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a/b"); + + zk.getChildren("/a", childEvents::add); + zk.getData("/a", dataEvents::add, null); + zk.exists("/a", dataEvents::add); + zk.delete("/a", -1); + assertEvent(childEvents, Watcher.Event.EventType.NodeDeleted, "/a"); + assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a"); + assertEvent(dataEvents, Watcher.Event.EventType.NodeDeleted, "/a"); + assertEvent(persistentEvents, Watcher.Event.EventType.NodeDeleted, "/a"); + assertEvent(recursiveEvents, Watcher.Event.EventType.NodeDeleted, "/a"); + } + } + @Test public void testRootWatcher() throws IOException, InterruptedException, KeeperException {