Skip to content

Commit

Permalink
high level REST api: cancel task (#30745)
Browse files Browse the repository at this point in the history
* Initial commit of rest high level exposure of cancel task

* fix javadocs

* address some code review comments

* update branch to use tasks namespace instead of cluster

* High-level client: list tasks failure to not lose nodeId

This commit reworks testing for `ListTasksResponse` so that random
fields insertion can be tested and xcontent equivalence can be checked
too. Proper exclusions need to be configured, and failures need to be
tested separately. This helped finding a little problem, whenever there
is a node failure returned, the nodeId was lost as it was never printed
out as part of the exception toXContent.

* added comment

* merge from master

* re-work CancelTasksResponseTests to separate XContent failure cases from non-failure cases

* remove duplication of logic in parser creation

* code review changes

* refactor TasksClient to support RequestOptions

* add tests for parent task id

* address final PR review comments, mostly formatting and such
  • Loading branch information
Paul Sanwald committed Jun 8, 2018
1 parent f4ea2e9 commit 540b117
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
Expand Down Expand Up @@ -108,6 +109,17 @@ private RequestConverters() {
// Contains only status utility methods
}

static Request cancelTasks(CancelTasksRequest cancelTasksRequest) {
Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel");
Params params = new Params(request);
params.withTimeout(cancelTasksRequest.getTimeout())
.withTaskId(cancelTasksRequest.getTaskId())
.withNodes(cancelTasksRequest.getNodes())
.withParentTaskId(cancelTasksRequest.getParentTaskId())
.withActions(cancelTasksRequest.getActions());
return request;
}

static Request delete(DeleteRequest deleteRequest) {
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
Expand Down Expand Up @@ -1092,6 +1104,13 @@ Params withActions(String[] actions) {
return this;
}

Params withTaskId(TaskId taskId) {
if (taskId != null && taskId.isSet()) {
return putParam("task_id", taskId.toString());
}
return this;
}

Params withParentTaskId(TaskId parentTaskId) {
if (parentTaskId != null && parentTaskId.isSet()) {
return putParam("parent_task_id", parentTaskId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;

Expand Down Expand Up @@ -65,4 +67,45 @@ public void listAsync(ListTasksRequest request, RequestOptions options, ActionLi
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, options,
ListTasksResponse::fromXContent, listener, emptySet());
}

/**
* Cancel one or more cluster tasks using the Task Management API.
*
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* @param cancelTasksRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*
*/
public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, RequestOptions options ) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
cancelTasksRequest,
RequestConverters::cancelTasks,
options,
parser -> CancelTasksResponse.fromXContent(parser),
emptySet()
);
}

/**
* Asynchronously cancel one or more cluster tasks using the Task Management API.
*
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* @param cancelTasksRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions options, ActionListener<CancelTasksResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(
cancelTasksRequest,
RequestConverters::cancelTasks,
options,
parser -> CancelTasksResponse.fromXContent(parser),
listener,
emptySet()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
Expand Down Expand Up @@ -1620,6 +1622,23 @@ public void testIndexPutSettings() throws IOException {
assertEquals(expectedParams, request.getParameters());
}

public void testCancelTasks() {
CancelTasksRequest request = new CancelTasksRequest();
Map<String, String> expectedParams = new HashMap<>();
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
TaskId parentTaskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
request.setTaskId(taskId);
request.setParentTaskId(parentTaskId);
expectedParams.put("task_id", taskId.toString());
expectedParams.put("parent_task_id", parentTaskId.toString());
Request httpRequest = RequestConverters.cancelTasks(request);
assertThat(httpRequest, notNullValue());
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(httpRequest.getEntity(), nullValue());
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel"));
assertThat(httpRequest.getParameters(), equalTo(expectedParams));
}

public void testListTasks() {
{
ListTasksRequest request = new ListTasksRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.elasticsearch.client;

import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
Expand Down Expand Up @@ -58,4 +61,26 @@ public void testListTasks() throws IOException {
assertTrue("List tasks were not found", listTasksFound);
}

public void testCancelTasks() throws IOException {
ListTasksRequest listRequest = new ListTasksRequest();
ListTasksResponse listResponse = execute(
listRequest,
highLevelClient().tasks()::list,
highLevelClient().tasks()::listAsync
);
// in this case, probably no task will actually be cancelled.
// this is ok, that case is covered in TasksIT.testTasksCancellation
TaskInfo firstTask = listResponse.getTasks().get(0);
String node = listResponse.getPerNodeTasks().keySet().iterator().next();

CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(new TaskId(node, firstTask.getId()));
cancelTasksRequest.setReason("testreason");
CancelTasksResponse response = execute(cancelTasksRequest,
highLevelClient().tasks()::cancel,
highLevelClient().tasks()::cancelAsync);
// Since the task may or may not have been cancelled, assert that we received a response only
// The actual testing of task cancellation is covered by TasksIT.testTasksCancellation
assertThat(response, notNullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,5 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
Expand Down Expand Up @@ -146,4 +148,74 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testCancelTasks() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::cancel-tasks-request
CancelTasksRequest request = new CancelTasksRequest();
// end::cancel-tasks-request

// tag::cancel-tasks-request-filter
request.setTaskId(new TaskId("nodeId1", 42)); //<1>
request.setActions("cluster:*"); // <2>
request.setNodes("nodeId1", "nodeId2"); // <3>
// end::cancel-tasks-request-filter

}

CancelTasksRequest request = new CancelTasksRequest();
request.setTaskId(TaskId.EMPTY_TASK_ID);

// tag::cancel-tasks-execute
CancelTasksResponse response = client.tasks().cancel(request, RequestOptions.DEFAULT);
// end::cancel-tasks-execute

assertThat(response, notNullValue());

// tag::cancel-tasks-response-tasks
List<TaskInfo> tasks = response.getTasks(); // <1>
// end::cancel-tasks-response-tasks


// tag::cancel-tasks-response-failures
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
// end::-tasks-response-failures

assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
}

public void testAsyncCancelTasks() throws InterruptedException {

RestHighLevelClient client = highLevelClient();
{
CancelTasksRequest request = new CancelTasksRequest();

// tag::cancel-tasks-execute-listener
ActionListener<CancelTasksResponse> listener =
new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::cancel-tasks-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::cancel-tasks-execute-async
client.tasks().cancelAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::cancel-tasks-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/supported-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,7 @@ include::snapshot/verify_repository.asciidoc[]
The Java High Level REST Client supports the following Tasks APIs:

* <<java-rest-high-tasks-list>>
* <<java-rest-high-cluster-cancel-tasks>>

include::tasks/list_tasks.asciidoc[]
include::tasks/cancel_tasks.asciidoc[]
82 changes: 82 additions & 0 deletions docs/java-rest/high-level/tasks/cancel_tasks.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
[[java-rest-high-cluster-cancel-tasks]]
=== Cancel Tasks API

The Cancel Tasks API allows cancellation of a currently running task.

==== Cancel Tasks Request

A `CancelTasksRequest`:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request]
--------------------------------------------------
There are no required parameters. The task cancellation command supports the same
task selection parameters as the list tasks command.

==== Parameters

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-filter]
--------------------------------------------------
<1> Cancel a task
<2> Cancel only cluster-related tasks
<3> Cancel all tasks running on nodes nodeId1 and nodeId2

==== Synchronous Execution

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute]
--------------------------------------------------

==== Asynchronous Execution

The asynchronous execution requires `CancelTasksRequest` instance and an
`ActionListener` instance to be passed to the asynchronous method:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-async]
--------------------------------------------------
<1> The `CancelTasksRequest` to execute and the `ActionListener` to use
when the execution completes

The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.

A typical listener for `CancelTasksResponse` looks like:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of a failure. The raised exception is provided as an argument

==== Cancel Tasks Response

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-tasks]
--------------------------------------------------
<1> List of cancelled tasks

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-calc]
--------------------------------------------------
<1> List of cancelled tasks grouped by a node
<2> List of cancelled tasks grouped by a parent task

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-failures]
--------------------------------------------------
<1> List of node failures
<2> List of task cancellation failures

Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,48 @@

package org.elasticsearch.action.admin.cluster.node.tasks.cancel;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Returns the list of tasks that were cancelled
*/
public class CancelTasksResponse extends ListTasksResponse {

private static final ConstructingObjectParser<CancelTasksResponse, Void> PARSER =
setupParser("cancel_tasks_response", CancelTasksResponse::new);

public CancelTasksResponse() {
}

public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException>
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends ElasticsearchException>
nodeFailures) {
super(tasks, taskFailures, nodeFailures);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return super.toXContent(builder, params);
}

public static CancelTasksResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
Loading

0 comments on commit 540b117

Please sign in to comment.