Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add synced flush API #29189

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -265,6 +267,27 @@ public void flushAsync(FlushRequest flushRequest, ActionListener<FlushResponse>
listener, emptySet(), headers);
}

/** Initiate a synced flush manually using the synced flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
* Synced flush API on elastic.co</a>
*/
public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush,
SyncedFlushResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously initiate a synced flush manually using the synced flush API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html">
* Synced flush API on elastic.co</a>
*/
public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener<SyncedFlushResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush,
SyncedFlushResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Force merge one or more indices using the Force Merge API
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -206,6 +207,14 @@ static Request flush(FlushRequest flushRequest) {
return request;
}

static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) {
String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices();
Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced"));
Params parameters = new Params(request);
parameters.withIndicesOptions(syncedFlushRequest.indicesOptions());
return request;
}

static Request forceMerge(ForceMergeRequest forceMergeRequest) {
String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices();
Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -448,6 +450,32 @@ public void testFlush() throws IOException {
}
}

public void testSyncedFlush() throws IOException {
{
String index = "index";
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index);
SyncedFlushResponse flushResponse =
execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync);
assertThat(flushResponse.totalShards(), equalTo(1));
assertThat(flushResponse.successfulShards(), equalTo(1));
assertThat(flushResponse.failedShards(), equalTo(0));
//assertThat(flushResponse.shardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testClearCache() throws IOException {
{
String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -587,6 +588,29 @@ public void testFlush() {
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}

public void testSyncedFlush() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
SyncedFlushRequest syncedFlushRequest;
if (randomBoolean()) {
syncedFlushRequest = new SyncedFlushRequest(indices);
} else {
syncedFlushRequest = new SyncedFlushRequest();
syncedFlushRequest.indices(indices);
}
Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams);
Request request = RequestConverters.syncedFlush(syncedFlushRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
if (indices != null && indices.length > 0) {
endpoint.add(String.join(",", indices));
}
endpoint.add("_flush/synced");
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getEntity(), nullValue());
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}

public void testForceMerge() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
ForceMergeRequest forceMergeRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand All @@ -58,6 +60,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -66,11 +69,13 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -775,6 +780,93 @@ public void onFailure(Exception e) {
}
}

public void testSyncedFlushIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

{
createIndex("index1", Settings.EMPTY);
}

{
// tag::flush-synced-request
SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1>
SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2>
SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3>
// end::flush-synced-request

// tag::flush-synced-request-indicesOptions
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
// end::flush-synced-request-indicesOptions

// tag::flush-synced-execute
SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request);
// end::flush-synced-execute

// tag::flush-synced-response
int totalShards = flushSyncedResponse.totalShards(); // <1>
int successfulShards = flushSyncedResponse.successfulShards(); // <2>
int failedShards = flushSyncedResponse.failedShards(); // <3>

for (Map.Entry<String, SyncedFlushResponse.FlushSyncedResponsePerIndex> responsePerIndexEntry:
flushSyncedResponse.getResponsePerIndex().entrySet()) {
String indexName = responsePerIndexEntry.getKey(); // <4>
SyncedFlushResponse.FlushSyncedResponsePerIndex responsePerIndex = responsePerIndexEntry.getValue();
int totalShardsForIndex = responsePerIndex.getTotalShards(); // <5>
int successfulShardsForIndex = responsePerIndex.getSuccessfulShards(); // <6>
int failedShardsForIndex = responsePerIndex.getFailedShards(); // <7>
if (failedShardsForIndex > 0) {
for (Map.Entry<ShardId, SyncedFlushResponse.ShardFailure> failureEntry:
responsePerIndex.getShardFailures().entrySet()) {
int shardId = failureEntry.getKey().id(); // <8>
int totalCopies = failureEntry.getValue().getTotalCopies(); // <9>
int successfulCopies = failureEntry.getValue().getSuccessfulCopies(); // <10>
int failedCopies = failureEntry.getValue().getFailedCopies(); // <11>
String failureReason = failureEntry.getValue().getFailureReason(); // <12>
Optional<ShardRouting> routing = failureEntry.getValue().getShardRouting(); // <13>
}
}
}
// end::flush-synced-response

// tag::flush-synced-execute-listener
ActionListener<SyncedFlushResponse> listener = new ActionListener<SyncedFlushResponse>() {
@Override
public void onResponse(SyncedFlushResponse refreshResponse) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::flush-synced-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::flush-synced-execute-async
client.indices().flushSyncedAsync(request, listener); // <1>
// end::flush-synced-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}

{
// tag::flush-synced-notfound
try {
SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist");
client.indices().flushSynced(request);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) {
// <1>
}
}
// end::flush-synced-notfound
}
}

public void testForceMergeIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

Expand Down
94 changes: 94 additions & 0 deletions docs/java-rest/high-level/indices/flush_synced.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
[[java-rest-high-flush]]
=== Flush Synced API

[[java-rest-high-flush-synced-request]]
==== Flush Synced Request

A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request]
--------------------------------------------------
<1> Flush synced one index
<2> Flush synced multiple indices
<3> Flush synced all the indices

==== Optional arguments

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions]
--------------------------------------------------
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
how wildcard expressions are expanded

[[java-rest-high-flush-synced-sync]]
==== Synchronous Execution

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute]
--------------------------------------------------

[[java-rest-high-flush-synced-async]]
==== Asynchronous Execution

The asynchronous execution of a flush request requires both the `SyncedFlushRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async]
--------------------------------------------------
<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when
the execution completes

The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.

A typical listener for `SyncedFlushResponse` looks like:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument

[[java-rest-high-flush-response]]
==== Flush Synced Response

The returned `SyncedFlushResponse` allows to retrieve information about the
executed operation as follows:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response]
--------------------------------------------------
<1> Total number of shards hit by the flush request
<2> Number of shards where the flush has succeeded
<3> Number of shards where the flush has failed
<4> Name of the index whose results we are about to calculate.
<5> Total number of shards for index mentioned in 4.
<6> Successful shards for index mentioned in 4.
<7> Failed shards for index mentioned in 4.
<8> One of the failed shard ids of the failed index mentioned in 4.
<9> Total copies of the shard mentioned in 8.
<10> Successful copies of the shard mentioned in 8.
<11> Failed copies of the shard mentioned in 8.
<12> Reason for failure of copies of the shard mentioned in 8.
<13> Optional. Routing information (like id, state, version etc.) for the failed shard copies.
If the entire shard failed then this returns Optional.empty.

By default, if the indices were not found, an `ElasticsearchException` will be thrown:

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound]
--------------------------------------------------
<1> Do something if the indices to be flushed were not found
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/supported-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Index Management::
* <<java-rest-high-split-index>>
* <<java-rest-high-refresh>>
* <<java-rest-high-flush>>
* <<java-rest-high-flush-synced>>
* <<java-rest-high-clear-cache>>
* <<java-rest-high-force-merge>>
* <<java-rest-high-rollover-index>>
Expand All @@ -86,6 +87,7 @@ include::indices/shrink_index.asciidoc[]
include::indices/split_index.asciidoc[]
include::indices/refresh.asciidoc[]
include::indices/flush.asciidoc[]
include::indices/flush_synced.asciidoc[]
include::indices/clear_cache.asciidoc[]
include::indices/force_merge.asciidoc[]
include::indices/rollover.asciidoc[]
Expand Down
1 change: 1 addition & 0 deletions docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Response:
"total_time_in_millis" : 175576,
"source" : {
"repository" : "my_repository",
"snapshot_uuid" : "faOyRKQOQwy8ze5yhwlAcQ",
"snapshot" : "my_snapshot",
"index" : "index1",
"version" : "{version}"
Expand Down
Loading