Skip to content

Commit

Permalink
Fixed MockZookeeper tests
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Aug 9, 2021
1 parent bd5de76 commit 97bead7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,27 +276,28 @@ public void notificationListeners(String provider, String url) throws Exception
store.put(key1Child, "value-2".getBytes(), Optional.empty()).join();
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Created);
assertEquals(n.getPath(), key1Child);
assertEquals(n.getType(), NotificationType.ChildrenChanged);


n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.ChildrenChanged);
assertEquals(n.getPath(), key1);
assertEquals(n.getType(), NotificationType.Created);
assertEquals(n.getPath(), key1Child);

assertTrue(store.exists(key1Child).join());
assertEquals(store.getChildren(key1).join(), Collections.singletonList("xx"));

store.delete(key1Child, Optional.empty()).join();
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Deleted);
assertEquals(n.getPath(), key1Child);

// Parent should be notified of the deletion
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.ChildrenChanged);
assertEquals(n.getPath(), key1);

n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Deleted);
assertEquals(n.getPath(), key1Child);
}
}
48 changes: 48 additions & 0 deletions testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
Expand Down Expand Up @@ -95,6 +97,16 @@ private class Failure {
}
}

@Data
@AllArgsConstructor
private static class PersistentWatcher {
final String path;
final Watcher watcher;
final AddWatchMode mode;
}

private List<PersistentWatcher> persistentWatchers;

public static MockZooKeeper newInstance() {
return newInstance(null);
}
Expand Down Expand Up @@ -154,6 +166,7 @@ private void init(ExecutorService executor) {
stopped = false;
alwaysFail = new AtomicReference<>(KeeperException.Code.OK);
failures = new CopyOnWriteArrayList<>();
persistentWatchers = new ArrayList<>();
}

@Override
Expand Down Expand Up @@ -226,6 +239,7 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM

final String finalPath = path;
executor.execute(() -> {
triggerPersistentWatches(finalPath, parent, EventType.NodeCreated);

toNotifyCreate.forEach(
watcher -> watcher.process(
Expand Down Expand Up @@ -292,6 +306,8 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
mutex.unlock();
cb.processResult(0, path, ctx, name);

triggerPersistentWatches(path, parent, EventType.NodeCreated);

toNotifyCreate.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeCreated,
Expand Down Expand Up @@ -711,6 +727,8 @@ public Stat setData(final String path, byte[] data, int version) throws KeeperEx
}

executor.execute(() -> {
triggerPersistentWatches(path, null, EventType.NodeDataChanged);

toNotify.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)));
});
Expand Down Expand Up @@ -774,6 +792,8 @@ public void setData(final String path, final byte[] data, int version, final Sta
for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
}

triggerPersistentWatches(path, null, EventType.NodeDataChanged);
});
}

Expand Down Expand Up @@ -829,6 +849,8 @@ public void delete(final String path, int version) throws InterruptedException,
for (Watcher watcher2 : toNotifyParent) {
watcher2.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent));
}

triggerPersistentWatches(path, parent, EventType.NodeDeleted);
});
}

Expand Down Expand Up @@ -878,6 +900,7 @@ public void delete(final String path, int version, final VoidCallback cb, final
.process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
toNotifyParent.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
triggerPersistentWatches(path, parent, EventType.NodeDeleted);
}
};

Expand Down Expand Up @@ -920,6 +943,11 @@ public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws Interr
return res;
}

@Override
public synchronized void addWatch(String basePath, Watcher watcher, AddWatchMode mode) {
persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode));
}

@Override
public void close() throws InterruptedException {
}
Expand Down Expand Up @@ -1002,5 +1030,25 @@ private void checkReadOpDelay() {
}
}

private void triggerPersistentWatches(String path, String parent, EventType eventType) {
persistentWatchers.forEach(w -> {
if (w.mode == AddWatchMode.PERSISTENT_RECURSIVE) {
if (w.getPath().startsWith(path)) {
w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path));
}
} else if (w.mode == AddWatchMode.PERSISTENT) {
if (w.getPath().equals(path)) {
w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path));
}

if (eventType == EventType.NodeCreated || eventType == EventType.NodeDeleted) {
// Also notify parent
w.watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent));
}
}
});
}

private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class);
}

0 comments on commit 97bead7

Please sign in to comment.