Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Finetune Schedule to be less noisy on retry and retry slower #88531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.transform.notifications;

import org.elasticsearch.common.Strings;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
Expand Down Expand Up @@ -37,4 +38,9 @@ public final String getJobType() {
protected Optional<String> getResourceField() {
return Optional.of(TRANSFORM_ID.getPreferredName());
}

/**
 * Returns the string representation produced by {@code Strings.toString(this, true, true)}.
 * NOTE(review): the two boolean flags presumably select pretty-printed, human-readable output;
 * confirm against the Strings.toString signature.
 */
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface Listener {
private final Listener taskListener;
private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES;
private final AtomicInteger failureCount;
// Keeps track of the last failure that occurred, used for throttling logs and audit messages
private final AtomicReference<String> lastFailure = new AtomicReference<>();
private volatile Instant changesLastDetectedAt;
private volatile Instant lastSearchTime;
private volatile boolean shouldStopAtCheckpoint = false;
Expand Down Expand Up @@ -68,6 +70,7 @@ void setTaskStateToFailed(String reason) {
/**
 * Clears the recorded failure state and notifies the task listener.
 *
 * Sets the state reason to null, the failure count to 0 and the last failure to null,
 * then calls {@code taskListener.failureCountChanged()}.
 */
void resetReasonAndFailureCounter() {
stateReason.set(null);
failureCount.set(0);
lastFailure.set(null);
// the listener is notified only after all three values have been cleared
taskListener.failureCountChanged();
}

Expand Down Expand Up @@ -99,12 +102,17 @@ int getFailureCount() {
return failureCount.get();
}

int incrementAndGetFailureCount() {
int incrementAndGetFailureCount(String failure) {
int newFailureCount = failureCount.incrementAndGet();
lastFailure.set(failure);
taskListener.failureCountChanged();
return newFailureCount;
}

/**
 * Returns the current value of {@code lastFailure}.
 *
 * @return the failure string most recently stored by incrementAndGetFailureCount(String),
 *         or null if none has been stored or the value was cleared by resetReasonAndFailureCounter()
 */
String getLastFailure() {
return lastFailure.get();
}

void setChangesLastDetectedAt(Instant time) {
changesLastDetectedAt = time;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ private enum RunState {
private volatile TransformCheckpoint lastCheckpoint;
private volatile TransformCheckpoint nextCheckpoint;

// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private volatile RunState runState;

private volatile long lastCheckpointCleanup = 0L;
Expand Down Expand Up @@ -924,7 +922,8 @@ void stopAndMaybeSaveState() {
* (Note: originally this method was synchronized, which is not necessary)
*/
void handleFailure(Exception e) {
logger.warn(() -> "[" + getJobId() + "] transform encountered an exception: ", e);
// more detailed reporting in the handlers and below
logger.debug(() -> "[" + getJobId() + "] transform encountered an exception: ", e);
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);

if (unwrappedException instanceof CircuitBreakingException) {
Expand Down Expand Up @@ -957,7 +956,13 @@ void handleFailure(Exception e) {

int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries())
.orElse(context.getNumFailureRetries());
if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) {

// group failures to decide whether to report it below
final String thisFailureClass = unwrappedException.getClass().toString();
final String lastFailureClass = context.getLastFailure();
final int failureCount = context.incrementAndGetFailureCount(thisFailureClass);

if (numFailureRetries != -1 && failureCount > numFailureRetries) {
failIndexer(
"task encountered more than "
+ numFailureRetries
Expand All @@ -969,14 +974,16 @@ void handleFailure(Exception e) {

// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor.warning(
getJobId(),
"Transform encountered an exception: " + message + "; Will attempt again at next scheduled trigger."
// and the one where the number of retries is about to be exceeded
if (thisFailureClass.equals(lastFailureClass) == false || failureCount == numFailureRetries) {
String message = format(
"Transform encountered an exception: [%s]; Will automatically retry [%d/%d]",
ExceptionRootCauseFinder.getDetailedMessage(unwrappedException),
failureCount,
numFailureRetries
);
lastAuditedExceptionMessage = message;
logger.warn(() -> "[" + getJobId() + "] " + message);
auditor.warning(getJobId(), message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class TransformScheduledTask {
/**
* Minimum delay that can be applied after a failure.
*/
private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(1).toMillis();
private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(5).toMillis();
/**
* Maximum delay that can be applied after a failure.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

package org.elasticsearch.xpack.transform.notifications;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -35,6 +39,7 @@
public class MockTransformAuditor extends TransformAuditor {

private static final String MOCK_NODE_NAME = "mock_node_name";
private static final Logger logger = LogManager.getLogger(MockTransformAuditor.class);

@SuppressWarnings("unchecked")
public static MockTransformAuditor createMockAuditor() {
Expand Down Expand Up @@ -97,30 +102,38 @@ public interface AuditExpectation {
}

public abstract static class AbstractAuditExpectation implements AuditExpectation {
protected final String expectedName;
protected final String name;
protected final Level expectedLevel;
protected final String expectedResourceId;
protected final String expectedMessage;
volatile boolean saw;

public AbstractAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
this.expectedName = expectedName;
protected final int expectedCount;
volatile int count;

public AbstractAuditExpectation(
String name,
Level expectedLevel,
String expectedResourceId,
String expectedMessage,
int expectedCount
) {
this.name = name;
this.expectedLevel = expectedLevel;
this.expectedResourceId = expectedResourceId;
this.expectedMessage = expectedMessage;
this.saw = false;
this.expectedCount = expectedCount;
this.count = 0;
}

@Override
public void match(final Level level, final String resourceId, final String message) {
if (level.equals(expectedLevel) && resourceId.equals(expectedResourceId) && innerMatch(level, resourceId, message)) {
if (Regex.isSimpleMatchPattern(expectedMessage)) {
if (Regex.simpleMatch(expectedMessage, message)) {
saw = true;
++count;
}
} else {
if (message.contains(expectedMessage)) {
saw = true;
++count;
}
}
}
Expand All @@ -131,31 +144,70 @@ public boolean innerMatch(final Level level, final String resourceId, final Stri
}
}

/**
* Expectation to assert a certain audit message has been issued once or multiple times.
*/
public static class SeenAuditExpectation extends AbstractAuditExpectation {

public SeenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
/**
* Expectation to match an audit exactly once.
*
* @param name name of the expected audit, can be chosen freely, used for the assert message
* @param expectedLevel The expected level of the audit
* @param expectedResourceId The expected resource id
* @param expectedMessage Expected message of the audit, supports simple wildcard matching
*/
public SeenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) {
super(name, expectedLevel, expectedResourceId, expectedMessage, 1);
}

/**
* Expectation to match an audit a certain number of times.
*
* @param name name of the expected audit, can be chosen freely, used for the assert message
* @param expectedLevel The expected level of the audit
* @param expectedResourceId The expected resource id
* @param expectedMessage Expected message of the audit, supports simple wildcard matching
* @param expectedCount Expected number of times the audit should be matched
*/
public SeenAuditExpectation(
String name,
Level expectedLevel,
String expectedResourceId,
String expectedMessage,
int expectedCount
) {
super(name, expectedLevel, expectedResourceId, expectedMessage, expectedCount);
}

@Override
public void assertMatched() {
assertThat("expected to see " + expectedName + " but did not", saw, equalTo(true));
assertThat(
"expected to see " + name + " " + expectedCount + " times but saw it " + count + " times ",
count,
equalTo(expectedCount)
);
}
}

/**
* Expectation to assert a certain audit message is not issued.
*/
public static class UnseenAuditExpectation extends AbstractAuditExpectation {

public UnseenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
public UnseenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) {
super(name, expectedLevel, expectedResourceId, expectedMessage, 0);
}

@Override
public void assertMatched() {
assertThat("expected not to see " + expectedName + " but did", saw, equalTo(false));
assertThat("expected not to see " + name + " but did", count, equalTo(expectedCount));
}
}

private void audit(Level level, String resourceId, String message) {
logger.info("AUDIT: {}", new TransformAuditMessage(resourceId, message, level, new Date(), MOCK_NODE_NAME));

for (AuditExpectation expectation : expectations) {
expectation.match(level, resourceId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public void verifyNoMoreInteractionsOnMocks() {

public void testFailureCount() {
TransformContext context = new TransformContext(null, null, 0, listener);
assertThat(context.incrementAndGetFailureCount(), is(equalTo(1)));
assertThat(context.incrementAndGetFailureCount("some_exception"), is(equalTo(1)));
assertThat(context.getFailureCount(), is(equalTo(1)));
assertThat(context.incrementAndGetFailureCount(), is(equalTo(2)));
assertThat(context.incrementAndGetFailureCount("some_other_exception"), is(equalTo(2)));
assertThat(context.getFailureCount(), is(equalTo(2)));
context.resetReasonAndFailureCounter();
assertThat(context.getFailureCount(), is(equalTo(0)));
assertThat(context.getLastFailure(), is(nullValue()));

// Verify that the listener is notified every time the failure count is incremented or reset
verify(listener, times(3)).failureCountChanged();
Expand Down
Loading