Skip to content

Commit

Permalink
[Transform] Fail checkpoint on missing clusters (#106793)
Browse files Browse the repository at this point in the history
When there are no remote or local clusters for a given source index, we
call the listener's `onFailure` method with a `CheckpointException`.
A running transform will fail and retry, eventually moving into an
unhealthy and failed state. Any call to the stats API will note the
checkpoint failure and return.

This fixes a timeout issue when calling the Transform stats API and prevents
the Transform from being stuck in indexing.

Fix #106790
Fix #104533
  • Loading branch information
prwhelan authored Mar 27, 2024
1 parent 5ef0b57 commit eea8d1d
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 28 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/106793.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 106793
summary: Fail checkpoint on missing clusters
area: Transform
type: bug
issues:
- 104533
- 106790
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import org.elasticsearch.ElasticsearchException;

class CheckpointException extends ElasticsearchException {
CheckpointException(String msg, Object... params) {
super(msg, params);
}

CheckpointException(String msg, Throwable cause, Object... params) {
super(msg, cause, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ protected void getIndexCheckpoints(TimeValue timeout, ActionListener<Map<String,
ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
ActionListener<Map<String, long[]>> groupedListener = listener;

if (resolvedIndexes.numClusters() == 0) {
var indices = String.join(",", transformConfig.getSource().getIndex());
listener.onFailure(new CheckpointException("No clusters exist for [{}]", indices));
return;
}

if (resolvedIndexes.numClusters() > 1) {
ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
listener.onResponse(
Expand Down Expand Up @@ -234,10 +240,7 @@ private static void getCheckpointsFromOneClusterV2(
);
ActionListener<GetCheckpointAction.Response> checkpointListener;
if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()),
listener::onFailure
);
checkpointListener = listener.safeMap(GetCheckpointAction.Response::getCheckpoints);
} else {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(
Expand Down Expand Up @@ -401,12 +404,12 @@ public void getCheckpointingInfo(

long timestamp = clock.millis();

getIndexCheckpoints(timeout, ActionListener.wrap(checkpointsByIndex -> {
getIndexCheckpoints(timeout, listener.delegateFailure((l, checkpointsByIndex) -> {
TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L);
checkpointingInfoBuilder.setSourceCheckpoint(sourceCheckpoint);
checkpointingInfoBuilder.setOperationsBehind(TransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint));
listener.onResponse(checkpointingInfoBuilder);
}, listener::onFailure));
l.onResponse(checkpointingInfoBuilder);
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand Down Expand Up @@ -67,7 +68,7 @@

public class DefaultCheckpointProviderTests extends ESTestCase {

private static Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class);
private static final Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class);

private Clock clock;
private Client client;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void setUpMocks() {
transformAuditor = MockTransformAuditor.createMockAuditor();
}

public void testReportSourceIndexChangesRunsEmpty() throws Exception {
public void testReportSourceIndexChangesRunsEmpty() {
String transformId = getTestName();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception {
);
}

public void testReportSourceIndexChangesAddDelete() throws Exception {
public void testReportSourceIndexChangesAddDelete() {
String transformId = getTestName();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
Expand Down Expand Up @@ -197,7 +198,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception {
);
}

public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
public void testReportSourceIndexChangesAddDeleteMany() {
String transformId = getTestName();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig);
Expand Down Expand Up @@ -231,29 +232,88 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
}

public void testHandlingShardFailures() throws Exception {
String transformId = getTestName();
String indexName = "some-index";
var transformId = getTestName();
var indexName = "some-index";
TransformConfig transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource(
new SourceConfig(indexName)
).build();

RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
var remoteClusterResolver = mock(RemoteClusterResolver.class);
doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName))).when(
remoteClusterResolver
).resolve(transformConfig.getSource().getIndex());

mockGetIndexResponse(indexName);
mockIndicesStatsResponse(indexName);
mockGetCheckpointAction();

var provider = new DefaultCheckpointProvider(
clock,
parentTaskClient,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);

var latch = new CountDownLatch(1);
provider.createNextCheckpoint(
null,
new LatchedActionListener<>(
ActionListener.wrap(
response -> fail("This test case must fail"),
e -> assertThat(
e.getMessage(),
startsWith(
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
+ "reason [java.lang.Exception: something's wrong"
)
)
),
latch
)
);
assertTrue(latch.await(1, TimeUnit.MILLISECONDS));
}

/**
 * Stubs {@code client} so that executing {@link GetIndexAction#INSTANCE} with any request and listener
 * completes the listener with a canned {@link GetIndexResponse}.
 *
 * The response is built from a single-element index array holding {@code indexName}; every other
 * constructor argument is {@code null}.
 *
 * @param indexName the index name placed in the canned response
 */
private void mockGetIndexResponse(String indexName) {
    String[] responseIndices = { indexName };
    Answer<GetIndexResponse> cannedAnswer = withResponse(new GetIndexResponse(responseIndices, null, null, null, null, null));
    doAnswer(cannedAnswer).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
}

/**
 * Stubs {@code client} so that executing {@link IndicesStatsAction#INSTANCE} with any request and listener
 * completes the listener with a mocked {@link IndicesStatsResponse} that reports shard failures.
 *
 * The mocked response returns 7 from {@code getFailedShards()} and a single failure for shard 3 of
 * {@code indexName} from {@code getShardFailures()}.
 *
 * @param indexName the index name recorded on the reported shard failure
 */
private void mockIndicesStatsResponse(String indexName) {
    var statsResponse = mock(IndicesStatsResponse.class);

    // Total failed-shard count reported by the stubbed response.
    doReturn(7).when(statsResponse).getFailedShards();

    // Only one concrete failure is exposed, even though seven shards are reported as failed.
    var firstFailure = new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong"));
    doReturn(new DefaultShardOperationFailedException[] { firstFailure }).when(statsResponse).getShardFailures();

    Answer<IndicesStatsResponse> cannedAnswer = withResponse(statsResponse);
    doAnswer(cannedAnswer).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
}

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
/**
 * Stubs {@code client} so that executing {@link GetCheckpointAction#INSTANCE} with any request and listener
 * fails the listener (the third argument of {@code execute}) with an
 * {@link ActionNotFoundTransportException}.
 */
private void mockGetCheckpointAction() {
    Answer<Void> failListener = invocation -> {
        // Argument index 2 is the listener passed to client.execute(action, request, listener).
        ActionListener<?> checkpointListener = invocation.getArgument(2);
        checkpointListener.onFailure(new ActionNotFoundTransportException("This should fail."));
        return null;
    };
    doAnswer(failListener).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any());
}

public void testHandlingNoClusters() throws Exception {
var transformId = getTestName();
var indexName = "some-missing-index";
var transformConfig = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSource(
new SourceConfig(indexName)
).build();

var remoteClusterResolver = mock(RemoteClusterResolver.class);
doReturn(new RemoteClusterResolver.ResolvedIndices(Map.of(), List.of())).when(remoteClusterResolver)
.resolve(transformConfig.getSource().getIndex());

mockGetIndexResponse(indexName);
mockIndicesStatsResponse(indexName);

var provider = new DefaultCheckpointProvider(
clock,
parentTaskClient,
remoteClusterResolver,
Expand All @@ -262,24 +322,18 @@ public void testHandlingShardFailures() throws Exception {
transformConfig
);

CountDownLatch latch = new CountDownLatch(1);
var latch = new CountDownLatch(1);
provider.createNextCheckpoint(
null,
new LatchedActionListener<>(
ActionListener.wrap(
response -> fail("This test case must fail"),
e -> assertThat(
e.getMessage(),
startsWith(
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
+ "reason [java.lang.Exception: something's wrong"
)
)
e -> assertThat(e.getMessage(), equalTo("No clusters exist for [some-missing-index]"))
),
latch
)
);
latch.await(10, TimeUnit.SECONDS);
assertTrue(latch.await(1, TimeUnit.MILLISECONDS));
}

public void testSourceHasChanged() throws InterruptedException {
Expand Down Expand Up @@ -407,8 +461,7 @@ private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transfor
);
}

private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock)
throws IllegalAccessException {
private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) {
MockLogAppender mockLogAppender = new MockLogAppender();
mockLogAppender.start();

Expand All @@ -429,10 +482,9 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec
}
}

@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
ActionListener<Response> listener = invocationOnMock.getArgument(2);
listener.onResponse(response);
return null;
};
Expand Down

0 comments on commit eea8d1d

Please sign in to comment.