From 13bd711d92e6152a20c899aedef6c1f9e1177caf Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Wed, 30 Nov 2022 15:52:23 -0500 Subject: [PATCH] Communicate the Zxid that triggered a WatchEvent to fire With the recent addition of persistent watches, many doors have opened up to significantly more performant and intuitive local caches of remote state, but the actual implementation can be difficult because to cache data locally, one needs to execute the following steps: 1. Set the watch 2. Bootstrap the watched subtree 3. Catch up on the events that fired during the bootstrap The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step 3 because it's unknown whether an event arrived during the bootstrap or after. For example, imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies in the local cache. This change sets the Zxid in the response header whenever the watch event type is NodeCreated, NodeDeleted, NodeDataChanged or NodeChildrenChanged. --- .../markdown/zookeeperProgrammers.md | 1 - .../zookeeper/server/watch/WatchBench.java | 6 +- .../java/org/apache/zookeeper/ClientCnxn.java | 2 +- .../org/apache/zookeeper/WatchedEvent.java | 39 ++++- .../java/org/apache/zookeeper/Watcher.java | 5 + .../org/apache/zookeeper/server/DataTree.java | 12 +- .../apache/zookeeper/server/DumbWatcher.java | 19 +++ .../zookeeper/server/NIOServerCnxn.java | 2 +- .../zookeeper/server/NettyServerCnxn.java | 2 +- .../zookeeper/server/watch/IWatchManager.java | 6 +- .../zookeeper/server/watch/WatchManager.java | 8 +- .../server/watch/WatchManagerOptimized.java | 8 +- .../server/watch/WatchManagerTest.java | 37 ++++- .../test/PersistentRecursiveWatcherTest.java | 133 +++++++++------ .../org/apache/zookeeper/test/TestUtils.java | 14 ++ .../test/UnsupportedAddWatcherTest.java | 4 +- .../zookeeper/test/WatchedEventTest.java | 4 +- .../zookeeper/test/WatcherFuncTest.java | 155 ++++++++++-------- 18 files changed, 298 insertions(+), 159 deletions(-) diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md index ddd722b9000..3151fb21177 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md @@ -677,7 +677,6 @@ Persistent watches are set using the method *addWatch()*. The triggering semanti (other than one-time triggering) are the same as standard watches. The only exception regarding events is that recursive persistent watchers never trigger child changed events as they are redundant. Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*. - ### Remove Watches diff --git a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java index aee5b2f18ab..7de1be0370b 100644 --- a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java +++ b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java @@ -191,7 +191,7 @@ void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerConcentrateWatch(InvocationState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } @@ -225,7 +225,7 @@ public void tearDown() { // clear all the watches for (String path : paths) { - watchManager.triggerWatch(path, event); + watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } @@ -294,7 +294,7 @@ public void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c29a2141d58..727d97daa6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { event.setPath(clientPath); } - WatchedEvent we = new WatchedEvent(event); + WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java index 1de3d3ddf68..1303629c914 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java @@ -31,27 +31,38 @@ */ @InterfaceAudience.Public public class WatchedEvent { + public static final long NO_ZXID = -1L; private final KeeperState keeperState; private final EventType eventType; - private String path; + private final String path; + private final long zxid; /** - * Create a WatchedEvent with specified type, state and path + * Create a WatchedEvent with specified type, state, path and zxid */ - public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; + this.zxid = zxid; } /** - * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent + * Create a WatchedEvent with specified type, state and path */ - public WatchedEvent(WatcherEvent eventMessage) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + this(eventType, keeperState, path, NO_ZXID); + } + + /** + * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent + */ + public WatchedEvent(WatcherEvent eventMessage, long zxid) { keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); + this.zxid = zxid; } public KeeperState getState() { @@ -66,9 +77,24 @@ public String getPath() { return path; } + /** + * Returns the zxid of the transaction that triggered this watch if it is + * of one of the following types: + * Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also + * returned by old servers that do not support this feature. + */ + public long getZxid() { + return zxid; + } + @Override public String toString() { - return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path; + return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid; } /** @@ -77,5 +103,4 @@ public String toString() { public WatcherEvent getWrapper() { return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java index ab4b654880e..42e48b9efd2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java @@ -26,6 +26,11 @@ * server it connects to. An application using such a client handles these * events by registering a callback object with the client. The callback object * is expected to be an instance of a class that implements Watcher interface. + * When {@link #process} is triggered by a watch firing, such as + * {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will + * return the zxid of the transaction that caused said watch to fire. If + * {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to + * support this feature. * */ @InterfaceAudience.Public diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index a6f60539089..01a6da9a447 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); - dataWatches.triggerWatch(path, Event.EventType.NodeCreated); - childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); + dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid); + childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid); } /** @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException { "childWatches.triggerWatch " + parentName); } - WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); - childWatches.triggerWatch(path, EventType.NodeDeleted, processed); - childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged); + WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid); + childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed); + childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid); } public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException { @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time) nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData)); updateWriteStat(path, dataBytes); - dataWatches.triggerWatch(path, EventType.NodeDataChanged); + dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid); return s; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index c7bf830b4ef..231a063fe12 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -33,6 +33,9 @@ public class DumbWatcher extends ServerCnxn { private long sessionId; + private String mostRecentPath; + private Event.EventType mostRecentEventType; + private long mostRecentZxid = WatchedEvent.NO_ZXID; public DumbWatcher() { this(0); @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) { @Override public void process(WatchedEvent event) { + mostRecentEventType = event.getType(); + mostRecentZxid = event.getZxid(); + mostRecentPath = event.getPath(); } + public String getMostRecentPath() { + return mostRecentPath; + } + + public Event.EventType getMostRecentEventType() { + return mostRecentEventType; + } + + public long getMostRecentZxid() { + return mostRecentZxid; + } + + @Override int getSessionTimeout() { return 0; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 1a8575cd45e..61cbe71bad2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St */ @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f95200d560b..3ed77183434 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -162,7 +162,7 @@ public int getSessionTimeout() { @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805a0..482bc7e87d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -82,10 +82,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid); /** * Distribute the watch event for the given path, but ignore those @@ -93,11 +94,12 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * @param suppress the suppressed watcher set * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress); /** * Get the size of watchers. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059b2..2697808f6ca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); Set watchers = new HashSet<>(); PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java index 7f72175efdf..0a6b4279fdb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java @@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); BitHashSet watchers = remove(path); if (watchers == null) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index f65391c497c..6474e0a06ae 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -131,7 +131,7 @@ public WatcherTriggerWorker( public void run() { while (!stopped) { String path = PATH_PREFIX + r.nextInt(paths); - WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted); + WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1); if (s != null) { triggeredCount.addAndGet(s.size()); } @@ -416,6 +416,12 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon assertEquals(sum, values.get("sum_" + metricName)); } + private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) { + assertEquals(path, watcher.getMostRecentPath()); + assertEquals(eventType, watcher.getMostRecentEventType()); + assertEquals(zxid, watcher.getMostRecentZxid()); + } + @ParameterizedTest @MethodSource("data") public void testWatcherMetrics(String className) throws IOException { @@ -430,41 +436,54 @@ public void testWatcherMetrics(String className) throws IOException { final String path3 = "/path3"; - //both wather1 and wather2 are watching path1 + //both watcher1 and watcher2 are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path3, EventType.NodeCreated); + manager.triggerWatch(path3, EventType.NodeCreated, 1); //path3 is not being watched so metric is 0 checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L); + // Watchers shouldn't have received any events yet so the zxid should be -1. + checkMostRecentWatchedEvent(watcher1, null, null, -1); + checkMostRecentWatchedEvent(watcher2, null, null, -1); //path1 is watched by two watchers so two fired - manager.triggerWatch(path1, EventType.NodeCreated); + manager.triggerWatch(path1, EventType.NodeCreated, 2); checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //path2 is watched by one watcher so one fired now total is 3 - manager.triggerWatch(path2, EventType.NodeCreated); + manager.triggerWatch(path2, EventType.NodeCreated, 3); checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //watches on path1 are no longer there so zero fired - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 4); checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); - //both wather1 and wather2 are watching path1 + //both watcher and watcher are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 5); checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); - manager.triggerWatch(path2, EventType.NodeDeleted); + manager.triggerWatch(path2, EventType.NodeDeleted, 6); checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); //make sure that node created watch count is not impacted by the fire of other event types checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index e74ee2fd683..529d6620a6a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -32,8 +32,11 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -80,21 +83,28 @@ public void testBasicAsync() private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException { zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData("/a/b/c/d/e", new byte[0], -1); + + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); + + zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); + + stat = zk.setData("/a/b/c/d/e", new byte[0], -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat); + zk.delete("/a/b/c/d/e", -1); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); + assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid()); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); } @Test @@ -103,14 +113,15 @@ public void testRemoval() try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false); zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b"); + assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID); } } @@ -124,13 +135,15 @@ public void testNoChildEvents() throws Exception { BlockingQueue childEvents = new LinkedBlockingQueue<>(); zk.getChildren("/a", childEvents::add); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Stat createABStat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat); + Stat createABCStat = new Stat(); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat); - assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid()); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat); assertTrue(events.isEmpty()); } } @@ -140,9 +153,9 @@ public void testDisconnect() throws Exception { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); stopServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID); startServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID); internalTestBasic(zk); } } @@ -157,17 +170,15 @@ public void testMultiClient() zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); - zk1.setData("/a/b/c", "one".getBytes(), -1); - Thread.sleep(1000); // give some time for the event to arrive - - zk2.setData("/a/b/c", "two".getBytes(), -1); - zk2.setData("/a/b/c", "three".getBytes(), -1); - zk2.setData("/a/b/c", "four".getBytes(), -1); - - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); + Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + + stat = zk2.setData("/a/b/c", "two".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "three".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "four".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); } } @@ -176,22 +187,42 @@ public void testRootWatcher() throws IOException, InterruptedException, KeeperException { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE); - zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c"); + Stat stat = new Stat(); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid()); + + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid()); + + zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid()); + + zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid()); } } - private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) - throws InterruptedException { - WatchedEvent event = events.poll(5, TimeUnit.SECONDS); - assertNotNull(event); - assertEquals(eventType, event.getType()); - assertEquals(path, event.getPath()); + private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat) + throws InterruptedException { + assertEvent(events, eventType, path, stat.getMzxid()); + } + + private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid) + throws InterruptedException { + assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid); + } + + private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState, + String path, long zxid) throws InterruptedException { + WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS); + assertNotNull(actualEvent); + WatchedEvent expectedEvent = new WatchedEvent( + eventType, + keeperState, + path, + zxid + ); + TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent); } } \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java index 00c6c070a04..e3306c1fe76 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java @@ -18,8 +18,10 @@ package org.apache.zookeeper.test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import org.apache.zookeeper.WatchedEvent; /** * This class contains test utility methods @@ -57,4 +59,16 @@ public static boolean deleteFileRecursively(File file) { return deleteFileRecursively(file, false); } + /** + * Asserts that the given {@link WatchedEvent} are semantically equal, i.e. they have the same EventType, path and + * zxid. + */ + public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent actual) { + // TODO: .hashCode and .equals cannot be added to WatchedEvent without potentially breaking consumers. This + // can be changed to `assertEquals(expected, actual)` once WatchedEvent has those methods. Until then, + // compare the lists manually. + assertEquals(expected.getType(), actual.getType()); + assertEquals(expected.getPath(), actual.getPath()); + assertEquals(expected.getZxid(), actual.getZxid()); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java index a3d6eef7bdc..cce26276cdd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java @@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) { return new WatcherOrBitSet(Collections.emptySet()); } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) { return new WatcherOrBitSet(Collections.emptySet()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java index f4a0298f233..a9bc11da2a6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java @@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() { for (EventType et : allTypes) { for (KeeperState ks : allStates) { wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah"); - we = new WatchedEvent(wep); + we = new WatchedEvent(wep, WatchedEvent.NO_ZXID); assertEquals(et, we.getType()); assertEquals(ks, we.getState()); assertEquals("blah", we.getPath()); @@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() { try { WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo"); - new WatchedEvent(wep); + new WatchedEvent(wep, WatchedEvent.NO_ZXID); fail("Was able to create WatchedEvent from bad wrapper"); } catch (RuntimeException re) { // we're good diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java index 180cd08a611..44440a7c3bb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java @@ -37,6 +37,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -68,14 +69,17 @@ public void process(WatchedEvent event) { assertTrue(false, "interruption unexpected"); } } - public void verify(List expected) throws InterruptedException { + + public void verify(List expected) throws InterruptedException { + List actual = new ArrayList<>(); WatchedEvent event; - int count = 0; - while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { - assertEquals(expected.get(count), event.getType()); - count++; + while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { + actual.add(event); + } + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i)); } - assertEquals(expected.size(), count); events.clear(); } @@ -88,7 +92,7 @@ public void verify(List expected) throws InterruptedException { private volatile CountDownLatch lsnr_latch; private ZooKeeper lsnr; - private List expected; + private List expected; @BeforeEach @Override @@ -127,15 +131,34 @@ private void verify() throws InterruptedException { expected.clear(); } + private void addEvent(List events, EventType eventType, String path, Stat stat) { + addEvent(events, eventType, path, stat.getMzxid()); + } + + private void addEvent(List events, EventType eventType, String path, long zxid) { + events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid)); + } + + private long delete(String path) throws InterruptedException, KeeperException { + client.delete(path, -1); + int lastSlash = path.lastIndexOf('/'); + String parent = (lastSlash == 0) + ? "/" + : path.substring(0, lastSlash); + // the deletion's zxid will be reflected in the parent's Pzxid + return client.exists(parent, false).getPzxid(); + } + @Test public void testExistsSync() throws IOException, InterruptedException, KeeperException { assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo/bar", true)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo/bar", stat); verify(); @@ -160,20 +183,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc assertEquals("/foo/car", e.getPath()); } - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.exists("/foo", true)); assertNotNull(lsnr.exists("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -200,20 +223,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.getData("/foo", true, null)); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -238,8 +261,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getChildren("/foo", true)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", true)); client.setData("/foo", "parent".getBytes(), -1); @@ -250,11 +274,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -266,7 +290,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo", w1)); @@ -276,10 +300,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNull(lsnr.exists("/foo/bar", w3)); assertNull(lsnr.exists("/foo/bar", w4)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - e2.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(e2, EventType.NodeCreated, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -297,10 +322,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w4)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0 w1.verify(expected); @@ -319,10 +344,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w3)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -331,7 +356,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe w4.verify(e2); expected.clear(); e2.clear(); - } @Test @@ -341,7 +365,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getData("/foo", w1, null); @@ -367,10 +391,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w4, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -387,10 +411,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w3, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -408,7 +432,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getChildren("/foo", true); @@ -429,8 +453,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo", w1)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w3)); @@ -451,11 +476,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo/bar", w4)); assertNotNull(lsnr.getChildren("/foo/bar", w4)); - client.delete("/foo/bar", -1); - e2.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected);