Skip to content

Commit

Permalink
...wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pxsalehi committed Sep 12, 2024
1 parent e379c4f commit 38020af
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
*/
ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception;

/**
 * Indicates whether batches run by this executor opt in to throttling. The default
 * implementation opts out.
 *
 * @return {@code true} if executions of this executor should be counted against a throttling
 *         budget and may be rejected once that budget is exhausted; {@code false} (the default)
 *         otherwise.
 */
default boolean shouldBeThrottled() {
return false;
}

/**
* @return {@code true} iff this executor should only run on the elected master.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ public final TimeValue timeout() {
/**
 * @return the value of this task's {@code priority} field.
 */
public final Priority priority() {
return priority;
}

/**
 * Indicates whether this task opts in to throttling. Not final, so subclasses may override it
 * to return {@code true}.
 *
 * @return {@code false} by default, meaning the task is never rejected for throttling reasons.
 */
public boolean shouldBeThrottled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class MasterService extends AbstractLifecycleComponent {
private final ClusterStateUpdateStatsTracker clusterStateUpdateStatsTracker = new ClusterStateUpdateStatsTracker();
private final StarvationWatcher starvationWatcher = new StarvationWatcher();

// Value the throttling budget is (re)set to: the number of throttled executions admitted before
// further ones are rejected.
private static final long MAX_UPDATE_TASKS_PER_INTERVAL = 1;
// Length of one budget interval in milliseconds (30 seconds).
private static final long BUDGET_INTERVAL_MILLIS = 30 * 1000;
// Start of the current budget interval.
// NOTE(review): this field is final and fixed at 0, so it can never be advanced when the budget is
// refilled. The refill check therefore always measures elapsed time from 0 and presumably refills
// on every batch once the clock exceeds BUDGET_INTERVAL_MILLIS -- confirm and make this mutable.
private final long budgetStartTimeMillis = 0;
// Remaining budget; decremented for every throttled execution, which is rejected once the value
// drops below zero.
// NOTE(review): the reference does not appear to be reassigned and could likely be final -- confirm.
private AtomicLong budget = new AtomicLong(MAX_UPDATE_TASKS_PER_INTERVAL);

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));

Expand Down Expand Up @@ -227,6 +232,10 @@ private <T extends ClusterStateTaskListener> void executeAndPublishBatch(
return;
}

if (getTimeSince(budgetStartTimeMillis).millis() > BUDGET_INTERVAL_MILLIS) {
budget.set(MAX_UPDATE_TASKS_PER_INTERVAL);
}

final long computationStartTime = threadPool.rawRelativeTimeInMillis();
final var newClusterState = patchVersions(
previousClusterState,
Expand Down Expand Up @@ -556,14 +565,17 @@ public void submitUnbatchedStateUpdateTask(String source, ClusterStateUpdateTask
createTaskQueue("unbatched", updateTask.priority(), unbatchedExecutor).submitTask(source, updateTask, updateTask.timeout());
}

private static class UnbatchedExecutor implements ClusterStateTaskExecutor<ClusterStateUpdateTask> {
private class UnbatchedExecutor implements ClusterStateTaskExecutor<ClusterStateUpdateTask> {
@Override
@SuppressForbidden(reason = "consuming published cluster state for legacy reasons")
public ClusterState execute(BatchExecutionContext<ClusterStateUpdateTask> batchExecutionContext) throws Exception {
assert batchExecutionContext.taskContexts().size() == 1
: "this only supports a single task but received " + batchExecutionContext.taskContexts();
final var taskContext = batchExecutionContext.taskContexts().get(0);
final var task = taskContext.getTask();
if (task.shouldBeThrottled() && budget.decrementAndGet() < 0) {
throw new EsRejectedExecutionException("too many cluster state updates");
}
final ClusterState newState;
try (var ignored = taskContext.captureResponseHeaders()) {
newState = task.execute(batchExecutionContext.initialState());
Expand Down Expand Up @@ -1023,7 +1035,7 @@ public String toString() {
}
}

private static <T extends ClusterStateTaskListener> ClusterState executeTasks(
private <T extends ClusterStateTaskListener> ClusterState executeTasks(
ClusterState previousClusterState,
List<ExecutionResult<T>> executionResults,
ClusterStateTaskExecutor<T> executor,
Expand Down Expand Up @@ -1055,7 +1067,7 @@ private static <T extends ClusterStateTaskListener> boolean assertAllTasksComple

static final String TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME = "test_only_executor_may_change_version_number";

private static <T extends ClusterStateTaskListener> ClusterState innerExecuteTasks(
private <T extends ClusterStateTaskListener> ClusterState innerExecuteTasks(
ClusterState previousClusterState,
List<ExecutionResult<T>> executionResults,
ClusterStateTaskExecutor<T> executor,
Expand All @@ -1067,6 +1079,9 @@ private static <T extends ClusterStateTaskListener> ClusterState innerExecuteTas
// to avoid leaking headers in production that were missed by tests

try {
if (executor.shouldBeThrottled() && budget.decrementAndGet() < 0) {
throw new EsRejectedExecutionException("too many cluster state updates");
}
final var updatedState = executor.execute(
new ClusterStateTaskExecutor.BatchExecutionContext<>(
previousClusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener;
Expand Down Expand Up @@ -257,6 +258,35 @@ public void clusterStatePublished(ClusterState newClusterState) {
assertThat(registeredActions.toString(), registeredActions, contains(MasterService.STATE_UPDATE_ACTION_NAME));
}

/**
 * Submits two unbatched update tasks that both opt in to throttling and expects exactly one of
 * them to execute while the other one is reported through {@code onFailure}.
 */
public void testThrottledClusterStateUpdate() {
    try (var masterService = createMasterService(true)) {
        var executedCount = new AtomicInteger(0);
        var rejectedCount = new AtomicInteger(0);
        // Open question: should this test use listeners (ref counting) instead of plain counters?
        IntStream.range(0, 2)
            .forEach(ignored -> masterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask() {
                @Override
                public boolean shouldBeThrottled() {
                    // opt in, so that the task is counted against the throttling budget
                    return true;
                }

                @Override
                public void onFailure(Exception e) {
                    rejectedCount.incrementAndGet();
                }

                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    executedCount.incrementAndGet();
                    // no-op update: hand back the state unchanged
                    return currentState;
                }
            }));
        assertThat(executedCount.get(), equalTo(1));
        assertThat(rejectedCount.get(), equalTo(1));
    }
}

public void testThreadContext() throws InterruptedException {
try (var master = createMasterService(true)) {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit 38020af

Please sign in to comment.