Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored and Technoboy- committed May 30, 2023
1 parent 96f49e2 commit 305cf9c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -530,7 +531,19 @@ public void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {
executor.execute(() -> future.completeExceptionally(t));
future.completeExceptionally(t);
}
}

/**
* Run the task in the executor thread and fail the future if the executor is shutting down.
*/
@VisibleForTesting
public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
try {
executor.execute(task);
} catch (final Throwable t) {
futures.get().forEach(f -> f.completeExceptionally(t));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,29 @@ protected void batchOperation(List<MetadataOp> ops) {
}

// Trigger all the futures in the batch
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
execute(() -> {
switch (op.getType()) {
case PUT:
handlePutResult(op.asPut(), opr);
break;
case DELETE:
handleDeleteResult(op.asDelete(), opr);
break;
case GET:
handleGetResult(op.asGet(), opr);
break;
case GET_CHILDREN:
handleGetChildrenResult(op.asGetChildren(), opr);
break;

default:
op.getFuture().completeExceptionally(new MetadataStoreException(
"Operation type not supported in multi: " + op.getType()));
execute(() -> {
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
switch (op.getType()) {
case PUT:
handlePutResult(op.asPut(), opr);
break;
case DELETE:
handleDeleteResult(op.asDelete(), opr);
break;
case GET:
handleGetResult(op.asGet(), opr);
break;
case GET_CHILDREN:
handleGetChildrenResult(op.asGetChildren(), opr);
break;
default:
op.getFuture().completeExceptionally(new MetadataStoreException(
"Operation type not supported in multi: " + op.getType()));
}
}
}, op.getFuture());
}
}, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t)));
Expand Down

0 comments on commit 305cf9c

Please sign in to comment.