From 12e490e264c4c49889efecc58b0510f5eedea4b1 Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Wed, 30 Nov 2022 15:52:23 -0500 Subject: [PATCH] Communicate the Zxid that triggered a WatchEvent to fire With the recent addition of persistent watches, many doors have opened up to significantly more performant and intuitive local caches of remote state, but the actual implementation can be difficult because to cache data locally, one needs to execute the following steps: 1. Set the watch 2. Bootstrap the watched subtree 3. Catch up on the events that fired during the bootstrap The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step 3 because it's unknown whether an event arrived during the bootstrap or after. For example, imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies in the local cache. This change sets the Zxid in the response header whenever the watch event type is NodeCreated, NodeDeleted, NodeDataChanged or NodeChildrenChanged. --- .../java/org/apache/zookeeper/ClientCnxn.java | 2 +- .../org/apache/zookeeper/WatchedEvent.java | 49 ++++- .../org/apache/zookeeper/server/DataTree.java | 12 +- .../apache/zookeeper/server/DumbWatcher.java | 6 + .../zookeeper/server/NIOServerCnxn.java | 2 +- .../zookeeper/server/NettyServerCnxn.java | 2 +- .../zookeeper/server/watch/IWatchManager.java | 6 +- .../zookeeper/server/watch/WatchManager.java | 8 +- .../server/watch/WatchManagerOptimized.java | 8 +- .../server/watch/WatchManagerTest.java | 33 +++- .../test/PersistentRecursiveWatcherTest.java | 130 ++++++++------ .../test/UnsupportedAddWatcherTest.java | 4 +- .../zookeeper/test/WatchedEventTest.java | 4 +- .../zookeeper/test/WatcherFuncTest.java | 167 ++++++++++-------- 14 files changed, 273 insertions(+), 160 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 96ff8eb088e..7608415e669 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { event.setPath(clientPath); } - WatchedEvent we = new WatchedEvent(event); + WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java index 1de3d3ddf68..8acc3e83f15 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java @@ -18,6 +18,7 @@ package org.apache.zookeeper; +import java.util.Objects; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -31,27 +32,38 @@ */ @InterfaceAudience.Public public class WatchedEvent { + public static final long NO_ZXID = -1L; private final KeeperState keeperState; private final EventType eventType; - private String path; + private final String path; + private final long zxid; /** - * Create a WatchedEvent with specified type, state and path + * Create a WatchedEvent with specified type, state, path and zxid */ - public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; + this.zxid = zxid; } /** - * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent + * Create a WatchedEvent with specified type, state and path */ - public WatchedEvent(WatcherEvent eventMessage) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + this(eventType, keeperState, path, NO_ZXID); + } + + /** + * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent + */ + public WatchedEvent(WatcherEvent eventMessage, long zxid) { keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); + this.zxid = zxid; } public KeeperState getState() { @@ -66,9 +78,33 @@ public String getPath() { return path; } + public long getZxid() { + return zxid; + } + @Override public String toString() { - return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path; + return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WatchedEvent that = (WatchedEvent) o; + return zxid == that.zxid + && keeperState == that.keeperState + && eventType == that.eventType + && Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(keeperState, eventType, path, zxid); } /** @@ -77,5 +113,4 @@ public String toString() { public WatcherEvent getWrapper() { return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index cd45a7c3437..bbc9b97dbaa 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -527,8 +527,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); - dataWatches.triggerWatch(path, Event.EventType.NodeCreated); - childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); + dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid); + childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid); } /** @@ -624,9 +624,9 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce "childWatches.triggerWatch " + parentName); } - WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); - childWatches.triggerWatch(path, EventType.NodeDeleted, processed); - childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged); + WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid); + childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed); + childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid); } public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException { @@ -658,7 +658,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time) nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata)); updateWriteStat(path, dataBytes); - dataWatches.triggerWatch(path, EventType.NodeDataChanged); + dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid); return s; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index c7bf830b4ef..93bfc70e7cc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -33,6 +33,7 @@ public class DumbWatcher extends ServerCnxn { private long sessionId; + private long watchedZxid = WatchedEvent.NO_ZXID; public DumbWatcher() { this(0); @@ -49,6 +50,11 @@ void setSessionTimeout(int sessionTimeout) { @Override public void process(WatchedEvent event) { + watchedZxid = event.getZxid(); + } + + public long getWatchedZxid() { + return watchedZxid; } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 5ffc81da1c3..ca18d3411c9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St */ @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f95200d560b..3ed77183434 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -162,7 +162,7 @@ public int getSessionTimeout() { @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805a0..482bc7e87d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -82,10 +82,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid); /** * Distribute the watch event for the given path, but ignore those @@ -93,11 +94,12 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * @param suppress the suppressed watcher set * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress); /** * Get the size of watchers. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059b2..2697808f6ca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); Set watchers = new HashSet<>(); PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java index 1cc7deb9dc0..291fae1adab 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java @@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); BitHashSet watchers = remove(path); if (watchers == null) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index bd613a44488..cef143911f4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -28,6 +28,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.zookeeper.Watcher; @@ -131,7 +132,7 @@ public WatcherTriggerWorker( public void run() { while (!stopped) { String path = PATH_PREFIX + r.nextInt(paths); - WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted); + WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1); if (s != null) { triggeredCount.addAndGet(s.size()); } @@ -416,6 +417,10 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon assertEquals(sum, values.get("sum_" + metricName)); } + private void checkWatchedZxid(DumbWatcher watcher, long expectedZxid) { + assertEquals(expectedZxid, watcher.getWatchedZxid()); + } + @ParameterizedTest @MethodSource("data") public void testWatcherMetrics(String className) throws IOException { @@ -430,28 +435,36 @@ public void testWatcherMetrics(String className) throws IOException { final String path3 = "/path3"; - //both wather1 and wather2 are watching path1 + //both watcher1 and watcher2 are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path3, EventType.NodeCreated); + manager.triggerWatch(path3, EventType.NodeCreated, 1); //path3 is not being watched so metric is 0 checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L); + checkWatchedZxid(watcher1, 1); + checkWatchedZxid(watcher2, 1); //path1 is watched by two watchers so two fired - manager.triggerWatch(path1, EventType.NodeCreated); + manager.triggerWatch(path1, EventType.NodeCreated, 2); checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L); + checkWatchedZxid(watcher1, 2); + checkWatchedZxid(watcher2, 2); //path2 is watched by one watcher so one fired now total is 3 - manager.triggerWatch(path2, EventType.NodeCreated); + manager.triggerWatch(path2, EventType.NodeCreated, 3); checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); + checkWatchedZxid(watcher1, 3); + checkWatchedZxid(watcher2, 2); //watches on path1 are no longer there so zero fired - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 4); checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L); + checkWatchedZxid(watcher1, 3); + checkWatchedZxid(watcher2, 2); //both wather1 and wather2 are watching path1 manager.addWatch(path1, watcher1); @@ -460,11 +473,15 @@ public void testWatcherMetrics(String className) throws IOException { //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 5); checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L); + checkWatchedZxid(watcher1, 5); + checkWatchedZxid(watcher2, 5); - manager.triggerWatch(path2, EventType.NodeDeleted); + manager.triggerWatch(path2, EventType.NodeDeleted, 6); checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L); + checkWatchedZxid(watcher1, 6); + checkWatchedZxid(watcher2, 5); //make sure that node created watch count is not impacted by the fire of other event types checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index 80d8c400cd2..8e6f0a3c737 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -18,10 +18,6 @@ package org.apache.zookeeper.test; -import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -32,13 +28,21 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class PersistentRecursiveWatcherTest extends ClientBase { private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class); private BlockingQueue events; @@ -80,21 +84,28 @@ public void testBasicAsync() private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException { zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData("/a/b/c/d/e", new byte[0], -1); + + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); + + zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); + + stat = zk.setData("/a/b/c/d/e", new byte[0], -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat); + zk.delete("/a/b/c/d/e", -1); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); + assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid()); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); } @Test @@ -103,14 +114,15 @@ public void testRemoval() try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false); zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b"); + assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID); } } @@ -119,9 +131,9 @@ public void testDisconnect() throws Exception { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); stopServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID); startServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID); internalTestBasic(zk); } } @@ -136,17 +148,15 @@ public void testMultiClient() zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); - zk1.setData("/a/b/c", "one".getBytes(), -1); - Thread.sleep(1000); // give some time for the event to arrive - - zk2.setData("/a/b/c", "two".getBytes(), -1); - zk2.setData("/a/b/c", "three".getBytes(), -1); - zk2.setData("/a/b/c", "four".getBytes(), -1); - - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); + Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + + stat = zk2.setData("/a/b/c", "two".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "three".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "four".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); } } @@ -155,22 +165,42 @@ public void testRootWatcher() throws IOException, InterruptedException, KeeperException { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE); - zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c"); + Stat stat = new Stat(); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid()); + + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid()); + + zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid()); + + zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid()); } } - private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) - throws InterruptedException { - WatchedEvent event = events.poll(5, TimeUnit.SECONDS); - assertNotNull(event); - assertEquals(eventType, event.getType()); - assertEquals(path, event.getPath()); + private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat) + throws InterruptedException { + assertEvent(events, eventType, path, stat.getMzxid()); + } + + private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid) + throws InterruptedException { + assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid); + } + + private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState, + String path, long zxid) throws InterruptedException { + WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS); + assertNotNull(actualEvent); + WatchedEvent expectedEvent = new WatchedEvent( + eventType, + keeperState, + path, + zxid + ); + assertEquals(expectedEvent, actualEvent); } } \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java index a3d6eef7bdc..cce26276cdd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java @@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) { return new WatcherOrBitSet(Collections.emptySet()); } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) { return new WatcherOrBitSet(Collections.emptySet()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java index f4a0298f233..a9bc11da2a6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java @@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() { for (EventType et : allTypes) { for (KeeperState ks : allStates) { wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah"); - we = new WatchedEvent(wep); + we = new WatchedEvent(wep, WatchedEvent.NO_ZXID); assertEquals(et, we.getType()); assertEquals(ks, we.getState()); assertEquals("blah", we.getPath()); @@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() { try { WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo"); - new WatchedEvent(wep); + new WatchedEvent(wep, WatchedEvent.NO_ZXID); fail("Was able to create WatchedEvent from bad wrapper"); } catch (RuntimeException re) { // we're good diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java index 0987c4cf7e9..4d893ed6379 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java @@ -18,11 +18,6 @@ package org.apache.zookeeper.test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -37,10 +32,17 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class WatcherFuncTest extends ClientBase { private static class SimpleWatcher implements Watcher { @@ -68,14 +70,14 @@ public void process(WatchedEvent event) { assertTrue(false, "interruption unexpected"); } } - public void verify(List expected) throws InterruptedException { + + public void verify(List expected) throws InterruptedException { + List actual = new ArrayList<>(); WatchedEvent event; - int count = 0; - while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { - assertEquals(expected.get(count), event.getType()); - count++; + while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { + actual.add(event); } - assertEquals(expected.size(), count); + assertEquals(expected, actual); events.clear(); } @@ -88,7 +90,7 @@ public void verify(List expected) throws InterruptedException { private volatile CountDownLatch lsnr_latch; private ZooKeeper lsnr; - private List expected; + private List expected; @BeforeEach @Override @@ -103,7 +105,7 @@ public void setUp() throws Exception { lsnr_dwatch = new SimpleWatcher(lsnr_latch); lsnr = createClient(lsnr_dwatch, lsnr_latch); - expected = new ArrayList(); + expected = new ArrayList<>(); } @AfterEach @@ -127,15 +129,34 @@ private void verify() throws InterruptedException { expected.clear(); } + private void addEvent(List events, EventType eventType, String path, Stat stat) { + addEvent(events, eventType, path, stat.getMzxid()); + } + + private void addEvent(List events, EventType eventType, String path, long zxid) { + events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid)); + } + + private long delete(String path) throws InterruptedException, KeeperException { + client.delete(path, -1); + int lastSlash = path.lastIndexOf('/'); + String parent = (lastSlash == 0) + ? "/" + : path.substring(0, lastSlash); + // the deletion's zxid will be reflected in the parent's Pzxid + return client.exists(parent, false).getPzxid(); + } + @Test public void testExistsSync() throws IOException, InterruptedException, KeeperException { assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo/bar", true)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo/bar", stat); verify(); @@ -160,20 +181,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc assertEquals("/foo/car", e.getPath()); } - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.exists("/foo", true)); assertNotNull(lsnr.exists("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -200,20 +221,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.getData("/foo", true, null)); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -238,8 +259,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getChildren("/foo", true)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", true)); client.setData("/foo", "parent".getBytes(), -1); @@ -250,11 +272,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -266,7 +288,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo", w1)); @@ -276,10 +298,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNull(lsnr.exists("/foo/bar", w3)); assertNull(lsnr.exists("/foo/bar", w4)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - e2.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(e2, EventType.NodeCreated, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -297,12 +320,12 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w4)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); - lsnr_dwatch.verify(new ArrayList()); // not reg so should = 0 + lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0 w1.verify(expected); w2.verify(e2); w3.verify(e2); @@ -319,10 +342,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w3)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -331,7 +354,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe w4.verify(e2); expected.clear(); e2.clear(); - } @Test @@ -341,7 +363,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); try { lsnr.getData("/foo", w1, null); @@ -367,10 +389,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w4, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -387,10 +409,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w3, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -408,7 +430,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); try { lsnr.getChildren("/foo", true); @@ -429,8 +451,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo", w1)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w3)); @@ -451,11 +474,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo/bar", w4)); assertNotNull(lsnr.getChildren("/foo/bar", w4)); - client.delete("/foo/bar", -1); - e2.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected);