From 540b117b9e7d2f10f0eba740bc3618c757664e89 Mon Sep 17 00:00:00 2001 From: Paul Sanwald Date: Thu, 7 Jun 2018 14:02:23 -0700 Subject: [PATCH] high level REST api: cancel task (#30745) * Initial commit of rest high level exposure of cancel task * fix javadocs * address some code review comments * update branch to use tasks namespace instead of cluster * High-level client: list tasks failure to not lose nodeId This commit reworks testing for `ListTasksResponse` so that random fields insertion can be tested and xcontent equivalence can be checked too. Proper exclusions need to be configured, and failures need to be tested separately. This helped finding a little problem, whenever there is a node failure returned, the nodeId was lost as it was never printed out as part of the exception toXContent. * added comment * merge from master * re-work CancelTasksResponseTests to separate XContent failure cases from non-failure cases * remove duplication of logic in parser creation * code review changes * refactor TasksClient to support RequestOptions * add tests for parent task id * address final PR review comments, mostly formatting and such --- .../client/RequestConverters.java | 19 +++ .../org/elasticsearch/client/TasksClient.java | 43 +++++++ .../client/RequestConvertersTests.java | 19 +++ .../org/elasticsearch/client/TasksIT.java | 25 ++++ .../ClusterClientDocumentationIT.java | 1 + .../TasksClientDocumentationIT.java | 72 +++++++++++ .../high-level/supported-apis.asciidoc | 2 + .../high-level/tasks/cancel_tasks.asciidoc | 82 +++++++++++++ .../tasks/cancel/CancelTasksResponse.java | 29 ++++- .../node/tasks/list/ListTasksResponse.java | 27 ++-- .../tasks/CancelTasksResponseTests.java | 116 ++++++++++++++++++ .../tasks/ListTasksResponseTests.java | 26 ++-- 12 files changed, 442 insertions(+), 19 deletions(-) create mode 100644 docs/java-rest/high-level/tasks/cancel_tasks.asciidoc create mode 100644 server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 308b8917842d1..53992a051080b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -29,6 +29,7 @@ import org.apache.http.entity.ContentType; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; @@ -108,6 +109,17 @@ private RequestConverters() { // Contains only status utility methods } + static Request cancelTasks(CancelTasksRequest cancelTasksRequest) { + Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel"); + Params params = new Params(request); + params.withTimeout(cancelTasksRequest.getTimeout()) + .withTaskId(cancelTasksRequest.getTaskId()) + .withNodes(cancelTasksRequest.getNodes()) + .withParentTaskId(cancelTasksRequest.getParentTaskId()) + .withActions(cancelTasksRequest.getActions()); + return request; + } + static Request delete(DeleteRequest deleteRequest) { String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); Request request = new Request(HttpDelete.METHOD_NAME, endpoint); @@ -1092,6 +1104,13 @@ Params withActions(String[] actions) { return this; } + Params withTaskId(TaskId taskId) { + if (taskId != null && taskId.isSet()) { + return putParam("task_id", taskId.toString()); + } + return this; + } + Params withParentTaskId(TaskId parentTaskId) { if (parentTaskId != null && parentTaskId.isSet()) { return putParam("parent_task_id", parentTaskId.toString()); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java index f4a76e78b946b..f8f03d7f7d288 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java @@ -20,6 +20,8 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -65,4 +67,45 @@ public void listAsync(ListTasksRequest request, RequestOptions options, ActionLi restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, options, ListTasksResponse::fromXContent, listener, emptySet()); } + + /** + * Cancel one or more cluster tasks using the Task Management API. + * + * See + * Task Management API on elastic.co + * @param cancelTasksRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + * + */ + public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, RequestOptions options ) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + cancelTasksRequest, + RequestConverters::cancelTasks, + options, + parser -> CancelTasksResponse.fromXContent(parser), + emptySet() + ); + } + + /** + * Asynchronously cancel one or more cluster tasks using the Task Management API. + * + * See + * Task Management API on elastic.co + * @param cancelTasksRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions options, ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity( + cancelTasksRequest, + RequestConverters::cancelTasks, + options, + parser -> CancelTasksResponse.fromXContent(parser), + listener, + emptySet() + ); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 2645846341f4b..c09da06995599 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -29,6 +29,8 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; @@ -1620,6 +1622,23 @@ public void testIndexPutSettings() throws IOException { assertEquals(expectedParams, request.getParameters()); } + public void testCancelTasks() { + CancelTasksRequest request = new CancelTasksRequest(); + Map expectedParams = new HashMap<>(); + TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + TaskId parentTaskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); + request.setTaskId(taskId); + request.setParentTaskId(parentTaskId); + expectedParams.put("task_id", taskId.toString()); + expectedParams.put("parent_task_id", parentTaskId.toString()); + Request httpRequest = RequestConverters.cancelTasks(request); + assertThat(httpRequest, notNullValue()); + assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME)); + assertThat(httpRequest.getEntity(), nullValue()); + assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel")); + assertThat(httpRequest.getParameters(), equalTo(expectedParams)); + } + public void testListTasks() { { ListTasksRequest request = new ListTasksRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java index fc7d70a36e10e..baa97cfa5b4ef 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java @@ -19,9 +19,12 @@ package org.elasticsearch.client; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; @@ -58,4 +61,26 @@ public void testListTasks() throws IOException { assertTrue("List tasks were not found", listTasksFound); } + public void testCancelTasks() throws IOException { + ListTasksRequest listRequest = new ListTasksRequest(); + ListTasksResponse listResponse = execute( + listRequest, + highLevelClient().tasks()::list, + highLevelClient().tasks()::listAsync + ); + // in this case, probably no task will actually be cancelled. + // this is ok, that case is covered in TasksIT.testTasksCancellation + TaskInfo firstTask = listResponse.getTasks().get(0); + String node = listResponse.getPerNodeTasks().keySet().iterator().next(); + + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(new TaskId(node, firstTask.getId())); + cancelTasksRequest.setReason("testreason"); + CancelTasksResponse response = execute(cancelTasksRequest, + highLevelClient().tasks()::cancel, + highLevelClient().tasks()::cancelAsync); + // Since the task may or may not have been cancelled, assert that we received a response only + // The actual testing of task cancellation is covered by TasksIT.testTasksCancellation + assertThat(response, notNullValue()); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java index e8dd4025ba94e..75902cf02babb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java @@ -178,4 +178,5 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } } + } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TasksClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TasksClientDocumentationIT.java index 0d62a2d29a03b..8a45195757c13 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TasksClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TasksClientDocumentationIT.java @@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; @@ -146,4 +148,74 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } } + + public void testCancelTasks() throws IOException { + RestHighLevelClient client = highLevelClient(); + { + // tag::cancel-tasks-request + CancelTasksRequest request = new CancelTasksRequest(); + // end::cancel-tasks-request + + // tag::cancel-tasks-request-filter + request.setTaskId(new TaskId("nodeId1", 42)); //<1> + request.setActions("cluster:*"); // <2> + request.setNodes("nodeId1", "nodeId2"); // <3> + // end::cancel-tasks-request-filter + + } + + CancelTasksRequest request = new CancelTasksRequest(); + request.setTaskId(TaskId.EMPTY_TASK_ID); + + // tag::cancel-tasks-execute + CancelTasksResponse response = client.tasks().cancel(request, RequestOptions.DEFAULT); + // end::cancel-tasks-execute + + assertThat(response, notNullValue()); + + // tag::cancel-tasks-response-tasks + List tasks = response.getTasks(); // <1> + // end::cancel-tasks-response-tasks + + + // tag::cancel-tasks-response-failures + List nodeFailures = response.getNodeFailures(); // <1> + List taskFailures = response.getTaskFailures(); // <2> + // end::-tasks-response-failures + + assertThat(response.getNodeFailures(), equalTo(emptyList())); + assertThat(response.getTaskFailures(), equalTo(emptyList())); + } + + public void testAsyncCancelTasks() throws InterruptedException { + + RestHighLevelClient client = highLevelClient(); + { + CancelTasksRequest request = new CancelTasksRequest(); + + // tag::cancel-tasks-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(CancelTasksResponse response) { + // <1> + } + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::cancel-tasks-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::cancel-tasks-execute-async + client.tasks().cancelAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::cancel-tasks-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } } diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 8694a624654a5..064cd401721ac 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -140,5 +140,7 @@ include::snapshot/verify_repository.asciidoc[] The Java High Level REST Client supports the following Tasks APIs: * <> +* <> include::tasks/list_tasks.asciidoc[] +include::tasks/cancel_tasks.asciidoc[] diff --git a/docs/java-rest/high-level/tasks/cancel_tasks.asciidoc b/docs/java-rest/high-level/tasks/cancel_tasks.asciidoc new file mode 100644 index 0000000000000..089f87c00a2ef --- /dev/null +++ b/docs/java-rest/high-level/tasks/cancel_tasks.asciidoc @@ -0,0 +1,82 @@ +[[java-rest-high-cluster-cancel-tasks]] +=== Cancel Tasks API + +The Cancel Tasks API allows cancellation of a currently running task. + +==== Cancel Tasks Request + +A `CancelTasksRequest`: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request] +-------------------------------------------------- +There are no required parameters. The task cancellation command supports the same +task selection parameters as the list tasks command. + +==== Parameters + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-filter] +-------------------------------------------------- +<1> Cancel a task +<2> Cancel only cluster-related tasks +<3> Cancel all tasks running on nodes nodeId1 and nodeId2 + +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute] +-------------------------------------------------- + +==== Asynchronous Execution + +The asynchronous execution requires `CancelTasksRequest` instance and an +`ActionListener` instance to be passed to the asynchronous method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-async] +-------------------------------------------------- +<1> The `CancelTasksRequest` to execute and the `ActionListener` to use +when the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `CancelTasksResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of a failure. The raised exception is provided as an argument + +==== Cancel Tasks Response + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-tasks] +-------------------------------------------------- +<1> List of cancelled tasks + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-calc] +-------------------------------------------------- +<1> List of cancelled tasks grouped by a node +<2> List of cancelled tasks grouped by a parent task + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-failures] +-------------------------------------------------- +<1> List of node failures +<2> List of task cancellation failures + diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java index 5e7c2c0f97d56..fbc81d2995511 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java @@ -19,23 +19,48 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; -import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.TaskInfo; +import java.io.IOException; import java.util.List; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + /** * Returns the list of tasks that were cancelled */ public class CancelTasksResponse extends ListTasksResponse { + private static final ConstructingObjectParser PARSER = + setupParser("cancel_tasks_response", CancelTasksResponse::new); + public CancelTasksResponse() { } - public CancelTasksResponse(List tasks, List taskFailures, List + public CancelTasksResponse(List tasks, List taskFailures, List nodeFailures) { super(tasks, taskFailures, nodeFailures); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return super.toXContent(builder, params); + } + + public static CancelTasksResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index 53d80853328b2..cb1fcb0b091ee 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; @@ -70,8 +71,14 @@ public ListTasksResponse(List tasks, List taskFa this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); } - private static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>("list_tasks_response", true, + + protected static ConstructingObjectParser setupParser(String name, + TriFunction< + List, + List, + List, + T> ctor) { + ConstructingObjectParser parser = new ConstructingObjectParser<>(name, true, constructingObjects -> { int i = 0; @SuppressWarnings("unchecked") @@ -80,16 +87,18 @@ public ListTasksResponse(List tasks, List taskFa List tasksFailures = (List) constructingObjects[i++]; @SuppressWarnings("unchecked") List nodeFailures = (List) constructingObjects[i]; - return new ListTasksResponse(tasks, tasksFailures, nodeFailures); + return ctor.apply(tasks,tasksFailures, nodeFailures); }); - - static { - PARSER.declareObjectArray(constructorArg(), TaskInfo.PARSER, new ParseField(TASKS)); - PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), new ParseField(TASK_FAILURES)); - PARSER.declareObjectArray(optionalConstructorArg(), - (parser, c) -> ElasticsearchException.fromXContent(parser), new ParseField(NODE_FAILURES)); + parser.declareObjectArray(optionalConstructorArg(), TaskInfo.PARSER, new ParseField(TASKS)); + parser.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), new ParseField(TASK_FAILURES)); + parser.declareObjectArray(optionalConstructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(NODE_FAILURES)); + return parser; } + private static final ConstructingObjectParser PARSER = + setupParser("list_tasks_response", ListTasksResponse::new); + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java new file mode 100644 index 0000000000000..3233edefb30d4 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/tasks/CancelTasksResponseTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tasks; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.equalTo; + +public class CancelTasksResponseTests extends AbstractXContentTestCase { + + @Override + protected CancelTasksResponse createTestInstance() { + List randomTasks = randomTasks(); + return new CancelTasksResponse(randomTasks, Collections.emptyList(), Collections.emptyList()); + } + + private static List randomTasks() { + List randomTasks = new ArrayList<>(); + for (int i = 0; i < randomInt(10); i++) { + randomTasks.add(TaskInfoTests.randomTaskInfo()); + } + return randomTasks; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + //status and headers hold arbitrary content, we can't inject random fields in them + return field -> field.endsWith("status") || field.endsWith("headers"); + } + + @Override + protected void assertEqualInstances(CancelTasksResponse expectedInstance, CancelTasksResponse newInstance) { + assertNotSame(expectedInstance, newInstance); + assertThat(newInstance.getTasks(), equalTo(expectedInstance.getTasks())); + ListTasksResponseTests.assertOnNodeFailures(newInstance.getNodeFailures(), expectedInstance.getNodeFailures()); + ListTasksResponseTests.assertOnTaskFailures(newInstance.getTaskFailures(), expectedInstance.getTaskFailures()); + } + + @Override + protected CancelTasksResponse doParseInstance(XContentParser parser) { + return CancelTasksResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected boolean assertToXContentEquivalence() { + return true; + } + + /** + * Test parsing {@link ListTasksResponse} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = CancelTasksResponseTests::createTestInstanceWithFailures; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence); + } + + private static CancelTasksResponse createTestInstanceWithFailures() { + int numNodeFailures = randomIntBetween(0, 3); + List nodeFailures = new ArrayList<>(numNodeFailures); + for (int i = 0; i < numNodeFailures; i++) { + nodeFailures.add(new FailedNodeException(randomAlphaOfLength(5), "error message", new ConnectException())); + } + int numTaskFailures = randomIntBetween(0, 3); + List taskFailures = new ArrayList<>(numTaskFailures); + for (int i = 0; i < numTaskFailures; i++) { + taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(5), randomLong(), new IllegalStateException())); + } + return new CancelTasksResponse(randomTasks(), taskFailures, nodeFailures); + } + +} diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index b280446db1c74..4862278fac111 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -109,20 +109,30 @@ protected Predicate getRandomFieldsExcludeFilter() { protected void assertEqualInstances(ListTasksResponse expectedInstance, ListTasksResponse newInstance) { assertNotSame(expectedInstance, newInstance); assertThat(newInstance.getTasks(), equalTo(expectedInstance.getTasks())); - assertThat(newInstance.getNodeFailures().size(), equalTo(expectedInstance.getNodeFailures().size())); - for (int i = 0; i < newInstance.getNodeFailures().size(); i++) { - ElasticsearchException newException = newInstance.getNodeFailures().get(i); - ElasticsearchException expectedException = expectedInstance.getNodeFailures().get(i); + assertOnNodeFailures(newInstance.getNodeFailures(), expectedInstance.getNodeFailures()); + assertOnTaskFailures(newInstance.getTaskFailures(), expectedInstance.getTaskFailures()); + } + + protected static void assertOnNodeFailures(List nodeFailures, + List expectedFailures) { + assertThat(nodeFailures.size(), equalTo(expectedFailures.size())); + for (int i = 0; i < nodeFailures.size(); i++) { + ElasticsearchException newException = nodeFailures.get(i); + ElasticsearchException expectedException = expectedFailures.get(i); assertThat(newException.getMetadata("es.node_id").get(0), equalTo(((FailedNodeException)expectedException).nodeId())); assertThat(newException.getMessage(), equalTo("Elasticsearch exception [type=failed_node_exception, reason=error message]")); assertThat(newException.getCause(), instanceOf(ElasticsearchException.class)); ElasticsearchException cause = (ElasticsearchException) newException.getCause(); assertThat(cause.getMessage(), equalTo("Elasticsearch exception [type=connect_exception, reason=null]")); } - assertThat(newInstance.getTaskFailures().size(), equalTo(expectedInstance.getTaskFailures().size())); - for (int i = 0; i < newInstance.getTaskFailures().size(); i++) { - TaskOperationFailure newFailure = newInstance.getTaskFailures().get(i); - TaskOperationFailure expectedFailure = expectedInstance.getTaskFailures().get(i); + } + + protected static void assertOnTaskFailures(List taskFailures, + List expectedFailures) { + assertThat(taskFailures.size(), equalTo(expectedFailures.size())); + for (int i = 0; i < taskFailures.size(); i++) { + TaskOperationFailure newFailure = taskFailures.get(i); + TaskOperationFailure expectedFailure = expectedFailures.get(i); assertThat(newFailure.getNodeId(), equalTo(expectedFailure.getNodeId())); assertThat(newFailure.getTaskId(), equalTo(expectedFailure.getTaskId())); assertThat(newFailure.getStatus(), equalTo(expectedFailure.getStatus()));