Skip to content

Commit

Permalink
Prevent Duplicate ILM Cluster State Updates from Being Created (elast…
Browse files Browse the repository at this point in the history
…ic#78390)

Prevent duplicate ILM tasks from being enqueued to fix the most immediate issues around elastic#78246. The ILM logic should be further improved though. I did not include `MoveToErrorStepUpdateTask` in this change yet as I wasn't entirely sure how valid/safe hashing/comparing arbitrary `Exception`s would be. That could be looked into in a follow-up as well.

Relates elastic#77466 

Closes elastic#78246
  • Loading branch information
original-brownbear committed Sep 29, 2021
1 parent 490c23b commit 357de11
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public class BranchingStep extends ClusterStateActionStep {

private static final Logger logger = LogManager.getLogger(BranchingStep.class);

private StepKey nextStepKeyOnFalse;
private StepKey nextStepKeyOnTrue;
private BiPredicate<Index, ClusterState> predicate;
private SetOnce<Boolean> predicateValue;
private final StepKey nextStepKeyOnFalse;
private final StepKey nextStepKeyOnTrue;
private final BiPredicate<Index, ClusterState> predicate;
private final SetOnce<Boolean> predicateValue;

/**
* {@link BranchingStep} is a step whose next step is based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -24,9 +24,10 @@
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;

import java.io.IOException;
import java.util.Objects;
import java.util.function.LongSupplier;

public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(ExecuteStepsUpdateTask.class);
private final String policy;
private final Index index;
Expand Down Expand Up @@ -175,7 +176,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) {
IndexMetadata indexMetadata = newState.metadata().index(index);
if (indexMetadata != null) {
Expand All @@ -200,15 +201,28 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

@Override
public void onFailure(String source, Exception e) {
throw new ElasticsearchException(
"policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e);
public void handleFailure(String source, Exception e) {
logger.warn(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}].", policy, index, startStep.getKey()), e);
}

private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException {
this.failure = cause;
logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
logger.warn("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
currentStepKey);
return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep);
}

/**
 * Two tasks are considered equal when they are of the exact same class and refer to the same policy, index and start step.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    final ExecuteStepsUpdateTask other = (ExecuteStepsUpdateTask) o;
    if (policy.equals(other.policy) == false) {
        return false;
    }
    if (index.equals(other.index) == false) {
        return false;
    }
    // the start step is compared null-safely
    return Objects.equals(startStep, other.startStep);
}

/**
 * Hashes the policy, index and start step, i.e. the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
    final Object[] identity = { policy, index, startStep };
    return Objects.hash(identity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.util.concurrent.ListenableFuture;

/**
 * Common parent of the index lifecycle cluster state update tasks. Subclasses are forced to implement {@code equals} and
 * {@code hashCode} so that equivalent tasks can be deduplicated by {@link IndexLifecycleRunner}.
 */
public abstract class IndexLifecycleClusterStateUpdateTask extends ClusterStateUpdateTask {

    // completed from clusterStateProcessed (success) or onFailure (failure); listeners are attached via addListener
    private final ListenableFuture<Void> completionFuture = new ListenableFuture<>();

    /**
     * Registers a listener that is resolved once this update has been processed or has failed. The listener is resolved before
     * the {@link #onClusterStateProcessed(String, ClusterState, ClusterState)} or {@link #handleFailure(String, Exception)} hook
     * of this task runs.
     */
    public final void addListener(ActionListener<Void> listener) {
        completionFuture.addListener(listener);
    }

    @Override
    public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
        // resolve registered listeners first, then hand over to the subclass hook
        completionFuture.onResponse(null);
        onClusterStateProcessed(source, oldState, newState);
    }

    @Override
    public final void onFailure(String source, Exception e) {
        // fail registered listeners first, then hand over to the subclass hook
        completionFuture.onFailure(e);
        handleFailure(source, e);
    }

    /**
     * Subclass hook that takes the place of {@link ClusterStateUpdateTask#clusterStateProcessed(String, ClusterState, ClusterState)},
     * which is final in this class. Implementations can override it as they would override
     * {@code ClusterStateUpdateTask#clusterStateProcessed}. The default implementation does nothing.
     */
    protected void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    }

    /**
     * Subclass hook that takes the place of {@link ClusterStateUpdateTask#onFailure(String, Exception)}, which is final in this
     * class. Implementations can override it as they would override {@code ClusterStateUpdateTask#onFailure}.
     */
    protected abstract void handleFailure(String source, Exception e);

    @Override
    public abstract boolean equals(Object other);

    @Override
    public abstract int hashCode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -35,7 +36,10 @@
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;

import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.function.LongSupplier;

import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;
Expand Down Expand Up @@ -374,7 +378,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
}
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
clusterService.submitStateUpdateTask(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
submitUnlessAlreadyQueued(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier));
} else {
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
Expand All @@ -387,7 +391,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
*/
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
clusterService.submitStateUpdateTask(
submitUnlessAlreadyQueued(
String.format(Locale.ROOT, "ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}", policy,
index.getName(), currentStepKey, newStepKey),
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
Expand Down Expand Up @@ -420,7 +424,7 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
* changing other execution state.
*/
private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) {
clusterService.submitStateUpdateTask(
submitUnlessAlreadyQueued(
String.format(Locale.ROOT, "ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
currentStepKey),
new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
Expand Down Expand Up @@ -504,4 +508,27 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
LifecycleExecutionState.fromIndexMetadata(indexMetadata),
failure));
}

private final Set<IndexLifecycleClusterStateUpdateTask> executingTasks = Collections.synchronizedSet(new HashSet<>());

/**
 * Submits the given task to the cluster service unless an equal task is already tracked in {@link #executingTasks}, which
 * prevents queueing up duplicate cluster state updates. A submitted task is removed from {@link #executingTasks} again by a
 * listener registered on the task.
 * TODO: refactor ILM logic so that this is not required any longer. It is unreasonably expensive to only filter out duplicate tasks at
 * this point given how these tasks are mostly set up on the cluster state applier thread.
 *
 * @param source source string as used in {@link ClusterService#submitStateUpdateTask(String, ClusterStateTaskConfig)}
 * @param task   task to submit unless already tracked in {@link #executingTasks}.
 */
private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterStateUpdateTask task) {
    if (executingTasks.add(task) == false) {
        // an equal task is already tracked, nothing to submit
        logger.trace("skipped redundant execution of [{}]", source);
        return;
    }
    // register the cleanup before submitting the task
    task.addListener(ActionListener.wrap(() -> {
        final boolean wasTracked = executingTasks.remove(task);
        assert wasTracked : "tried to unregister unknown task [" + task + "]";
    }));
    clusterService.submitStateUpdateTask(source, task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public class IndexLifecycleService
private final PolicyStepsRegistry policyRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private final Settings settings;
private ClusterService clusterService;
private LongSupplier nowSupplier;
private final ClusterService clusterService;
private final LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;

public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Step;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class);

private final Index index;
Expand All @@ -44,22 +44,6 @@ public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey current
this.stateChangeConsumer = stateChangeConsumer;
}

Index getIndex() {
return index;
}

String getPolicy() {
return policy;
}

Step.StepKey getCurrentStepKey() {
return currentStepKey;
}

Step.StepKey getNextStepKey() {
return nextStepKey;
}

@Override
public ClusterState execute(ClusterState currentState) {
IndexMetadata indexMetadata = currentState.getMetadata().index(index);
Expand All @@ -82,15 +66,36 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) {
stateChangeConsumer.accept(newState);
}
}

@Override
public void onFailure(String source, Exception e) {
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step ["
+ currentStepKey + "] to step [" + nextStepKey + "].", e);
public boolean equals(Object o) {
    // Equal when both are exactly this class and agree on index, policy, current step key and next step key.
    if (o == this) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    final MoveToNextStepUpdateTask other = (MoveToNextStepUpdateTask) o;
    if (index.equals(other.index) == false) {
        return false;
    }
    if (policy.equals(other.policy) == false) {
        return false;
    }
    if (currentStepKey.equals(other.currentStepKey) == false) {
        return false;
    }
    return nextStepKey.equals(other.nextStepKey);
}

/**
 * Hashes the index, policy, current step key and next step key, i.e. the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
    final Object[] identity = { index, policy, currentStepKey, nextStepKey };
    return Objects.hash(identity);
}

/**
 * Logs the failure at WARN level together with the policy, index and the step transition that was attempted.
 */
@Override
public void handleFailure(String source, Exception e) {
    final ParameterizedMessage failureMessage = new ParameterizedMessage(
        "policy [{}] for index [{}] failed trying to move from step [{}] to step [{}].",
        policy,
        index,
        currentStepKey,
        nextStepKey
    );
    logger.warn(failureMessage, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand All @@ -25,7 +24,7 @@
import java.io.IOException;
import java.util.Objects;

public class SetStepInfoUpdateTask extends ClusterStateUpdateTask {
public class SetStepInfoUpdateTask extends IndexLifecycleClusterStateUpdateTask {

private static final Logger logger = LogManager.getLogger(SetStepInfoUpdateTask.class);

Expand Down Expand Up @@ -78,9 +77,28 @@ public ClusterState execute(ClusterState currentState) throws IOException {
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(new ParameterizedMessage("policy [{}] for index [{}] failed trying to set step info for step [{}].",
policy, index.getName(), currentStepKey), e);
public void handleFailure(String source, Exception e) {
    // Log the failure at WARN level together with the policy, index and step the step info was meant for.
    final ParameterizedMessage failureMessage = new ParameterizedMessage(
        "policy [{}] for index [{}] failed trying to set step info for step [{}].",
        policy,
        index,
        currentStepKey
    );
    logger.warn(failureMessage, e);
}

/**
 * Two tasks are considered equal when they are of the exact same class and agree on index, policy, current step key and
 * step info.
 *
 * @param o the object to compare with, may be {@code null}
 * @return {@code true} if {@code o} is an equal {@link SetStepInfoUpdateTask}
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    SetStepInfoUpdateTask that = (SetStepInfoUpdateTask) o;
    // The current step key handed in by IndexLifecycleRunner#setStepInfo is declared @Nullable, so it is compared null-safely
    // here (hashCode already tolerates null via Objects.hash); calling currentStepKey.equals(...) directly could throw a
    // NullPointerException while the task is being looked up in a hash-based collection.
    return index.equals(that.index)
        && policy.equals(that.policy)
        && Objects.equals(currentStepKey, that.currentStepKey)
        && Objects.equals(stepInfo, that.stepInfo);
}

/**
 * Hashes the index, policy, current step key and step info, i.e. the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
    final Object[] identity = { index, policy, currentStepKey, stepInfo };
    return Objects.hash(identity);
}

public static class ExceptionWrapper implements ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

public class SnapshotLifecycleTask implements SchedulerEngine.Listener {

private static Logger logger = LogManager.getLogger(SnapshotLifecycleTask.class);
private static final Logger logger = LogManager.getLogger(SnapshotLifecycleTask.class);

private final Client client;
private final ClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.ilm;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -250,11 +249,7 @@ public void testOnFailure() throws IOException {
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
assertEquals("policy [" + mixedPolicyName + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].",
exception.getMessage());
assertSame(expectedException, exception.getCause());
task.onFailure(randomAlphaOfLength(10), expectedException);
}

public void testClusterActionStepThrowsException() throws IOException {
Expand Down
Loading

0 comments on commit 357de11

Please sign in to comment.