Skip to content

Commit

Permalink
Use ZK persistent watches
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Jul 2, 2021
1 parent c716495 commit cb28038
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand All @@ -53,7 +56,7 @@
import org.apache.zookeeper.ZooKeeper;

@Slf4j
public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher, MetadataStoreLifecycle {
public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, MetadataStoreLifecycle {

private final String metadataURL;
private final MetadataStoreConfig metadataStoreConfig;
Expand All @@ -76,6 +79,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf
}
}))
.build();
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
} catch (Throwable t) {
throw new MetadataStoreException(t);
Expand All @@ -91,12 +95,38 @@ public ZKMetadataStore(ZooKeeper zkc) {
this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
}

@Override
protected void receivedSessionEvent(SessionEvent event) {
if (event == SessionEvent.SessionReestablished) {
// Recreate the persistent watch on the new session
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
if (rc == Code.OK.intValue()) {
super.receivedSessionEvent(event);
} else {
log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
sessionWatcher.setSessionInvalid();
// On the reconnectable client, mark the session as expired to trigger a new reconnect and
// we will have the chance to set the watch again.
if (zkc instanceof ZooKeeperClient) {
((ZooKeeperClient) zkc).process(
new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired,
null));
}
}
}, null);
} else {
super.receivedSessionEvent(event);
}
}

@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();

try {
zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
zkc.getData(path, null, (rc, path1, ctx, data, stat) -> {
execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
Expand Down Expand Up @@ -137,7 +167,7 @@ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
CompletableFuture<List<String>> future = new CompletableFuture<>();

try {
zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
zkc.getChildren(path, null, (rc, path1, ctx, children) -> {
execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
Expand Down Expand Up @@ -180,7 +210,7 @@ public CompletableFuture<Boolean> existsFromStore(String path) {
CompletableFuture<Boolean> future = new CompletableFuture<>();

try {
zkc.exists(path, this, (rc, path1, ctx, stat) -> {
zkc.exists(path, null, (rc, path1, ctx, stat) -> {
execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
Expand All @@ -205,7 +235,7 @@ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> opt
}

@Override
public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
protected CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
Expand Down Expand Up @@ -262,7 +292,7 @@ public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long
}

@Override
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
protected CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();

CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -315,8 +345,7 @@ private static MetadataStoreException getException(Code code, String path) {
}
}

@Override
public void process(WatchedEvent event) {
private void handleWatchEvent(WatchedEvent event) {
if (log.isDebugEnabled()) {
log.debug("Received ZK watch : {}", event);
}
Expand All @@ -326,10 +355,15 @@ public void process(WatchedEvent event) {
return;
}

String parent = parent(path);

NotificationType type;
switch (event.getType()) {
case NodeCreated:
type = NotificationType.Created;
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
break;

case NodeDataChanged:
Expand All @@ -342,6 +376,9 @@ public void process(WatchedEvent event) {

case NodeDeleted:
type = NotificationType.Deleted;
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public synchronized void process(WatchedEvent event) {
checkState(event.getState());
}

synchronized void setSessionInvalid() {
currentStatus = SessionEvent.SessionLost;
}

private void checkState(Watcher.Event.KeeperState zkClientState) {
switch (zkClientState) {
case Expired:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,14 @@ public void insertionOutsideCache(String provider, String url) throws Exception

String key1 = newKey();

assertEquals(objCache.getIfCached(key1), Optional.empty());
assertEquals(objCache.get(key1).join(), Optional.empty());

MyClass value1 = new MyClass("a", 1);
store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), Optional.of(-1L)).join();

assertEquals(objCache.getIfCached(key1), Optional.empty());
assertEquals(objCache.get(key1).join(), Optional.of(value1));
assertEquals(objCache.getIfCached(key1), Optional.of(value1));
}

@Test(dataProvider = "impl")
Expand Down

0 comments on commit cb28038

Please sign in to comment.