-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
high level REST api: cancel task #30745
Changes from 15 commits
c7e221f
1df3544
332f4dd
9495732
9869569
5654c21
2c709ed
3ed1829
62b161a
314e1ff
58e9539
c51eb3c
9e748ee
a511490
96e3fa2
4260549
f646fa4
31f08ef
0c05fe7
783d00a
4e34cfd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.elasticsearch.action.ingest.*; | ||
|
||
import java.io.IOException; | ||
|
||
|
@@ -63,4 +64,73 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR | |
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings, | ||
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Add a pipeline or update an existing pipeline in the cluster | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a> | ||
*/ | ||
public WritePipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline, | ||
WritePipelineResponse::fromXContent, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Asynchronously add a pipeline or update an existing pipeline in the cluster | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a> | ||
*/ | ||
public void putPipelineAsync(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener, Header... headers) { | ||
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline, | ||
WritePipelineResponse::fromXContent, listener, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Get an existing pipeline | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html"> Get Pipeline API on elastic.co</a> | ||
*/ | ||
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline, | ||
GetPipelineResponse::fromXContent, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Asynchronously get an existing pipeline | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a> | ||
*/ | ||
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) { | ||
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline, | ||
GetPipelineResponse::fromXContent, listener, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Delete an existing pipeline | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html"> | ||
* Delete Pipeline API on elastic.co</a> | ||
*/ | ||
public WritePipelineResponse deletePipeline(DeletePipelineRequest request, Header... headers) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::deletePipeline, | ||
WritePipelineResponse::fromXContent, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Asynchronously delete an existing pipeline | ||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html"> | ||
* Delete Pipeline API on elastic.co</a> | ||
*/ | ||
public void deletePipelineAsync(DeletePipelineRequest request, ActionListener<WritePipelineResponse> listener, Header... headers) { | ||
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, | ||
WritePipelineResponse::fromXContent, listener, emptySet(), headers); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove empty line? |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import org.apache.http.entity.ContentType; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.elasticsearch.action.DocWriteRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; | ||
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; | ||
|
@@ -108,6 +109,17 @@ private RequestConverters() { | |
// Contains only status utility methods | ||
} | ||
|
||
static Request cancelTasks(CancelTasksRequest cancelTasksRequest) { | ||
Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel"); | ||
Params params = new Params(request); | ||
params.withTimeout( | ||
cancelTasksRequest.getTimeout()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can you adjust the indentation? |
||
.withTaskId(cancelTasksRequest.getTaskId()) | ||
.withNodes(cancelTasksRequest.getNodes()) | ||
.withActions(cancelTasksRequest.getActions()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the REST action seems to also support parent_task_id, the parameter is also declared in the REST spec so it should be here too? |
||
return request; | ||
} | ||
|
||
static Request delete(DeleteRequest deleteRequest) { | ||
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()); | ||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); | ||
|
@@ -1070,6 +1082,13 @@ Params withActions(String[] actions) { | |
return this; | ||
} | ||
|
||
Params withTaskId(TaskId taskId) { | ||
if (taskId != null && taskId.isSet()) { | ||
return putParam("task_id", taskId.toString()); | ||
} | ||
return this; | ||
} | ||
|
||
Params withParentTaskId(TaskId parentTaskId) { | ||
if (parentTaskId != null && parentTaskId.isSet()) { | ||
return putParam("parent_task_id", parentTaskId.toString()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,8 @@ | |
|
||
import org.apache.http.Header; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; | ||
|
||
|
@@ -61,4 +63,37 @@ public void listAsync(ListTasksRequest request, ActionListener<ListTasksResponse | |
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent, | ||
listener, emptySet(), headers); | ||
} | ||
|
||
/** | ||
* Cancel one or more cluster tasks using the Task Management API | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a punctuation mark at the end of the sentence? It will make the generated html slightly better |
||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a> | ||
* </p> | ||
*/ | ||
public CancelTasksResponse cancelTasks(CancelTasksRequest cancelTasksRequest, Header... headers) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call the methods just cancel given that they now belong to the TasksClient? |
||
return restHighLevelClient.performRequestAndParseEntity( | ||
cancelTasksRequest, | ||
RequestConverters::cancelTasks, | ||
parser -> CancelTasksResponse.fromXContent(parser), | ||
emptySet(), | ||
headers); | ||
} | ||
|
||
/** | ||
* Asynchronously cancel one or more cluster tasks using the Task Management API | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, can you add the punctuation mark |
||
* <p> | ||
* See | ||
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a> | ||
* </p> | ||
*/ | ||
public void cancelTasksAsync(CancelTasksRequest cancelTasksRequest, ActionListener<CancelTasksResponse> listener, Header... headers) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now that #30490 is in, could you replace the header argument with the RequestOptions one? In the method that accept a listener we add RequestOptions between the request and the listener though. |
||
restHighLevelClient.performRequestAsyncAndParseEntity( | ||
cancelTasksRequest, | ||
RequestConverters::cancelTasks, | ||
parser -> CancelTasksResponse.fromXContent(parser), | ||
listener, | ||
emptySet(), | ||
headers); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,12 +22,16 @@ | |
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.elasticsearch.action.ingest.*; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.common.xcontent.support.XContentMapValues; | ||
import org.elasticsearch.indices.recovery.RecoverySettings; | ||
import org.elasticsearch.ingest.PipelineConfiguration; | ||
import org.elasticsearch.rest.RestStatus; | ||
|
||
import java.io.IOException; | ||
|
@@ -105,4 +109,54 @@ public void testClusterUpdateSettingNonExistent() { | |
assertThat(exception.getMessage(), equalTo( | ||
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]")); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove empty line? |
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildRandomXContentPipeline(); | ||
PutPipelineRequest request = new PutPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType()); | ||
|
||
WritePipelineResponse putPipelineResponse = | ||
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync); | ||
assertTrue(putPipelineResponse.isAcknowledged()); | ||
} | ||
|
||
public void testGetPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildRandomXContentPipeline(); | ||
{ | ||
PutPipelineRequest request = new PutPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
} | ||
|
||
GetPipelineRequest request = new GetPipelineRequest(id); | ||
|
||
GetPipelineResponse response = | ||
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync); | ||
assertTrue(response.isFound()); | ||
assertEquals(response.pipelines().get(0).getId(), id); | ||
PipelineConfiguration expectedConfig = | ||
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType()); | ||
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap()); | ||
} | ||
|
||
public void testDeletePipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
{ | ||
createPipeline(id); | ||
} | ||
|
||
DeletePipelineRequest request = new DeletePipelineRequest(id); | ||
|
||
WritePipelineResponse response = | ||
execute(request, highLevelClient().cluster()::deletePipeline, highLevelClient().cluster()::deletePipelineAsync); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the changes should not be here, these tests have been moved to IngestClientIT upstream |
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,8 @@ | |
import org.apache.http.util.EntityUtils; | ||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.action.DocWriteRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; | ||
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; | ||
|
@@ -1587,6 +1589,20 @@ public void testIndexPutSettings() throws IOException { | |
assertEquals(expectedParams, request.getParameters()); | ||
} | ||
|
||
public void testCancelTasks() { | ||
CancelTasksRequest request = new CancelTasksRequest(); | ||
Map<String, String> expectedParams = new HashMap<>(); | ||
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()); | ||
request.setTaskId(taskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we also test the other supported parameters? |
||
expectedParams.put("task_id", taskId.toString()); | ||
Request httpRequest = RequestConverters.cancelTasks(request); | ||
assertThat(httpRequest, notNullValue()); | ||
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME)); | ||
assertThat(httpRequest.getEntity(), nullValue()); | ||
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel")); | ||
assertThat(httpRequest.getParameters(), equalTo(expectedParams)); | ||
} | ||
|
||
public void testListTasks() { | ||
{ | ||
ListTasksRequest request = new ListTasksRequest(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,12 @@ | |
|
||
package org.elasticsearch.client; | ||
|
||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.tasks.TaskInfo; | ||
|
||
import java.io.IOException; | ||
|
@@ -58,4 +61,32 @@ public void testListTasks() throws IOException { | |
assertTrue("List tasks were not found", listTasksFound); | ||
} | ||
|
||
public void testCancelTasks() throws IOException { | ||
ListTasksRequest listRequest = new ListTasksRequest(); | ||
ListTasksResponse listResponse = execute( | ||
listRequest, | ||
highLevelClient().tasks()::list, | ||
highLevelClient().tasks()::listAsync | ||
); | ||
|
||
// TODO[PCS] submit a task that is cancellable and assert it's cancelled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this TODO still valid? do we need to address it before merging? |
||
// this case is covered in TasksIT.testTasksCancellation | ||
TaskInfo firstTask = listResponse.getTasks().get(0); | ||
String node = listResponse.getPerNodeTasks().keySet().iterator().next(); | ||
|
||
CancelTasksRequest request = new CancelTasksRequest(); | ||
request.setTaskId(new TaskId(node, firstTask.getId())); | ||
request.setReason("testreason"); | ||
CancelTasksResponse response = execute( | ||
request, | ||
highLevelClient().tasks()::cancelTasks, | ||
highLevelClient().tasks()::cancelTasksAsync | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: adjust indentation? |
||
// Since the task may or may not have been cancelled, assert that we received a response only | ||
// The actual testing of task cancellation is covered by TasksIT.testTasksCancellation | ||
assertThat(response, notNullValue()); | ||
} | ||
|
||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,27 +23,18 @@ | |
import org.elasticsearch.action.LatchedActionListener; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.elasticsearch.action.ingest.GetPipelineRequest; | ||
import org.elasticsearch.action.ingest.GetPipelineResponse; | ||
import org.elasticsearch.action.ingest.PutPipelineRequest; | ||
import org.elasticsearch.action.ingest.DeletePipelineRequest; | ||
import org.elasticsearch.action.ingest.WritePipelineResponse; | ||
import org.elasticsearch.client.ESRestHighLevelClientTestCase; | ||
import org.elasticsearch.client.RestHighLevelClient; | ||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeUnit; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.indices.recovery.RecoverySettings; | ||
import org.elasticsearch.ingest.PipelineConfiguration; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
|
@@ -186,4 +177,5 @@ public void onFailure(Exception e) { | |
assertTrue(latch.await(30L, TimeUnit.SECONDS)); | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove empty line? |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These methods should be gone now, they have been moved to IngestClient upstream