Added get pipeline API
Relates to #27205

(cherry picked from commit 696b0ab2d09d4d25806b3fbfd4f82b6fc9f7218b)
sohaibiftikhar committed May 29, 2018
1 parent eaee530 commit da82960
Showing 10 changed files with 403 additions and 26 deletions.
@@ -23,6 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;

@@ -87,4 +89,26 @@ public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipel
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
}
@@ -59,6 +59,7 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -610,6 +611,18 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return request;
}

static Request getPipeline(GetPipelineRequest getPipelineRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

Params parameters = new Params(request);
parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
return request;
}

static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
@@ -22,6 +22,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -32,7 +34,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
@@ -113,31 +115,7 @@ public void testClusterUpdateSettingNonExistent() {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
@@ -147,4 +125,27 @@ public void testPutPipeline() throws IOException {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "get_pipeline_id";
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
{
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

GetPipelineRequest request = new GetPipelineRequest(id);

GetPipelineResponse response =
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
assertTrue(response.isFound());
assertEquals(response.getPipelineConfigs().get(0).getId(), id);
PipelineConfiguration expectedConfig =
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.getPipelineConfigs().get(0).getConfigAsMap());
}
}
@@ -21,7 +21,12 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@@ -80,4 +85,42 @@ private HighLevelClient(RestClient restClient) {
super(restClient, (client) -> {}, Collections.emptyList());
}
}

protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
return pipelineBuilder;
}

protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
}

protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
}
}
@@ -61,6 +61,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
@@ -1425,6 +1426,20 @@ public void testPutPipeline() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testGetPipeline() {
String pipelineId = "some_pipeline_id";
Map<String, String> expectedParams = new HashMap<>();
GetPipelineRequest request = new GetPipelineRequest(pipelineId);
setRandomMasterTimeout(request, expectedParams);
Request expectedRequest = RequestConverters.getPipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
@@ -23,6 +23,8 @@
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
@@ -34,6 +36,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.PipelineConfiguration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -257,4 +260,74 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testGetPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
// tag::get-pipeline-request
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
// end::get-pipeline-request

// tag::get-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::get-pipeline-request-masterTimeout

// tag::get-pipeline-execute
GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
// end::get-pipeline-execute

// tag::get-pipeline-response
boolean successful = response.isFound(); // <1>
List<PipelineConfiguration> pipelines = response.getPipelineConfigs(); // <2>
for (PipelineConfiguration pipeline : pipelines) {
Map<String, Object> config = pipeline.getConfigAsMap(); // <3>
}
// end::get-pipeline-response

assertTrue(successful);
}
}

public void testGetPipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");

// tag::get-pipeline-execute-listener
ActionListener<GetPipelineResponse> listener =
new ActionListener<GetPipelineResponse>() {
@Override
public void onResponse(GetPipelineResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-pipeline-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::get-pipeline-execute-async
client.cluster().getPipelineAsync(request, listener); // <1>
// end::get-pipeline-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
75 changes: 75 additions & 0 deletions docs/java-rest/high-level/cluster/get_pipeline.asciidoc
@@ -0,0 +1,75 @@
[[java-rest-high-cluster-get-pipeline]]
=== Get Pipeline API

[[java-rest-high-cluster-get-pipeline-request]]
==== Get Pipeline Request

A `GetPipelineRequest` requires one or more `pipelineIds` to fetch.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request]
--------------------------------------------------
<1> The pipeline id to fetch
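
Because the request is sent to `GET _ingest/pipeline/{ids}` (see `RequestConverters.getPipeline`
above, which joins the ids with commas), several pipelines can be fetched in one call. A minimal
sketch, assuming the constructor accepts multiple ids and the hypothetical pipeline ids below exist:

["source","java"]
--------------------------------------------------
GetPipelineRequest request =
        new GetPipelineRequest("my-pipeline-id", "my-other-pipeline-id"); // hypothetical ids
--------------------------------------------------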

==== Optional arguments
The following arguments can optionally be provided:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-request-masterTimeout]
--------------------------------------------------
<1> Timeout to connect to the master node as a `TimeValue`
<2> Timeout to connect to the master node as a `String`

[[java-rest-high-cluster-get-pipeline-sync]]
==== Synchronous Execution

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a `GetPipelineResponse` object.

[[java-rest-high-cluster-get-pipeline-async]]
==== Asynchronous Execution

The asynchronous execution of a get pipeline request requires both the `GetPipelineRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-async]
--------------------------------------------------
<1> The `GetPipelineRequest` to execute and the `ActionListener` to use when
the execution completes

The asynchronous method does not block and returns immediately. Once the request
completes, the `ActionListener` is called back via its `onResponse` method if the
execution succeeded or via its `onFailure` method if it failed.

A typical listener for `GetPipelineResponse` looks like:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
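
In a test, the empty listener above is typically wrapped so the calling thread can wait for either
callback; a short sketch of the `LatchedActionListener` pattern used in `ClusterClientDocumentationIT`
above:

["source","java"]
--------------------------------------------------
// Wrap the listener so the test thread can block until onResponse/onFailure fires
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

client.cluster().getPipelineAsync(request, listener); // returns immediately
assertTrue(latch.await(30L, TimeUnit.SECONDS));       // wait up to 30 seconds for the callback
--------------------------------------------------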

[[java-rest-high-cluster-get-pipeline-response]]
==== Get Pipeline Response

The returned `GetPipelineResponse` allows you to retrieve information about the executed
operation as follows:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[get-pipeline-response]
--------------------------------------------------
<1> Check whether a matching pipeline was found.
<2> Get the list of pipelines found as a list of `PipelineConfiguration` objects.
<3> Get the individual configuration of each pipeline as a `Map<String, Object>`.
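
As a sketch, the configuration map can be inspected directly; the `description` and `processors`
keys below are the ones written by the sample pipeline used in the tests above:

["source","java"]
--------------------------------------------------
for (PipelineConfiguration pipeline : response.getPipelineConfigs()) {
    Map<String, Object> config = pipeline.getConfigAsMap();
    String description = (String) config.get("description"); // e.g. "some random set of processors"
    Object processors = config.get("processors");            // the list of processor definitions
    System.out.println(pipeline.getId() + ": " + description + " " + processors);
}
--------------------------------------------------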