diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
index ddd722b9000..3151fb21177 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
@@ -677,7 +677,6 @@ Persistent watches are set using the method *addWatch()*. The triggering semanti
(other than one-time triggering) are the same as standard watches. The only exception regarding events is that
recursive persistent watchers never trigger child changed events as they are redundant.
Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*.
-
### Remove Watches
diff --git a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java
index aee5b2f18ab..7de1be0370b 100644
--- a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java
+++ b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java
@@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
- state.watchManager.triggerWatch(path, event);
+ state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
@@ -225,7 +225,7 @@ public void tearDown() {
// clear all the watches
for (String path : paths) {
- watchManager.triggerWatch(path, event);
+ watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
@@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
- state.watchManager.triggerWatch(path, event);
+ state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index c29a2141d58..727d97daa6a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
event.setPath(clientPath);
}
- WatchedEvent we = new WatchedEvent(event);
+ WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java
index 1de3d3ddf68..1303629c914 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java
@@ -31,27 +31,38 @@
*/
@InterfaceAudience.Public
public class WatchedEvent {
+ public static final long NO_ZXID = -1L;
private final KeeperState keeperState;
private final EventType eventType;
- private String path;
+ private final String path;
+ private final long zxid;
/**
- * Create a WatchedEvent with specified type, state and path
+ * Create a WatchedEvent with specified type, state, path and zxid
*/
- public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
+ public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
+ this.zxid = zxid;
}
/**
- * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
+ * Create a WatchedEvent with specified type, state and path
*/
- public WatchedEvent(WatcherEvent eventMessage) {
+ public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
+ this(eventType, keeperState, path, NO_ZXID);
+ }
+
+ /**
+ * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
+ */
+ public WatchedEvent(WatcherEvent eventMessage, long zxid) {
keeperState = KeeperState.fromInt(eventMessage.getState());
eventType = EventType.fromInt(eventMessage.getType());
path = eventMessage.getPath();
+ this.zxid = zxid;
}
public KeeperState getState() {
@@ -66,9 +77,24 @@ public String getPath() {
return path;
}
+ /**
+ * Returns the zxid of the transaction that triggered this watch if it is
+ * of one of the following types:
+ * - {@link EventType#NodeCreated}
+ * - {@link EventType#NodeDeleted}
+ * - {@link EventType#NodeDataChanged}
+ * - {@link EventType#NodeChildrenChanged}
+ *
+ * Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also
+ * returned by old servers that do not support this feature.
+ */
+ public long getZxid() {
+ return zxid;
+ }
+
@Override
public String toString() {
- return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
+ return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
}
/**
@@ -77,5 +103,4 @@ public String toString() {
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}
-
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
index ab4b654880e..42e48b9efd2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
@@ -26,6 +26,11 @@
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
+ * When {@link #process} is triggered by a watch firing, such as
+ * {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will
+ * return the zxid of the transaction that caused said watch to fire. If
+ * {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to
+ * support this feature.
*
*/
@InterfaceAudience.Public
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index a6f60539089..01a6da9a447 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
- dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
- childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
+ dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
+ childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}
/**
@@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
"childWatches.triggerWatch " + parentName);
}
- WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
- childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
- childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
+ WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
+ childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
+ childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
@@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));
updateWriteStat(path, dataBytes);
- dataWatches.triggerWatch(path, EventType.NodeDataChanged);
+ dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
return s;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
index c7bf830b4ef..231a063fe12 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
@@ -33,6 +33,9 @@
public class DumbWatcher extends ServerCnxn {
private long sessionId;
+ private String mostRecentPath;
+ private Event.EventType mostRecentEventType;
+ private long mostRecentZxid = WatchedEvent.NO_ZXID;
public DumbWatcher() {
this(0);
@@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) {
@Override
public void process(WatchedEvent event) {
+ mostRecentEventType = event.getType();
+ mostRecentZxid = event.getZxid();
+ mostRecentPath = event.getPath();
}
+ public String getMostRecentPath() {
+ return mostRecentPath;
+ }
+
+ public Event.EventType getMostRecentEventType() {
+ return mostRecentEventType;
+ }
+
+ public long getMostRecentZxid() {
+ return mostRecentZxid;
+ }
+
+
@Override
int getSessionTimeout() {
return 0;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index 1a8575cd45e..61cbe71bad2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
*/
@Override
public void process(WatchedEvent event) {
- ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
+ ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index f95200d560b..3ed77183434 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -162,7 +162,7 @@ public int getSessionTimeout() {
@Override
public void process(WatchedEvent event) {
- ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
+ ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
index 1bc44c805a0..482bc7e87d4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
@@ -82,10 +82,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
+ * @param zxid the zxid for the corresponding change that triggered this event
*
* @return the watchers have been notified
*/
- WatcherOrBitSet triggerWatch(String path, EventType type);
+ WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);
/**
* Distribute the watch event for the given path, but ignore those
@@ -93,11 +94,12 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
+ * @param zxid the zxid for the corresponding change that triggered this event
* @param suppress the suppressed watcher set
*
* @return the watchers have been notified
*/
- WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
+ WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);
/**
* Get the size of watchers.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
index c5b133059b2..2697808f6ca 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
@@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) {
}
@Override
- public WatcherOrBitSet triggerWatch(String path, EventType type) {
- return triggerWatch(path, type, null);
+ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
+ return triggerWatch(path, type, zxid, null);
}
@Override
- public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
- WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
+ WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
index 7f72175efdf..0a6b4279fdb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
@@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) {
}
@Override
- public WatcherOrBitSet triggerWatch(String path, EventType type) {
- return triggerWatch(path, type, null);
+ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
+ return triggerWatch(path, type, zxid, null);
}
@Override
- public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
- WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+ public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
+ WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
BitHashSet watchers = remove(path);
if (watchers == null) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
index f65391c497c..6474e0a06ae 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -131,7 +131,7 @@ public WatcherTriggerWorker(
public void run() {
while (!stopped) {
String path = PATH_PREFIX + r.nextInt(paths);
- WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted);
+ WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1);
if (s != null) {
triggeredCount.addAndGet(s.size());
}
@@ -416,6 +416,12 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon
assertEquals(sum, values.get("sum_" + metricName));
}
+ private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) {
+ assertEquals(path, watcher.getMostRecentPath());
+ assertEquals(eventType, watcher.getMostRecentEventType());
+ assertEquals(zxid, watcher.getMostRecentZxid());
+ }
+
@ParameterizedTest
@MethodSource("data")
public void testWatcherMetrics(String className) throws IOException {
@@ -430,41 +436,54 @@ public void testWatcherMetrics(String className) throws IOException {
final String path3 = "/path3";
- //both wather1 and wather2 are watching path1
+ //both watcher1 and watcher2 are watching path1
manager.addWatch(path1, watcher1);
manager.addWatch(path1, watcher2);
//path2 is watched by watcher1
manager.addWatch(path2, watcher1);
- manager.triggerWatch(path3, EventType.NodeCreated);
+ manager.triggerWatch(path3, EventType.NodeCreated, 1);
//path3 is not being watched so metric is 0
checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
+ // Watchers shouldn't have received any events yet so the zxid should be -1.
+ checkMostRecentWatchedEvent(watcher1, null, null, -1);
+ checkMostRecentWatchedEvent(watcher2, null, null, -1);
//path1 is watched by two watchers so two fired
- manager.triggerWatch(path1, EventType.NodeCreated);
+ manager.triggerWatch(path1, EventType.NodeCreated, 2);
checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
+ checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2);
+ checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
//path2 is watched by one watcher so one fired now total is 3
- manager.triggerWatch(path2, EventType.NodeCreated);
+ manager.triggerWatch(path2, EventType.NodeCreated, 3);
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+ checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
+ checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
//watches on path1 are no longer there so zero fired
- manager.triggerWatch(path1, EventType.NodeDataChanged);
+ manager.triggerWatch(path1, EventType.NodeDataChanged, 4);
checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
+ checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
+ checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
- //both wather1 and wather2 are watching path1
+ //both watcher and watcher are watching path1
manager.addWatch(path1, watcher1);
manager.addWatch(path1, watcher2);
//path2 is watched by watcher1
manager.addWatch(path2, watcher1);
- manager.triggerWatch(path1, EventType.NodeDataChanged);
+ manager.triggerWatch(path1, EventType.NodeDataChanged, 5);
checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
+ checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5);
+ checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);
- manager.triggerWatch(path2, EventType.NodeDeleted);
+ manager.triggerWatch(path2, EventType.NodeDeleted, 6);
checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
+ checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6);
+ checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);
//make sure that node created watch count is not impacted by the fire of other event types
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
index e74ee2fd683..529d6620a6a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
@@ -32,8 +32,11 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -80,21 +83,28 @@ public void testBasicAsync()
private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.setData("/a/b/c/d/e", new byte[0], -1);
+
+ Stat stat = new Stat();
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b", stat);
+
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);
+
+ zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat);
+
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);
+
+ stat = zk.setData("/a/b/c/d/e", new byte[0], -1);
+ assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat);
+
zk.delete("/a/b/c/d/e", -1);
- zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
- assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e");
- assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+ assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid());
+
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);
}
@Test
@@ -103,14 +113,15 @@ public void testRemoval()
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+ Stat stat = new Stat();
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b", stat);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);
zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+ assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID);
}
}
@@ -124,13 +135,15 @@ public void testNoChildEvents() throws Exception {
BlockingQueue childEvents = new LinkedBlockingQueue<>();
zk.getChildren("/a", childEvents::add);
- zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Stat createABStat = new Stat();
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat);
+ Stat createABCStat = new Stat();
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat);
- assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+ assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid());
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat);
assertTrue(events.isEmpty());
}
}
@@ -140,9 +153,9 @@ public void testDisconnect() throws Exception {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
stopServer();
- assertEvent(events, Watcher.Event.EventType.None, null);
+ assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID);
startServer();
- assertEvent(events, Watcher.Event.EventType.None, null);
+ assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID);
internalTestBasic(zk);
}
}
@@ -157,17 +170,15 @@ public void testMultiClient()
zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
- zk1.setData("/a/b/c", "one".getBytes(), -1);
- Thread.sleep(1000); // give some time for the event to arrive
-
- zk2.setData("/a/b/c", "two".getBytes(), -1);
- zk2.setData("/a/b/c", "three".getBytes(), -1);
- zk2.setData("/a/b/c", "four".getBytes(), -1);
-
- assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
- assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
- assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
- assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1);
+ assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+
+ stat = zk2.setData("/a/b/c", "two".getBytes(), -1);
+ assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+ stat = zk2.setData("/a/b/c", "three".getBytes(), -1);
+ assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+ stat = zk2.setData("/a/b/c", "four".getBytes(), -1);
+ assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
}
}
@@ -176,22 +187,42 @@ public void testRootWatcher()
throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);
- zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b");
- assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c");
+ Stat stat = new Stat();
+
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid());
+
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid());
+
+ zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid());
+
+ zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid());
}
}
- private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path)
- throws InterruptedException {
- WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
- assertNotNull(event);
- assertEquals(eventType, event.getType());
- assertEquals(path, event.getPath());
+ private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat)
+ throws InterruptedException {
+ assertEvent(events, eventType, path, stat.getMzxid());
+ }
+
+ private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid)
+ throws InterruptedException {
+ assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid);
+ }
+
+ private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState,
+ String path, long zxid) throws InterruptedException {
+ WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS);
+ assertNotNull(actualEvent);
+ WatchedEvent expectedEvent = new WatchedEvent(
+ eventType,
+ keeperState,
+ path,
+ zxid
+ );
+ TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent);
}
}
\ No newline at end of file
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java
index 00c6c070a04..e3306c1fe76 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java
@@ -18,8 +18,10 @@
package org.apache.zookeeper.test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
+import org.apache.zookeeper.WatchedEvent;
/**
* This class contains test utility methods
@@ -57,4 +59,16 @@ public static boolean deleteFileRecursively(File file) {
return deleteFileRecursively(file, false);
}
+ /**
+ * Asserts that the given {@link WatchedEvent} are semantically equal, i.e. they have the same EventType, path and
+ * zxid.
+ */
+ public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent actual) {
+ // TODO: .hashCode and .equals cannot be added to WatchedEvent without potentially breaking consumers. This
+ // can be changed to `assertEquals(expected, actual)` once WatchedEvent has those methods. Until then,
+ // compare the lists manually.
+ assertEquals(expected.getType(), actual.getType());
+ assertEquals(expected.getPath(), actual.getPath());
+ assertEquals(expected.getZxid(), actual.getZxid());
+ }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
index a3d6eef7bdc..cce26276cdd 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
@@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) {
}
@Override
- public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) {
+ public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) {
return new WatcherOrBitSet(Collections.emptySet());
}
@Override
- public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) {
+ public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) {
return new WatcherOrBitSet(Collections.emptySet());
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java
index f4a0298f233..a9bc11da2a6 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java
@@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() {
for (EventType et : allTypes) {
for (KeeperState ks : allStates) {
wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah");
- we = new WatchedEvent(wep);
+ we = new WatchedEvent(wep, WatchedEvent.NO_ZXID);
assertEquals(et, we.getType());
assertEquals(ks, we.getState());
assertEquals("blah", we.getPath());
@@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() {
try {
WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo");
- new WatchedEvent(wep);
+ new WatchedEvent(wep, WatchedEvent.NO_ZXID);
fail("Was able to create WatchedEvent from bad wrapper");
} catch (RuntimeException re) {
// we're good
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java
index 180cd08a611..44440a7c3bb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java
@@ -37,6 +37,7 @@
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -68,14 +69,17 @@ public void process(WatchedEvent event) {
assertTrue(false, "interruption unexpected");
}
}
- public void verify(List expected) throws InterruptedException {
+
+ public void verify(List expected) throws InterruptedException {
+ List actual = new ArrayList<>();
WatchedEvent event;
- int count = 0;
- while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) {
- assertEquals(expected.get(count), event.getType());
- count++;
+ while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) {
+ actual.add(event);
+ }
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i));
}
- assertEquals(expected.size(), count);
events.clear();
}
@@ -88,7 +92,7 @@ public void verify(List expected) throws InterruptedException {
private volatile CountDownLatch lsnr_latch;
private ZooKeeper lsnr;
- private List expected;
+ private List expected;
@BeforeEach
@Override
@@ -127,15 +131,34 @@ private void verify() throws InterruptedException {
expected.clear();
}
+ private void addEvent(List events, EventType eventType, String path, Stat stat) {
+ addEvent(events, eventType, path, stat.getMzxid());
+ }
+
+ private void addEvent(List events, EventType eventType, String path, long zxid) {
+ events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid));
+ }
+
+ private long delete(String path) throws InterruptedException, KeeperException {
+ client.delete(path, -1);
+ int lastSlash = path.lastIndexOf('/');
+ String parent = (lastSlash == 0)
+ ? "/"
+ : path.substring(0, lastSlash);
+ // the deletion's zxid will be reflected in the parent's Pzxid
+ return client.exists(parent, false).getPzxid();
+ }
+
@Test
public void testExistsSync() throws IOException, InterruptedException, KeeperException {
assertNull(lsnr.exists("/foo", true));
assertNull(lsnr.exists("/foo/bar", true));
- client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- expected.add(EventType.NodeCreated);
- client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- expected.add(EventType.NodeCreated);
+ Stat stat = new Stat();
+ client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(expected, EventType.NodeCreated, "/foo", stat);
+ client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(expected, EventType.NodeCreated, "/foo/bar", stat);
verify();
@@ -160,20 +183,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc
assertEquals("/foo/car", e.getPath());
}
- client.setData("/foo", "parent".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
- client.setData("/foo/bar", "child".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
+ stat = client.setData("/foo", "parent".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+ stat = client.setData("/foo/bar", "child".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);
verify();
assertNotNull(lsnr.exists("/foo", true));
assertNotNull(lsnr.exists("/foo/bar", true));
- client.delete("/foo/bar", -1);
- expected.add(EventType.NodeDeleted);
- client.delete("/foo", -1);
- expected.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
verify();
}
@@ -200,20 +223,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx
client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertNotNull(lsnr.getData("/foo/bar", true, null));
- client.setData("/foo", "parent".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
- client.setData("/foo/bar", "child".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
+ Stat stat = client.setData("/foo", "parent".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+ stat = client.setData("/foo/bar", "child".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);
verify();
assertNotNull(lsnr.getData("/foo", true, null));
assertNotNull(lsnr.getData("/foo/bar", true, null));
- client.delete("/foo/bar", -1);
- expected.add(EventType.NodeDeleted);
- client.delete("/foo", -1);
- expected.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
verify();
}
@@ -238,8 +261,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep
client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertNotNull(lsnr.getChildren("/foo", true));
- client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- expected.add(EventType.NodeChildrenChanged); // /foo
+ Stat stat = new Stat();
+ client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
assertNotNull(lsnr.getChildren("/foo/bar", true));
client.setData("/foo", "parent".getBytes(), -1);
@@ -250,11 +274,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep
assertNotNull(lsnr.getChildren("/foo", true));
assertNotNull(lsnr.getChildren("/foo/bar", true));
- client.delete("/foo/bar", -1);
- expected.add(EventType.NodeDeleted); // /foo/bar childwatch
- expected.add(EventType.NodeChildrenChanged); // /foo
- client.delete("/foo", -1);
- expected.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch
+ addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
verify();
}
@@ -266,7 +290,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe
SimpleWatcher w3 = new SimpleWatcher(null);
SimpleWatcher w4 = new SimpleWatcher(null);
- List e2 = new ArrayList<>();
+ List e2 = new ArrayList<>();
assertNull(lsnr.exists("/foo", true));
assertNull(lsnr.exists("/foo", w1));
@@ -276,10 +300,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe
assertNull(lsnr.exists("/foo/bar", w3));
assertNull(lsnr.exists("/foo/bar", w4));
- client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- expected.add(EventType.NodeCreated);
- client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- e2.add(EventType.NodeCreated);
+ Stat stat = new Stat();
+ client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(expected, EventType.NodeCreated, "/foo", stat);
+ client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(e2, EventType.NodeCreated, "/foo/bar", stat);
lsnr_dwatch.verify(expected);
w1.verify(expected);
@@ -297,10 +322,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe
assertNotNull(lsnr.exists("/foo/bar", w4));
assertNotNull(lsnr.exists("/foo/bar", w4));
- client.setData("/foo", "parent".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
- client.setData("/foo/bar", "child".getBytes(), -1);
- e2.add(EventType.NodeDataChanged);
+ stat = client.setData("/foo", "parent".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+ stat = client.setData("/foo/bar", "child".getBytes(), -1);
+ addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);
lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0
w1.verify(expected);
@@ -319,10 +344,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe
assertNotNull(lsnr.exists("/foo/bar", w3));
assertNotNull(lsnr.exists("/foo/bar", w4));
- client.delete("/foo/bar", -1);
- expected.add(EventType.NodeDeleted);
- client.delete("/foo", -1);
- e2.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
lsnr_dwatch.verify(expected);
w1.verify(expected);
@@ -331,7 +356,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe
w4.verify(e2);
expected.clear();
e2.clear();
-
}
@Test
@@ -341,7 +365,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep
SimpleWatcher w3 = new SimpleWatcher(null);
SimpleWatcher w4 = new SimpleWatcher(null);
- List e2 = new ArrayList<>();
+ List e2 = new ArrayList<>();
try {
lsnr.getData("/foo", w1, null);
@@ -367,10 +391,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep
assertNotNull(lsnr.getData("/foo/bar", w4, null));
assertNotNull(lsnr.getData("/foo/bar", w4, null));
- client.setData("/foo", "parent".getBytes(), -1);
- expected.add(EventType.NodeDataChanged);
- client.setData("/foo/bar", "child".getBytes(), -1);
- e2.add(EventType.NodeDataChanged);
+ Stat stat = client.setData("/foo", "parent".getBytes(), -1);
+ addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+ stat = client.setData("/foo/bar", "child".getBytes(), -1);
+ addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);
lsnr_dwatch.verify(expected);
w1.verify(expected);
@@ -387,10 +411,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep
assertNotNull(lsnr.getData("/foo/bar", w3, null));
assertNotNull(lsnr.getData("/foo/bar", w4, null));
- client.delete("/foo/bar", -1);
- expected.add(EventType.NodeDeleted);
- client.delete("/foo", -1);
- e2.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
lsnr_dwatch.verify(expected);
w1.verify(expected);
@@ -408,7 +432,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException,
SimpleWatcher w3 = new SimpleWatcher(null);
SimpleWatcher w4 = new SimpleWatcher(null);
- List e2 = new ArrayList<>();
+ List e2 = new ArrayList<>();
try {
lsnr.getChildren("/foo", true);
@@ -429,8 +453,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException,
assertNotNull(lsnr.getChildren("/foo", true));
assertNotNull(lsnr.getChildren("/foo", w1));
- client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- expected.add(EventType.NodeChildrenChanged); // /foo
+ Stat stat = new Stat();
+ client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+ addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
assertNotNull(lsnr.getChildren("/foo/bar", w2));
assertNotNull(lsnr.getChildren("/foo/bar", w2));
assertNotNull(lsnr.getChildren("/foo/bar", w3));
@@ -451,11 +476,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException,
assertNotNull(lsnr.getChildren("/foo/bar", w4));
assertNotNull(lsnr.getChildren("/foo/bar", w4));
- client.delete("/foo/bar", -1);
- e2.add(EventType.NodeDeleted); // /foo/bar childwatch
- expected.add(EventType.NodeChildrenChanged); // /foo
- client.delete("/foo", -1);
- expected.add(EventType.NodeDeleted);
+ long deleteZxid = delete("/foo/bar");
+ addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+ addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
+ deleteZxid = delete("/foo");
+ addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
lsnr_dwatch.verify(expected);
w1.verify(expected);