Skip to content

Commit

Permalink
[fix] [meta]Switch to the metadata store thread after zk operation (#…
Browse files Browse the repository at this point in the history
…20303)

(cherry picked from commit 35d9612)
  • Loading branch information
poorbarcode committed May 30, 2023
1 parent 1ce9cca commit b97a1b4
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -534,6 +535,18 @@ public void execute(Runnable task, CompletableFuture<?> future) {
}
}

/**
* Run the task in the executor thread and fail the future if the executor is shutting down.
*/
@VisibleForTesting
public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
try {
executor.execute(task);
} catch (final Throwable t) {
futures.get().forEach(f -> f.completeExceptionally(t));
}
}

protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,29 +204,29 @@ protected void batchOperation(List<MetadataOp> ops) {
}

// Trigger all the futures in the batch
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);

switch (op.getType()) {
case PUT:
handlePutResult(op.asPut(), opr);
break;
case DELETE:
handleDeleteResult(op.asDelete(), opr);
break;
case GET:
handleGetResult(op.asGet(), opr);
break;
case GET_CHILDREN:
handleGetChildrenResult(op.asGetChildren(), opr);
break;

default:
op.getFuture().completeExceptionally(new MetadataStoreException(
"Operation type not supported in multi: " + op.getType()));
}
}
execute(() -> {
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
switch (op.getType()) {
case PUT:
handlePutResult(op.asPut(), opr);
break;
case DELETE:
handleDeleteResult(op.asDelete(), opr);
break;
case GET:
handleGetResult(op.asGet(), opr);
break;
case GET_CHILDREN:
handleGetChildrenResult(op.asGetChildren(), opr);
break;
default:
op.getFuture().completeExceptionally(new MetadataStoreException(
"Operation type not supported in multi: " + op.getType()));
}
}
}, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -50,8 +51,10 @@
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -425,6 +428,73 @@ public void testDeleteUnusedDirectories(String provider, Supplier<String> urlSup
assertFalse(store.exists(prefix).join());
}

@DataProvider(name = "conditionOfSwitchThread")
public Object[][] conditionOfSwitchThread(){
return new Object[][]{
{false, false},
{false, true},
{true, false},
{true, true}
};
}

@Test(dataProvider = "conditionOfSwitchThread")
public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
final String prefix = newKey();
final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
MetadataStoreConfig.MetadataStoreConfigBuilder builder =
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
builder.fsyncEnable(false);
builder.batchingEnabled(enabledBatch);
if (!hasSynchronizer) {
builder.synchronizer(null);
}
MetadataStoreConfig config = builder.build();
@Cleanup
ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);

final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
metadataStoreName, currentThreadName);
assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), errorMessage);
};

// put with node which has parent(but the parent node is not exists).
store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
verify.run();
return null;
}).join();
// put.
store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
verify.run();
return null;
}).join();
// get.
store.get(prefix + "/b1").thenApply((ignore) -> {
verify.run();
return null;
}).join();
// get the node which is not exists.
store.get(prefix + "/non").thenApply((ignore) -> {
verify.run();
return null;
}).join();
// delete.
store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
verify.run();
return null;
}).join();
// delete the node which is not exists.
store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
verify.run();
return null;
}).exceptionally(ex -> {
verify.run();
return null;
}).join();
}

@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
String metadataUrl = urlSupplier.get();
Expand Down

0 comments on commit b97a1b4

Please sign in to comment.