Skip to content

Commit

Permalink
[proof-of-concept] Introduce the sync() API to guaratee consistency o…
Browse files Browse the repository at this point in the history
…n critical metadata operation paths
  • Loading branch information
eolivelli committed Nov 17, 2022
1 parent 883f760 commit 30d7d3a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ protected CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
}

protected CompletableFuture<Optional<T>> refreshAndGetAsync(String path) {
return store.sync(path).thenCompose(___ -> {
cache.invalidate(path);
return cache.get(path);
});
}

protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,18 @@ public CompletableFuture<List<String>> listPartitionedTopicsAsync(NamespaceName
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn) {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
return getPartitionedTopicMetadataAsync(tn, false);
}

public CompletableFuture<Optional<PartitionedTopicMetadata>> getPartitionedTopicMetadataAsync(TopicName tn,
boolean refresh) {
if (refresh) {
return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
} else {
return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}
}

public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException {
Expand Down Expand Up @@ -317,7 +327,7 @@ public CompletableFuture<Boolean> isPartitionedTopicBeingDeletedAsync(TopicName
if (tn.isPartitioned()) {
tn = TopicName.get(tn.getPartitionedTopicName());
}
return getPartitionedTopicMetadataAsync(tn)
return getPartitionedTopicMetadataAsync(tn, true)
.thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted)
.orElse(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<Optional<GetResult>> get(String path);


/**
* Ensure that the next value read from the local client will be up-to-date with the latest version of the value
* as it can be seen by all the other clients.
* @param path
* @return a handle to the operation
*/
default CompletableFuture<Void> sync(String path) {
return CompletableFuture.completedFuture(null);
}

/**
* Return all the nodes (lexicographically sorted) that are children to the specific path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,24 @@ protected void receivedSessionEvent(SessionEvent event) {
}
}

@Override
public CompletableFuture<Void> sync(String path) {
CompletableFuture<Void> result = new CompletableFuture<>();
zkc.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String s, Object o) {
Code code = Code.get(rc);
if (code == Code.OK) {
result.complete(null);
} else {
MetadataStoreException e = getException(code, path);
result.completeExceptionally(e);
}
}
}, null);
return result;
}

@Override
protected void batchOperation(List<MetadataOp> ops) {
try {
Expand Down

0 comments on commit 30d7d3a

Please sign in to comment.