diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index 081f8150d8c8e..fc753c2fea5a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -43,6 +43,10 @@ public interface ClusterStateTaskExecutor { */ ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception; + default boolean shouldBeThrottled() { + return false; + } + /** * @return {@code true} iff this executor should only run on the elected master. */ diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 4f038132e7dfa..32fcd2d68a430 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -69,4 +69,8 @@ public final TimeValue timeout() { public final Priority priority() { return priority; } + + public boolean shouldBeThrottled() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 296acc30a83f5..0ae3110e1703c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -118,6 +118,11 @@ public class MasterService extends AbstractLifecycleComponent { private final ClusterStateUpdateStatsTracker clusterStateUpdateStatsTracker = new ClusterStateUpdateStatsTracker(); private final StarvationWatcher starvationWatcher = new StarvationWatcher(); + private static final long MAX_UPDATE_TASKS_PER_INTERVAL = 1; + private static final long BUDGET_INTERVAL_MILLIS = 30 * 1000; + private final long budgetStartTimeMillis = 0; + private AtomicLong budget = new AtomicLong(MAX_UPDATE_TASKS_PER_INTERVAL); + public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -227,6 +232,10 @@ private void executeAndPublishBatch( return; } + if (getTimeSince(budgetStartTimeMillis).millis() > BUDGET_INTERVAL_MILLIS) { + budget.set(MAX_UPDATE_TASKS_PER_INTERVAL); + } + final long computationStartTime = threadPool.rawRelativeTimeInMillis(); final var newClusterState = patchVersions( previousClusterState, @@ -556,7 +565,7 @@ public void submitUnbatchedStateUpdateTask(String source, ClusterStateUpdateTask createTaskQueue("unbatched", updateTask.priority(), unbatchedExecutor).submitTask(source, updateTask, updateTask.timeout()); } - private static class UnbatchedExecutor implements ClusterStateTaskExecutor { + private class UnbatchedExecutor implements ClusterStateTaskExecutor { @Override @SuppressForbidden(reason = "consuming published cluster state for legacy reasons") public ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception { @@ -564,6 +573,9 @@ public ClusterState execute(BatchExecutionContext batchE : "this only supports a single task but received " + batchExecutionContext.taskContexts(); final var taskContext = batchExecutionContext.taskContexts().get(0); final var task = taskContext.getTask(); + if (task.shouldBeThrottled() && budget.decrementAndGet() < 0) { + throw new EsRejectedExecutionException("too many cluster state updates"); + } final ClusterState newState; try (var ignored = taskContext.captureResponseHeaders()) { newState = task.execute(batchExecutionContext.initialState()); @@ -1023,7 +1035,7 @@ public String toString() { } } - private static ClusterState executeTasks( + private ClusterState executeTasks( ClusterState previousClusterState, List> executionResults, ClusterStateTaskExecutor executor, @@ -1055,7 +1067,7 @@ private static boolean assertAllTasksComple static final String TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME = "test_only_executor_may_change_version_number"; - private static ClusterState innerExecuteTasks( + private ClusterState innerExecuteTasks( ClusterState previousClusterState, List> executionResults, ClusterStateTaskExecutor executor, @@ -1067,6 +1079,9 @@ private static ClusterState innerExecuteTas // to avoid leaking headers in production that were missed by tests try { + if (executor.shouldBeThrottled() && budget.decrementAndGet() < 0) { + throw new EsRejectedExecutionException("too many cluster state updates"); + } final var updatedState = executor.execute( new ClusterStateTaskExecutor.BatchExecutionContext<>( previousClusterState, diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 26faa295cf727..01af9ed8d028f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.emptySet; import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener; @@ -257,6 +258,35 @@ public void clusterStatePublished(ClusterState newClusterState) { assertThat(registeredActions.toString(), registeredActions, contains(MasterService.STATE_UPDATE_ACTION_NAME)); } + public void testThrottledClusterStateUpdate() { + try (var master = createMasterService(true)) { + var failed = new AtomicInteger(0); + var succeeded = new AtomicInteger(0); + // use some listeners for this test? ref counting ... + IntStream.range(0, 2).forEach(i -> { + master.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + succeeded.incrementAndGet(); + return currentState; + } + + @Override + public void onFailure(Exception e) { + failed.incrementAndGet(); + } + + @Override + public boolean shouldBeThrottled() { + return true; + } + }); + }); + assertThat(succeeded.get(), equalTo(1)); + assertThat(failed.get(), equalTo(1)); + } + } + public void testThreadContext() throws InterruptedException { try (var master = createMasterService(true)) { final CountDownLatch latch = new CountDownLatch(1);