diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index e4a2e9ec5c105..f81d47f2d20f0 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -276,27 +276,28 @@ public void notificationListeners(String provider, String url) throws Exception store.put(key1Child, "value-2".getBytes(), Optional.empty()).join(); n = notifications.poll(3, TimeUnit.SECONDS); assertNotNull(n); - assertEquals(n.getType(), NotificationType.Created); - assertEquals(n.getPath(), key1Child); + assertEquals(n.getType(), NotificationType.ChildrenChanged); + n = notifications.poll(3, TimeUnit.SECONDS); assertNotNull(n); - assertEquals(n.getType(), NotificationType.ChildrenChanged); - assertEquals(n.getPath(), key1); + assertEquals(n.getType(), NotificationType.Created); + assertEquals(n.getPath(), key1Child); assertTrue(store.exists(key1Child).join()); assertEquals(store.getChildren(key1).join(), Collections.singletonList("xx")); store.delete(key1Child, Optional.empty()).join(); - n = notifications.poll(3, TimeUnit.SECONDS); - assertNotNull(n); - assertEquals(n.getType(), NotificationType.Deleted); - assertEquals(n.getPath(), key1Child); // Parent should be notified of the deletion n = notifications.poll(3, TimeUnit.SECONDS); assertNotNull(n); assertEquals(n.getType(), NotificationType.ChildrenChanged); assertEquals(n.getPath(), key1); + + n = notifications.poll(3, TimeUnit.SECONDS); + assertNotNull(n); + assertEquals(n.getType(), NotificationType.Deleted); + assertEquals(n.getPath(), key1Child); } } diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index b791803d29d3b..f2770ab266717 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -42,6 +42,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiPredicate; +import lombok.AllArgsConstructor; +import lombok.Data; import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; @@ -95,6 +97,16 @@ private class Failure { } } + @Data + @AllArgsConstructor + private static class PersistentWatcher { + final String path; + final Watcher watcher; + final AddWatchMode mode; + } + + private List persistentWatchers; + public static MockZooKeeper newInstance() { return newInstance(null); } @@ -154,6 +166,7 @@ private void init(ExecutorService executor) { stopped = false; alwaysFail = new AtomicReference<>(KeeperException.Code.OK); failures = new CopyOnWriteArrayList<>(); + persistentWatchers = new ArrayList<>(); } @Override @@ -226,6 +239,7 @@ public String create(String path, byte[] data, List acl, CreateMode createM final String finalPath = path; executor.execute(() -> { + triggerPersistentWatches(finalPath, parent, EventType.NodeCreated); toNotifyCreate.forEach( watcher -> watcher.process( @@ -292,6 +306,8 @@ public void create(final String path, final byte[] data, final List acl, Cr mutex.unlock(); cb.processResult(0, path, ctx, name); + triggerPersistentWatches(path, parent, EventType.NodeCreated); + toNotifyCreate.forEach( watcher -> watcher.process( new WatchedEvent(EventType.NodeCreated, @@ -711,6 +727,8 @@ public Stat setData(final String path, byte[] data, int version) throws KeeperEx } executor.execute(() -> { + triggerPersistentWatches(path, null, EventType.NodeDataChanged); + toNotify.forEach(watcher -> watcher .process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path))); }); @@ -774,6 +792,8 @@ public void setData(final String path, final byte[] data, int version, final Sta for (Watcher watcher : toNotify) { watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); } + + triggerPersistentWatches(path, null, EventType.NodeDataChanged); }); } @@ -829,6 +849,8 @@ public void delete(final String path, int version) throws InterruptedException, for (Watcher watcher2 : toNotifyParent) { watcher2.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)); } + + triggerPersistentWatches(path, parent, EventType.NodeDeleted); }); } @@ -878,6 +900,7 @@ public void delete(final String path, int version, final VoidCallback cb, final .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path))); toNotifyParent.forEach(watcher -> watcher .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent))); + triggerPersistentWatches(path, parent, EventType.NodeDeleted); } }; @@ -920,6 +943,11 @@ public List multi(Iterable ops) throws Interr return res; } + @Override + public synchronized void addWatch(String basePath, Watcher watcher, AddWatchMode mode) { + persistentWatchers.add(new PersistentWatcher(basePath, watcher, mode)); + } + @Override public void close() throws InterruptedException { } @@ -1002,5 +1030,25 @@ private void checkReadOpDelay() { } } + private void triggerPersistentWatches(String path, String parent, EventType eventType) { + persistentWatchers.forEach(w -> { + if (w.mode == AddWatchMode.PERSISTENT_RECURSIVE) { + if (w.getPath().startsWith(path)) { + w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); + } + } else if (w.mode == AddWatchMode.PERSISTENT) { + if (w.getPath().equals(path)) { + w.watcher.process(new WatchedEvent(eventType, KeeperState.SyncConnected, path)); + } + + if (eventType == EventType.NodeCreated || eventType == EventType.NodeDeleted) { + // Also notify parent + w.watcher.process( + new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)); + } + } + }); + } + private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class); }