Skip to content

Commit

Permalink
[Transform] Prevent the ResourceNotFoundException from being thrown o…
Browse files Browse the repository at this point in the history
…n force-stop
  • Loading branch information
przemekwitek committed Apr 16, 2024
1 parent 5ba6d1f commit b0e0884
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,51 @@ public void testBatchTransformLifecycltInALoop() throws IOException {
}
}

/**
 * Runs 100 iterations of: create a batch transform, start it, sleep for a random time (0-1000 ms), stop it with a
 * randomly chosen {@code force} flag, and delete it. After each step the test checks how many transform tasks are
 * reported by {@code getTransformTasks()} and {@code getTransformTasksFromClusterState(transformId)}.
 * <p>
 * Any failure inside an iteration is rethrown as an {@link AssertionError} whose message carries the iteration number
 * and the randomized parameters, with the original throwable attached as the cause.
 */
public void testInterruptedBatchTransformLifecycltInALoop() throws IOException {
    createReviewsIndex();

    String transformId = "test_interrupted_batch_lifecycle_in_a_loop";
    String destIndex = transformId + "-dest";
    for (int i = 0; i < 100; ++i) {
        // Randomized per iteration so that the stop request hits the transform at different points of its lifecycle.
        long sleepAfterStartMillis = randomLongBetween(0, 1_000);
        boolean force = randomBoolean();
        try {
            // Create the batch transform.
            createPivotReviewsTransform(transformId, destIndex, null);
            assertThat(getTransformTasks(), is(empty()));
            assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

            startTransform(transformId);
            // There is 1 transform task after start.
            assertThat(getTransformTasks(), hasSize(1));
            assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));

            Thread.sleep(sleepAfterStartMillis);

            // Stop the transform with force set randomly.
            stopTransform(transformId, force);
            // After the transform is stopped, there should be no transform task left.
            if (force) {
                // If the "force" has been used, then the persistent task is removed from the cluster state but the local task can still
                // be seen by the PersistentTasksNodeService. We need to wait until PersistentTasksNodeService reconciles the state.
                assertBusy(() -> assertThat(getTransformTasks(), is(empty())));
            } else {
                // If the "force" hasn't been used then we can expect the local task to be already gone.
                assertThat(getTransformTasks(), is(empty()));
            }
            assertThat(getTransformTasksFromClusterState(transformId), is(empty()));

            // Delete the transform.
            deleteTransform(transformId);
        } catch (AssertionError | Exception e) {
            if (e instanceof InterruptedException) {
                // Thread.sleep clears the interrupt status when it throws; restore it so that the code running after
                // this test still observes the interruption instead of it being silently lost in the wrapping below.
                Thread.currentThread().interrupt();
            }
            throw new AssertionError(
                format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()),
                e
            );
        }
    }
}

public void testContinuousTransformLifecycleInALoop() throws Exception {
createReviewsIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
);

final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
persistentTasksService,
transformNodeAssignments.getWaitingForAssignment(),
finalListener
);
Expand All @@ -173,9 +174,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
// When force==true, we **do not** fan out to individual tasks (i.e. taskOperation method will not be called) as we
// want to make sure that the persistent tasks will be removed from cluster state even if these tasks are no longer
// visible by the PersistentTasksService.
cancelTransformTasksListener(transformNodeAssignments.getAssigned(), doExecuteListener).onResponse(
new Response(true)
);
cancelTransformTasksListener(persistentTasksService, transformNodeAssignments.getAssigned(), doExecuteListener)
.onResponse(new Response(true));
} else if (transformNodeAssignments.getExecutorNodes().isEmpty()) {
doExecuteListener.onResponse(new Response(true));
} else {
Expand All @@ -195,6 +195,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
// found transforms without a config
} else if (request.isForce()) {
final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
persistentTasksService,
transformNodeAssignments.getWaitingForAssignment(),
finalListener
);
Expand Down Expand Up @@ -488,14 +489,16 @@ private void waitForTransformStopped(
}));
}

// Visible for testing
/**
* Creates and returns the listener that sends remove request for every task in the given set.
*
* @param persistentTasksService service used to send the remove request for each task
* @param transformTasks set of transform tasks that should be removed
* @param finalListener listener that should be called once all the given tasks are removed
* @return listener that removes given tasks in parallel
*/
private ActionListener<Response> cancelTransformTasksListener(
static ActionListener<Response> cancelTransformTasksListener(
final PersistentTasksService persistentTasksService,
final Set<String> transformTasks,
final ActionListener<Response> finalListener
) {
Expand All @@ -505,16 +508,23 @@ private ActionListener<Response> cancelTransformTasksListener(
return ActionListener.wrap(response -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
transformTasks.size(),
ActionListener.wrap(r -> finalListener.onResponse(response), finalListener::onFailure)
ActionListener.wrap(unused -> finalListener.onResponse(response), finalListener::onFailure)
);

for (String taskId : transformTasks) {
persistentTasksService.sendRemoveRequest(taskId, null, groupedListener);
persistentTasksService.sendRemoveRequest(taskId, null, ActionListener.wrap(groupedListener::onResponse, e -> {
// If we are about to remove a persistent task which does not exist, treat it as success.
if (e instanceof ResourceNotFoundException) {
groupedListener.onResponse(null);
return;
}
groupedListener.onFailure(e);
}));
}
}, e -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
transformTasks.size(),
ActionListener.wrap(r -> finalListener.onFailure(e), finalListener::onFailure)
ActionListener.wrap(unused -> finalListener.onFailure(e), finalListener::onFailure)
);

for (String taskId : transformTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,50 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTaskResponse;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TransportStopTransformActionTests extends ESTestCase {

Expand Down Expand Up @@ -198,4 +224,85 @@ public void testBuildException() {
assertThat(statusException.getSuppressed().length, equalTo(0));
}

/**
 * With an empty set of transform tasks, calling {@code onResponse} on the listener returned by
 * {@code cancelTransformTasksListener} must deliver that same response to the final listener exactly once.
 */
public void testCancelTransformTasksListener_NoTasks() {
    ActionListener<StopTransformAction.Response> finalListener = Mockito.<ActionListener<StopTransformAction.Response>>mock();
    PersistentTasksService tasksService = mock(PersistentTasksService.class);
    StopTransformAction.Response stopResponse = new StopTransformAction.Response(true);

    // No tasks to remove: the response is handed to the listener under test right away.
    TransportStopTransformAction.cancelTransformTasksListener(tasksService, Set.of(), finalListener).onResponse(stopResponse);

    // verify(mock) checks for exactly one invocation by default.
    verify(finalListener).onResponse(stopResponse);
}

/**
 * Three tasks are removed through a mocked client that either answers every remove request successfully or fails
 * every request with {@link ResourceNotFoundException} (chosen randomly once). In both cases the final listener
 * must receive the original response.
 */
public void testCancelTransformTasksListener_ThreeTasksRemovedSuccessfully() {
    ThreadPool mockedThreadPool = mock(ThreadPool.class);
    when(mockedThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
    Client mockedClient = mock(Client.class);
    when(mockedClient.threadPool()).thenReturn(mockedThreadPool);

    // We treat NotFound as a successful removal of the task
    Answer<?> removalOutcome = randomBoolean() ? withResponse() : withException(new ResourceNotFoundException("task not found"));
    doAnswer(removalOutcome).when(mockedClient).execute(same(RemovePersistentTaskAction.INSTANCE), any(), any());

    ActionListener<StopTransformAction.Response> finalListener = Mockito.<ActionListener<StopTransformAction.Response>>mock();
    PersistentTasksService tasksService = new PersistentTasksService(mock(ClusterService.class), mockedThreadPool, mockedClient);
    StopTransformAction.Response stopResponse = new StopTransformAction.Response(true);

    TransportStopTransformAction.cancelTransformTasksListener(tasksService, Set.of("task-A", "task-B", "task-C"), finalListener)
        .onResponse(stopResponse);

    verify(finalListener).onResponse(stopResponse);
}

/**
 * Removal of "task-A" and "task-B" either succeeds or fails with {@link ResourceNotFoundException} (chosen randomly
 * per task), while removal of "task-C" fails with an {@link IllegalStateException}. The final listener must be
 * failed exactly once with the "task-C" error and without any suppressed exceptions.
 */
public void testCancelTransformTasksListener_OneTaskCouldNotBeRemoved() {
    ThreadPool mockedThreadPool = mock(ThreadPool.class);
    when(mockedThreadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
    Client mockedClient = mock(Client.class);
    when(mockedClient.threadPool()).thenReturn(mockedThreadPool);

    // The first two tasks are removed fine, or are already gone.
    for (String removableTaskId : List.of("task-A", "task-B")) {
        Answer<?> removalOutcome = randomBoolean() ? withResponse() : withException(new ResourceNotFoundException("task not found"));
        doAnswer(removalOutcome).when(mockedClient)
            .execute(same(RemovePersistentTaskAction.INSTANCE), eq(new RemovePersistentTaskAction.Request(removableTaskId)), any());
    }
    // The third task fails with an error that is not a ResourceNotFoundException.
    doAnswer(withException(new IllegalStateException("real issue while removing task"))).when(mockedClient)
        .execute(same(RemovePersistentTaskAction.INSTANCE), eq(new RemovePersistentTaskAction.Request("task-C")), any());

    ActionListener<StopTransformAction.Response> finalListener = Mockito.<ActionListener<StopTransformAction.Response>>mock();
    PersistentTasksService tasksService = new PersistentTasksService(mock(ClusterService.class), mockedThreadPool, mockedClient);
    StopTransformAction.Response stopResponse = new StopTransformAction.Response(true);

    TransportStopTransformAction.cancelTransformTasksListener(tasksService, Set.of("task-A", "task-B", "task-C"), finalListener)
        .onResponse(stopResponse);

    ArgumentCaptor<Exception> failureCaptor = ArgumentCaptor.forClass(Exception.class);
    verify(finalListener).onFailure(failureCaptor.capture());
    Exception reportedFailure = failureCaptor.getValue();
    assertThat(reportedFailure.getMessage(), containsString("real issue while removing task"));
    assertThat(reportedFailure.getSuppressed(), is(emptyArray()));
}

/**
 * Builds a Mockito answer that completes the listener passed as the third invocation argument with a
 * {@link PersistentTaskResponse} wrapping a {@code null} task.
 */
private static Answer<?> withResponse() {
    return invocation -> {
        // getArgument infers the target type from the assignment, so no explicit cast is needed.
        ActionListener<PersistentTaskResponse> responseListener = invocation.getArgument(2);
        PersistentTasksCustomMetadata.PersistentTask<?> noTask = null;
        responseListener.onResponse(new PersistentTaskResponse(noTask));
        return null;
    };
}

/**
 * Builds a Mockito answer that fails the listener passed as the third invocation argument with the given exception.
 *
 * @param e exception handed to the listener's {@code onFailure}
 */
private static Answer<?> withException(Exception e) {
    return invocation -> {
        // getArgument infers the target type from the assignment, so no explicit cast is needed.
        ActionListener<PersistentTaskResponse> failingListener = invocation.getArgument(2);
        failingListener.onFailure(e);
        return null;
    };
}
}

0 comments on commit b0e0884

Please sign in to comment.