From 29bb685c2c31654a2f5aec91316ebc3097890e5b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 27 Dec 2022 11:55:17 +0100 Subject: [PATCH] [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518) (cherry picked from commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f) --- .../pulsar/broker/resources/BaseResources.java | 7 +++++++ .../broker/resources/NamespaceResources.java | 14 ++++++++++++-- .../pulsar/metadata/api/MetadataStore.java | 11 +++++++++++ .../pulsar/metadata/impl/ZKMetadataStore.java | 18 ++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index bc670c53a8c8f..6f3fb7d714a70 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -92,6 +92,13 @@ protected CompletableFuture> getAsync(String path) { return cache.get(path); } + protected CompletableFuture> refreshAndGetAsync(String path) { + return store.sync(path).thenCompose(___ -> { + cache.invalidate(path); + return cache.get(path); + }); + } + protected void set(String path, Function modifyFunction) throws MetadataStoreException { try { setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 90e3971c4cf5b..5e59d20bfbfbb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -237,8 +237,18 @@ public CompletableFuture> listPartitionedTopicsAsync(NamespaceName } public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn) { - return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), - tn.getEncodedLocalName())); + return getPartitionedTopicMetadataAsync(tn, false); + } + + public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn, + boolean refresh) { + if (refresh) { + return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } else { + return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } } public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index b4295f2586768..79670e859d6d0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture> get(String path); + + /** + * Ensure that the next value read from the local client will be up-to-date with the latest version of the value + * as it can be seen by all the other clients. + * @param path + * @return a handle to the operation + */ + default CompletableFuture sync(String path) { + return CompletableFuture.completedFuture(null); + } + /** * Return all the nodes (lexicographically sorted) that are children to the specific path. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 6f980d3e79466..33e708a76b9dc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -153,6 +153,24 @@ protected void receivedSessionEvent(SessionEvent event) { } } + @Override + public CompletableFuture sync(String path) { + CompletableFuture result = new CompletableFuture<>(); + zkc.sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String s, Object o) { + Code code = Code.get(rc); + if (code == Code.OK) { + result.complete(null); + } else { + MetadataStoreException e = getException(code, path); + result.completeExceptionally(e); + } + } + }, null); + return result; + } + @Override protected void batchOperation(List ops) { try {