Skip to content

Commit

Permalink
[improve] Introduce the sync() API to ensure consistency on reads dur…
Browse files Browse the repository at this point in the history
…ing critical metadata operation paths (#18518)

(cherry picked from commit 492a9c3)
  • Loading branch information
eolivelli authored and poorbarcode committed Aug 31, 2023
1 parent 810a2f0 commit 29bb685
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ protected CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
}

protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
return store.sync(path).thenCompose(___ -> {
cache.invalidate(path);
return cache.get(path);
});
}

protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,18 @@ public CompletableFuture<List<String>> listPartitionedTopicsAsync(NamespaceName
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
return getPartitionedTopicMetadataAsync(tn, false);
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
boolean refresh) {
if (refresh) {
return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
} else {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
}

public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<Optional<GetResult>> get(String path);


/**
* Ensure that the next value read from the local client will be up-to-date with the latest version of the value
* as it can be seen by all the other clients.
* @param path
* @return a handle to the operation
*/
default CompletableFuture<Void> sync(String path) {
return CompletableFuture.completedFuture(null);
}

/**
* Return all the nodes (lexicographically sorted) that are children to the specific path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,24 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

@Override
public CompletableFuture<Void> sync(String path) {
CompletableFuture<Void> result = new CompletableFuture<>();
zkc.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object o) {
Code code = Code.get(rc);
if (code == Code.OK) {
result.complete(null);
} else {
MetadataStoreException e = getException(code, path);
result.completeExceptionally(e);
}
}
}, null);
return result;
}

@Override
protected void batchOperation(List<MetadataOp> ops) {
try {
Expand Down

0 comments on commit 29bb685

Please sign in to comment.