diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index ce1178e760a6c..7c74e918a039f 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -734,4 +735,26 @@ private void logAudits() throws Exception { } }, 5, TimeUnit.SECONDS); } + + @SuppressWarnings("unchecked") + protected List getTransformTasks() throws IOException { + final Request tasksRequest = new Request("GET", "/_tasks"); + tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*"); + Map tasksResponse = entityAsMap(client().performRequest(tasksRequest)); + + Map nodes = (Map) tasksResponse.get("nodes"); + if (nodes == null) { + return List.of(); + } + + List foundTasks = new ArrayList<>(); + for (Map.Entry node : nodes.entrySet()) { + Map nodeInfo = (Map) node.getValue(); + Map tasks = (Map) nodeInfo.get("tasks"); + if (tasks != null) { + foundTasks.addAll(tasks.keySet()); + } + } + return foundTasks; + } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java index 105ac09e356fd..e537a6f280ac0 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java @@ -15,10 +15,8 @@ import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsString; @@ -150,28 +148,6 @@ public void testCancellingTransformTask() throws Exception { assertThat(getTransformTasks(), is(empty())); } - @SuppressWarnings("unchecked") - private List getTransformTasks() throws IOException { - final Request tasksRequest = new Request("GET", "/_tasks"); - tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*"); - Map tasksResponse = entityAsMap(client().performRequest(tasksRequest)); - - Map nodes = (Map) tasksResponse.get("nodes"); - if (nodes == null) { - return List.of(); - } - - List foundTasks = new ArrayList<>(); - for (Entry node : nodes.entrySet()) { - Map nodeInfo = (Map) node.getValue(); - Map tasks = (Map) nodeInfo.get("tasks"); - if (tasks != null) { - foundTasks.addAll(tasks.keySet()); - } - } - return foundTasks; - } - private void beEvilAndDeleteTheTransformIndex() throws IOException { final Request deleteRequest = new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME); deleteRequest.setOptions( diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index 9f74d445252d2..1abf611e833c4 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -24,10 +24,12 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.matchesRegex; +import static org.hamcrest.Matchers.nullValue; public class TransformTaskFailedStateIT extends TransformRestTestCase { @@ -61,6 +63,9 @@ public void testForceStopFailedTransform() throws Exception { String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); + + assertThat(getTransformTasks(), is(empty())); + startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getTransformStateAndStats(transformId); @@ -72,6 +77,8 @@ public void testForceStopFailedTransform() throws Exception { // Verify we have failed for the expected reason assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); + assertThat(getTransformTasks(), hasSize(1)); + // verify that we cannot stop a failed transform ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); @@ -90,6 +97,44 @@ public void testForceStopFailedTransform() throws Exception { awaitState(transformId, TransformStats.State.STOPPED); fullState = getTransformStateAndStats(transformId); assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); + + assertThat(getTransformTasks(), is(empty())); + } + + public void testForceResetFailedTransform() throws Exception { + String transformId = "test-force-reset-failed-transform"; + createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null); + String transformIndex = "failure_pivot_reviews"; + createDestinationIndexWithBadMapping(transformIndex); + createContinuousPivotReviewsTransform(transformId, transformIndex, null); + + assertThat(getTransformTasks(), is(empty())); + + startTransform(transformId); + awaitState(transformId, TransformStats.State.FAILED); + Map fullState = getTransformStateAndStats(transformId); + final String failureReason = "Failed to index documents into destination index due to permanent error: " + + "\\[org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced \\[7\\] " + + "failures and at least 1 irrecoverable " + + "\\[org.elasticsearch.xpack.transform.transforms.TransformException: Destination index mappings are " + + "incompatible with the transform configuration.;.*"; + // Verify we have failed for the expected reason + assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); + + assertThat(getTransformTasks(), hasSize(1)); + + // verify that we cannot reset a failed transform + ResponseException ex = expectThrows(ResponseException.class, () -> resetTransform(transformId, false)); + assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); + assertThat( + (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + is(equalTo("Cannot reset transform [test-force-reset-failed-transform] as the task is running. Stop the task first")) + ); + + // Verify that we can force reset a failed transform + resetTransform(transformId, true); + + assertThat(getTransformTasks(), is(empty())); } public void testStartFailedTransform() throws Exception { @@ -98,6 +143,9 @@ public void testStartFailedTransform() throws Exception { String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); + + assertThat(getTransformTasks(), is(empty())); + startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getTransformStateAndStats(transformId); @@ -109,6 +157,8 @@ public void testStartFailedTransform() throws Exception { // Verify we have failed for the expected reason assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); + assertThat(getTransformTasks(), hasSize(1)); + final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] " + "as it is in a failed state with failure: \\[" + failureReason @@ -124,6 +174,8 @@ public void testStartFailedTransform() throws Exception { }, 60, TimeUnit.SECONDS); stopTransform(transformId, true); + + assertThat(getTransformTasks(), is(empty())); } private void awaitState(String transformId, TransformStats.State state) throws Exception { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java index ee394c7a128b4..87f24ae7c2bc8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java @@ -154,7 +154,14 @@ protected void masterOperation(Task task, Request request, ClusterState state, A stopTransformActionListener.onResponse(null); return; } - StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(request.getId(), true, false, null, true, false); + StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request( + request.getId(), + true, + request.isForce(), + null, + true, + false + ); executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener); }