Skip to content

Commit

Permalink
[ML][Transforms] remove force flag from _start (#46414)
Browse files Browse the repository at this point in the history
* [ML][Transforms] remove `force` flag from _start

* fixing expected error message
  • Loading branch information
benwtrent authored Sep 16, 2019
1 parent 3e25db2 commit 479ebd1
Show file tree
Hide file tree
Showing 16 changed files with 58 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.Transform;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
Expand Down Expand Up @@ -389,7 +388,6 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
// Data Frame
PutTransformAction.INSTANCE,
StartTransformAction.INSTANCE,
StartTransformTaskAction.INSTANCE,
StopTransformAction.INSTANCE,
DeleteTransformAction.INSTANCE,
GetTransformsAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TransformMessages {
" Use force stop to stop the data frame transform.";
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
"Use force start to restart data frame transform once error is resolved.";
"Use force stop and then restart the data frame transform once error is resolved.";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand Down Expand Up @@ -33,32 +34,30 @@ private StartTransformAction() {
public static class Request extends AcknowledgedRequest<Request> {

private final String id;
private final boolean force;

public Request(String id, boolean force) {
public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.force = force;
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
force = in.readBoolean();
if(in.getVersion().before(Version.V_8_0_0)) {
in.readBoolean();
}
}

public String getId() {
return id;
}

public boolean isForce() {
return force;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(force);
if(out.getVersion().before(Version.V_8_0_0)) {
out.writeBoolean(false);
}
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class StartTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
return new Request(randomAlphaOfLengthBetween(1, 20));
}

@Override
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ teardown:
- match: { acknowledged: true }

- do:
catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
catch: /Cannot start transform \[airline-transform-start-stop\] as it is already started/
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,13 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}

protected void startDataframeTransform(String transformId, boolean force) throws IOException {
startDataframeTransform(transformId, force, null);
protected void startDataframeTransform(String transformId) throws IOException {
startDataframeTransform(transformId, null);
}

protected void startDataframeTransform(String transformId, boolean force, String authHeader, String... warnings) throws IOException {
protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
startTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
if (warnings.length > 0) {
startTransformRequest.setOptions(expectWarnings(warnings));
}
Expand Down Expand Up @@ -259,7 +258,7 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
String authHeader, String... warnings) throws Exception {
// start the transform
startDataframeTransform(transformId, false, authHeader, warnings);
startDataframeTransform(transformId, authHeader, warnings);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
Expand All @@ -279,7 +278,7 @@ protected void startAndWaitForContinuousTransform(String transformId,
String authHeader,
long checkpoint) throws Exception {
// start the transform
startDataframeTransform(transformId, false, authHeader, new String[0]);
startDataframeTransform(transformId, authHeader, new String[0]);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId, checkpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.oneOf;

public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

Expand Down Expand Up @@ -65,7 +63,7 @@ public void testForceStopFailedTransform() throws Exception {
createDestinationIndexWithBadMapping(dataFrameIndex);
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId, false);
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
Expand All @@ -89,14 +87,14 @@ public void testForceStopFailedTransform() throws Exception {
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}

public void testForceStartFailedTransform() throws Exception {
public void testStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
String dataFrameIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(dataFrameIndex);
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId, false);
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
Expand All @@ -106,26 +104,15 @@ public void testForceStartFailedTransform() throws Exception {

final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force start to restart data frame transform once error is resolved.";
"]. Use force stop and then restart the data frame transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo(expectedFailure));
}, 60, TimeUnit.SECONDS);

// Correct the failure by deleting the destination index
deleteIndex(dataFrameIndex);
// Force start the data frame to indicate failure correction
startDataframeTransform(transformId, true);

// Verify that we have started and that our reason is cleared
fullState = getDataFrameState(transformId);
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing"));
assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1));

stopDataFrameTransform(transformId, true);
}

Expand Down
Loading

0 comments on commit 479ebd1

Please sign in to comment.