Skip to content

Commit

Permalink
Add tests to demonstrate losted events after reconnected for persiste…
Browse files Browse the repository at this point in the history
…nt watches

I found this in reply to
apache#1950 (comment).

But it turns out a known issue
apache#1106 (comment).

> An important question to all committers. In DataTree.setWatches
> persistent watchers are not applied. This means that after a
> network partition, no persistent watchers will trigger. I
> don't have a feeling about this one way or another - the current
> implementation works fine for Curator's use cases.
  • Loading branch information
kezhuw committed May 22, 2023
1 parent a64dbf5 commit 31d89e9
Showing 1 changed file with 54 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand Down Expand Up @@ -65,11 +66,12 @@ public void process(WatchedEvent event) {
}
}

public void assertEvent(long timeout, EventType eventType) {
public void assertEvent(long timeout, String path, EventType eventType) {
try {
WatchedEvent event = dataEvents.poll(timeout, TimeUnit.MILLISECONDS);
assertNotNull(event, "do not receive a " + eventType);
assertEquals(eventType, event.getType());
assertEquals(path, event.getPath());
} catch (InterruptedException e) {
LOG.warn("ignoring interrupt during EventsWatcher assertEvent");
}
Expand Down Expand Up @@ -134,7 +136,7 @@ public void testNodeDataChanged() throws Exception {
zk2.setData(path, new byte[2], stat1.getVersion());
qu.start(1);
watcher.waitForConnected(TIMEOUT);
watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDataChanged);
}

@Test
Expand All @@ -146,7 +148,7 @@ public void testNodeCreated() throws Exception {
zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeCreated);
watcher.assertEvent(TIMEOUT, path, EventType.NodeCreated);
}

@Test
Expand All @@ -159,23 +161,23 @@ public void testNodeDeleted() throws Exception {
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDeleted);

zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.exists(path, watcher);
qu.shutdown(1);
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDeleted);

zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.getChildren(path, watcher);
qu.shutdown(1);
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDeleted);
}

@Test
Expand All @@ -188,8 +190,53 @@ public void testNodeChildrenChanged() throws Exception {
zk2.create(path + "/children-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeChildrenChanged);
watcher.assertEvent(TIMEOUT, path, EventType.NodeChildrenChanged);
}

@Test
public void testPersistentWatch() throws Exception {
String path = "/test-persistent";

zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk2.create(path + "/children-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.addWatch(path, watcher, AddWatchMode.PERSISTENT);

qu.shutdown(1);
zk2.setData(path, new byte[2], -1);
zk2.delete(path + "/children-1", -1);
zk2.create(path + "/children-2", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
qu.start(1);

watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDataChanged);
watcher.assertEvent(TIMEOUT, path, EventType.NodeChildrenChanged);
watcher.assertEvent(TIMEOUT, path, EventType.NodeChildrenChanged);
}

@Test
public void testPersistentRecursiveWatch() throws Exception {
String path = "/test-persistent-recursive";

zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk2.create(path + "/children-1", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.addWatch(path, watcher, AddWatchMode.PERSISTENT);

qu.shutdown(1);
zk2.setData(path, new byte[2], -1);

// XXX: How this could be detected by ZooKeeper now ?
//
// Currently, ZooKeeper maintains and exposes only latest view of DataTree.
// Given only this, we are incapable to detect deletions in disconnected state.
zk2.delete(path + "/children-1", -1);

zk2.create(path + "/children-2", new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
qu.start(1);

watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, path, EventType.NodeDataChanged);
watcher.assertEvent(TIMEOUT, path + "/children-1", EventType.NodeDeleted);
watcher.assertEvent(TIMEOUT, path + "/children-2", EventType.NodeCreated);
}
}

0 comments on commit 31d89e9

Please sign in to comment.