diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 445fd7c6a99b6..444a27e744cf5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -34,6 +34,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -265,6 +267,27 @@ public void flushAsync(FlushRequest flushRequest, ActionListener listener, emptySet(), headers); } + /** Initiate a synced flush manually using the synced flush API + *

+ * See + * Synced flush API on elastic.co + */ + public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush, + SyncedFlushResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously initiate a synced flush manually using the synced flush API + *

+ * See + * Synced flush API on elastic.co + */ + public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { + restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush, + SyncedFlushResponse::fromXContent, listener, emptySet(), headers); + } + /** * Force merge one or more indices using the Force Merge API *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index d4cac4cc63553..2f54d1d158b78 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -206,6 +207,14 @@ static Request flush(FlushRequest flushRequest) { return request; } + static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) { + String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced")); + Params parameters = new Params(request); + parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); + return request; + } + static Request forceMerge(ForceMergeRequest forceMergeRequest) { String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices(); Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge")); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index 0feb78d66b2dd..1274164209925 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -38,6 +38,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -448,6 +450,32 @@ public void testFlush() throws IOException { } } + public void testSyncedFlush() throws IOException { + { + String index = "index"; + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index); + SyncedFlushResponse flushResponse = + execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync); + assertThat(flushResponse.totalShards(), equalTo(1)); + assertThat(flushResponse.successfulShards(), equalTo(1)); + assertThat(flushResponse.failedShards(), equalTo(0)); + //assertThat(flushResponse.shardFailures(), equalTo(BroadcastResponse.EMPTY)); + } + { + String nonExistentIndex = "non_existent_index"; + assertFalse(indexExists(nonExistentIndex)); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex); + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync)); + assertEquals(RestStatus.NOT_FOUND, exception.status()); + } + } + public void testClearCache() throws IOException { { String index = "index"; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 3f9428a3aea0d..b58e289d3eb80 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -587,6 +588,29 @@ public void testFlush() { assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); } + public void testSyncedFlush() { + String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); + SyncedFlushRequest syncedFlushRequest; + if (randomBoolean()) { + syncedFlushRequest = new SyncedFlushRequest(indices); + } else { + syncedFlushRequest = new SyncedFlushRequest(); + syncedFlushRequest.indices(indices); + } + Map expectedParams = new HashMap<>(); + setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); + Request request = RequestConverters.syncedFlush(syncedFlushRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + if (indices != null && indices.length > 0) { + endpoint.add(String.join(",", indices)); + } + endpoint.add("_flush/synced"); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getParameters(), equalTo(expectedParams)); + assertThat(request.getEntity(), nullValue()); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + } + public void testForceMerge() { String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); ForceMergeRequest forceMergeRequest; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java index 24c321f87f998..ef59ebe4024c1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java @@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -58,6 +60,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -66,11 +69,13 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -775,6 +780,93 @@ public void onFailure(Exception e) { } } + public void testSyncedFlushIndex() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + createIndex("index1", Settings.EMPTY); + } + + { + // tag::flush-synced-request + SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1> + SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2> + SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3> + // end::flush-synced-request + + // tag::flush-synced-request-indicesOptions + request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1> + // end::flush-synced-request-indicesOptions + + // tag::flush-synced-execute + SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request); + // end::flush-synced-execute + + // tag::flush-synced-response + int totalShards = flushSyncedResponse.totalShards(); // <1> + int successfulShards = flushSyncedResponse.successfulShards(); // <2> + int failedShards = flushSyncedResponse.failedShards(); // <3> + + for (Map.Entry responsePerIndexEntry: + flushSyncedResponse.getResponsePerIndex().entrySet()) { + String indexName = responsePerIndexEntry.getKey(); // <4> + SyncedFlushResponse.FlushSyncedResponsePerIndex responsePerIndex = responsePerIndexEntry.getValue(); + int totalShardsForIndex = responsePerIndex.getTotalShards(); // <5> + int successfulShardsForIndex = responsePerIndex.getSuccessfulShards(); // <6> + int failedShardsForIndex = responsePerIndex.getFailedShards(); // <7> + if (failedShardsForIndex > 0) { + for (Map.Entry failureEntry: + responsePerIndex.getShardFailures().entrySet()) { + int shardId = failureEntry.getKey().id(); // <8> + int totalCopies = failureEntry.getValue().getTotalCopies(); // <9> + int successfulCopies = failureEntry.getValue().getSuccessfulCopies(); // <10> + int failedCopies = failureEntry.getValue().getFailedCopies(); // <11> + String failureReason = failureEntry.getValue().getFailureReason(); // <12> + Optional routing = failureEntry.getValue().getShardRouting(); // <13> + } + } + } + // end::flush-synced-response + + // tag::flush-synced-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SyncedFlushResponse refreshResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::flush-synced-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::flush-synced-execute-async + client.indices().flushSyncedAsync(request, listener); // <1> + // end::flush-synced-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + + { + // tag::flush-synced-notfound + try { + SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist"); + client.indices().flushSynced(request); + } catch (ElasticsearchException exception) { + if (exception.status() == RestStatus.NOT_FOUND) { + // <1> + } + } + // end::flush-synced-notfound + } + } + public void testForceMergeIndex() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/indices/flush_synced.asciidoc b/docs/java-rest/high-level/indices/flush_synced.asciidoc new file mode 100644 index 0000000000000..41d4ae05d3558 --- /dev/null +++ b/docs/java-rest/high-level/indices/flush_synced.asciidoc @@ -0,0 +1,94 @@ +[[java-rest-high-flush]] +=== Flush Synced API + +[[java-rest-high-flush-synced-request]] +==== Flush Synced Request + +A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request] +-------------------------------------------------- +<1> Flush synced one index +<2> Flush synced multiple indices +<3> Flush synced all the indices + +==== Optional arguments + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions] +-------------------------------------------------- +<1> Setting `IndicesOptions` controls how unavailable indices are resolved and +how wildcard expressions are expanded + +[[java-rest-high-flush-synced-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute] +-------------------------------------------------- + +[[java-rest-high-flush-synced-async]] +==== Asynchronous Execution + +The asynchronous execution of a flush request requires both the `SyncedFlushRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async] +-------------------------------------------------- +<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `SyncedFlushResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + +[[java-rest-high-flush-response]] +==== Flush Synced Response + +The returned `SyncedFlushResponse` allows to retrieve information about the +executed operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response] +-------------------------------------------------- +<1> Total number of shards hit by the flush request +<2> Number of shards where the flush has succeeded +<3> Number of shards where the flush has failed +<4> Name of the index whose results we are about to calculate. +<5> Total number of shards for index mentioned in 4. +<6> Successful shards for index mentioned in 4. +<7> Failed shards for index mentioned in 4. +<8> One of the failed shard ids of the failed index mentioned in 4. +<9> Total copies of the shard mentioned in 8. +<10> Successful copies of the shard mentioned in 8. +<11> Failed copies of the shard mentioned in 8. +<12> Reason for failure of copies of the shard mentioned in 8. +<13> Optional. Routing information (like id, state, version etc.) for the failed shard copies. +If the entire shard failed then this returns Optional.empty. + +By default, if the indices were not found, an `ElasticsearchException` will be thrown: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound] +-------------------------------------------------- +<1> Do something if the indices to be flushed were not found \ No newline at end of file diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 1c0e09c6c079e..6445f81a78824 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -65,6 +65,7 @@ Index Management:: * <> * <> * <> +* <> * <> * <> * <> @@ -86,6 +87,7 @@ include::indices/shrink_index.asciidoc[] include::indices/split_index.asciidoc[] include::indices/refresh.asciidoc[] include::indices/flush.asciidoc[] +include::indices/flush_synced.asciidoc[] include::indices/clear_cache.asciidoc[] include::indices/force_merge.asciidoc[] include::indices/rollover.asciidoc[] diff --git a/docs/reference/indices/recovery.asciidoc b/docs/reference/indices/recovery.asciidoc index 49f58e645bcda..ef1cac48dd5bc 100644 --- a/docs/reference/indices/recovery.asciidoc +++ b/docs/reference/indices/recovery.asciidoc @@ -88,6 +88,7 @@ Response: "total_time_in_millis" : 175576, "source" : { "repository" : "my_repository", + "snapshot_uuid" : "faOyRKQOQwy8ze5yhwlAcQ", "snapshot" : "my_snapshot", "index" : "index1", "version" : "{version}" diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java index 890e968fe60a4..3ef0bff91dee5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java @@ -19,31 +19,41 @@ package org.elasticsearch.action.admin.indices.flush; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; +import java.util.ArrayList; +import java.util.Map; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Optional; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * The result of performing a sync flush operation on all shards of multiple indices */ public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment { + Map responsePerIndex; + Map shardCountsPerIndex; Map> shardsResultPerIndex; ShardCounts shardCounts; @@ -57,6 +67,20 @@ public SyncedFlushResponse(Map> shardsResu // ConcurrentHashMap this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values())); + Map shardsCountsPerIndex = new HashMap<>(); + for (Map.Entry> entry: shardsResultPerIndex.entrySet()) { + shardsCountsPerIndex.put(entry.getKey(), calculateShardCounts(entry.getValue())); + } + this.shardCountsPerIndex = unmodifiableMap(shardsCountsPerIndex); + this.responsePerIndex = unmodifiableMap(buildResponsePerIndex()); + } + + public SyncedFlushResponse(ShardCounts shardCounts, Map> shardsResultPerIndex, + Map shardCountsPerIndex) { + this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); + this.shardCounts = shardCounts; + this.shardCountsPerIndex = shardCountsPerIndex; + this.responsePerIndex = unmodifiableMap(buildResponsePerIndex()); } /** @@ -88,6 +112,56 @@ public Map> getShardsResultPerIndex() { return shardsResultPerIndex; } + /** + * @return FlushSyncedResponsePerIndex for each index that was sent in the request + */ + public Map getResponsePerIndex() { + return this.responsePerIndex; + } + + + private Map buildResponsePerIndex() { + Map responsePerIndex = new HashMap<>(); + for (Map.Entry entry: shardCountsPerIndex.entrySet()) { + String indexName = entry.getKey(); + ShardCounts shardCounts = entry.getValue(); + Map shardFailures = new HashMap<>(); + // If there were no failures shardFailures would be an empty array + if (shardCounts.failed > 0) { + List indexResult = shardsResultPerIndex.get(indexName); + for (ShardsSyncedFlushResult shardResults : indexResult) { + if (shardResults.failed()) { + shardFailures.put( + shardResults.shardId(), + new ShardFailure( + shardResults.shardId(), + shardResults.failureReason(), + shardResults.totalShards(), + shardResults.successfulShards(), + Optional.empty()) + ); + continue; + } + Map failedShards = shardResults.failedShards(); + for (Map.Entry shardEntry : failedShards.entrySet()) { + shardFailures.put( + shardResults.shardId(), + new ShardFailure( + shardResults.shardId(), + shardResults.failureReason(), + shardResults.totalShards(), + shardResults.successfulShards(), + Optional.of(shardEntry.getKey()) + ) + ); + } + } + } + responsePerIndex.put(indexName, new FlushSyncedResponsePerIndex(indexName, shardCounts, shardFailures)); + } + return responsePerIndex; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields._SHARDS); @@ -96,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (Map.Entry> indexEntry : shardsResultPerIndex.entrySet()) { List indexResult = indexEntry.getValue(); builder.startObject(indexEntry.getKey()); - ShardCounts indexShardCounts = calculateShardCounts(indexResult); + ShardCounts indexShardCounts = shardCountsPerIndex.get(indexEntry.getKey()); indexShardCounts.toXContent(builder, params); if (indexShardCounts.failed > 0) { builder.startArray(Fields.FAILURES); @@ -105,6 +179,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardResults.failureReason()); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.endObject(); continue; } @@ -113,6 +189,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardEntry.getValue().failureReason()); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.ROUTING, shardEntry.getKey()); builder.endObject(); } @@ -124,6 +202,124 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + parser.nextToken(); + return innerFromXContent(parser); + } + + private static SyncedFlushResponse innerFromXContent(XContentParser parser) throws IOException { + ShardCounts totalShardCounts = null; + Map shardsCountsPerIndex = new HashMap<>(); + Map> shardsResultPerIndex = new HashMap<>(); + // If it is an object we try to parse it for Fields._SHARD or for an index entry + for (Token curToken = parser.currentToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); + String currentName = parser.currentName(); + curToken = parser.nextToken(); + if (curToken == Token.START_OBJECT) { // Start parsing for _shard or for index + Boolean isIndex = !currentName.equals(Fields._SHARDS); + String indexName = isIndex ? currentName : null; + Integer totalShards = null; + Integer successfulShards = null; + Integer failedShards = null; + Map> failures = null; + for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + if (curToken == Token.FIELD_NAME) { + currentName = parser.currentName(); + curToken = parser.nextToken(); + switch (currentName) { + case Fields.TOTAL: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + totalShards = parser.intValue(); + break; + case Fields.SUCCESSFUL: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + successfulShards = parser.intValue(); + break; + case Fields.FAILED: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + failedShards = parser.intValue(); + break; + case Fields.FAILURES: + if (isIndex) { + ensureExpectedToken(Token.START_ARRAY, curToken, parser::getTokenLocation); + failures = shardFailuresFromXContent(parser, indexName); + } else { + parser.skipChildren(); + } + break; + default: + parser.skipChildren(); + break; + } + } else { + parser.skipChildren(); + } + } + if (totalShards != null && + successfulShards != null && + failedShards != null) { + ShardCounts shardCount = new ShardCounts(totalShards, successfulShards, failedShards); + if (!isIndex) { + totalShardCounts = shardCount; + } else { + List results = new ArrayList<>(); + if (failures != null) { + // All failures in this list belong to the same index + for (Map.Entry> entry: failures.entrySet()) { + Map shardResponses = new HashMap<>(); + for (ShardFailure container: entry.getValue()) { + container.maybeShardRouting.ifPresent(shardRouting -> + shardResponses.put(shardRouting, + new SyncedFlushService.ShardSyncedFlushResponse(container.failureReason) + ) + ); + } + // Size of entry.getValue() will at least be one + ShardFailure container = entry.getValue().get(0); + if (!shardResponses.isEmpty()) { + results.add( + new ShardsSyncedFlushResult(container.shardId, null, container.totalCopies, + container.successfulCopies, shardResponses) + ); + } else { + results.add( + new ShardsSyncedFlushResult(container.shardId, container.totalCopies, + container.successfulCopies, container.failureReason) + ); + } + } + } // if failures were null then no failures were reported + shardsCountsPerIndex.put(indexName, shardCount); + shardsResultPerIndex.put(indexName, results); + } + } + } else { // Else leave this tree alone + parser.skipChildren(); + } + } + return new SyncedFlushResponse(totalShardCounts, shardsResultPerIndex, shardsCountsPerIndex); + } + + private static Map> shardFailuresFromXContent( + XContentParser parser, + String indexName) throws IOException { + + Map> failures = new HashMap<>(); + for (Token curToken = parser.nextToken(); curToken != Token.END_ARRAY; curToken = parser.nextToken()) { + ensureExpectedToken(Token.START_OBJECT, curToken, parser::getTokenLocation); + ShardFailure failure = ShardFailure.fromXContent(parser, indexName); + // This is ugly but there is only one ShardsSyncedFlushResult for each shardId + // so this will work. + if (!failures.containsKey(failure.shardId)) { + failures.put(failure.shardId, new ArrayList<>()); + } + failures.get(failure.shardId).add(failure); + } + return failures; + } + static ShardCounts calculateShardCounts(Iterable results) { int total = 0, successful = 0, failed = 0; for (ShardsSyncedFlushResult result : results) { @@ -140,6 +336,131 @@ static ShardCounts calculateShardCounts(Iterable result return new ShardCounts(total, successful, failed); } + // Only used as a container for XContent + public static final class ShardFailure { + ShardId shardId; + String failureReason; + Optional maybeShardRouting; + int totalCopies; + int successfulCopies; + int failedCopies; + + ShardFailure(ShardId shardId, String failureReason, int totalCopies, int successfulCopies, + Optional maybeShardRouting) { + this.shardId = shardId; + this.failureReason = failureReason; + this.maybeShardRouting = maybeShardRouting; + this.totalCopies = totalCopies; + this.successfulCopies = successfulCopies; + this.failedCopies = this.totalCopies - this.successfulCopies; + } + + public ShardId getShardId() { + return shardId; + } + + public String getFailureReason() { + return failureReason; + } + + public Optional getShardRouting() { + return maybeShardRouting; + } + + public int getTotalCopies() { + return totalCopies; + } + + public int getSuccessfulCopies() { + return successfulCopies; + } + + public int getFailedCopies() { + return failedCopies; + } + + public static ShardFailure fromXContent(XContentParser parser, String indexName) throws IOException { + ShardRouting routing = null; + String failureReason = null; + Integer totalCopies = null; + Integer successfulCopies = null; + ShardId shardId = null; + Token curToken; + XContentLocation startLocation = parser.getTokenLocation(); + for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, curToken, parser::getTokenLocation); + String currentFieldName = parser.currentName(); + curToken = parser.nextToken(); + switch (currentFieldName) { + case Fields.SHARD: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + shardId = new ShardId( + indexName, + IndexMetaData.INDEX_UUID_NA_VALUE, + parser.intValue() + ); + break; + case Fields.REASON: + ensureExpectedToken(Token.VALUE_STRING, curToken, parser::getTokenLocation); + failureReason = parser.text(); + break; + case Fields.TOTAL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + totalCopies = parser.intValue(); + break; + case Fields.SUCCESSFUL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + successfulCopies = parser.intValue(); + break; + case Fields.ROUTING: + routing = ShardRouting.fromXContent(parser); + break; + default: + // If something else skip it + parser.skipChildren(); + break; + } + } + if (failureReason != null && + shardId != null && + totalCopies != null && + successfulCopies != null) { + return new ShardFailure(shardId, failureReason, totalCopies, successfulCopies, Optional.ofNullable(routing)); + } else { + throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); + } + } + } + + // Only used for response objects + public static final class FlushSyncedResponsePerIndex { + String index; + ShardCounts shardCounts; + Map shardFailures; + + FlushSyncedResponsePerIndex(String index, ShardCounts counts, Map shardFailures) { + this.index = index; + this.shardCounts = counts; + this.shardFailures = shardFailures; + } + + public int getTotalShards() { + return shardCounts.total; + } + + public int getSuccessfulShards() { + return shardCounts.successful; + } + + public int getFailedShards() { + return shardCounts.failed; + } + + public Map getShardFailures() { + return shardFailures; + } + } + static final class ShardCounts implements ToXContentFragment, Streamable { public int total; @@ -185,6 +506,9 @@ static final class Fields { static final String SUCCESSFUL = "successful"; static final String FAILED = "failed"; static final String FAILURES = "failures"; + static final String TOTAL_COPIES = "total_copies"; + static final String FAILED_COPIES = "failed_copies"; + static final String SUCCESSFUL_COPIES = "successful_copies"; static final String SHARD = "shard"; static final String ROUTING = "routing"; static final String REASON = "reason"; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index ff7aab4a25622..a311786434e6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -20,16 +20,23 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.Version; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; import java.util.Objects; +import org.elasticsearch.snapshots.SnapshotId; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Represents the recovery source of a shard. Available recovery types are: @@ -49,6 +56,95 @@ public final XContentBuilder toXContent(XContentBuilder builder, ToXContent.Para return builder.endObject(); } + public static RecoverySource fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentLocation startingPosition = parser.getTokenLocation(); + Type type = null; + RecoverySource recoverySource = null; + // The following fields should ideally be handled by the child classes but the structure of the + // JSON prohibits this for now. + Version version = null; + String index = null; + String snapshotRepository = null; + SnapshotId snapshotId = null; + String snapshotUUID = null; + String snapshotName = null; + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, t, parser::getTokenLocation); + String fieldName = parser.currentName(); + t = parser.nextToken(); + if (t.isValue()) { + switch (fieldName) { + case "type": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + String typeString = parser.text(); + type = Type.valueOf(parser.text()); + switch (type) { + case EMPTY_STORE: + recoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE; + break; + case EXISTING_STORE: + recoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE; + break; + case PEER: + recoverySource = PeerRecoverySource.INSTANCE; + break; + case SNAPSHOT: + // We don't do anything. Will construct it from other stuff later + break; + case LOCAL_SHARDS: + recoverySource = LocalShardsRecoverySource.INSTANCE; + break; + default: throw new ParsingException(parser.getTokenLocation(), + "unknown recovery type: " + typeString); + } + break; + case "repository": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + snapshotRepository = parser.text(); + break; + case "snapshot": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + snapshotName = parser.text(); + break; + case "snapshot_uuid": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + snapshotUUID = parser.text(); + break; + case "version": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + version = Version.fromString(parser.text()); + break; + case "index": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + index = parser.text(); + break; + default: + } + } else { // Else skip the tree + parser.skipChildren(); + } + } + // We only check if necessary information is present. Extra stuff is ignored. + if (type != null) { + if (recoverySource != null) { + return recoverySource; + } else if ( + type == Type.SNAPSHOT && + version != null && + index != null && + snapshotRepository != null && + snapshotName != null) { + snapshotId = new SnapshotId(snapshotName, snapshotUUID != null ? snapshotUUID : snapshotName); + return new SnapshotRecoverySource(new Snapshot(snapshotRepository, snapshotId), version, index); + } else { + throw new ParsingException(startingPosition, "Unable to recover RecoverySource from JSON"); + } + } else { + throw new ParsingException(startingPosition, "Unable to find type for RecoverySource from JSON"); + } + } + /** * to be overridden by subclasses */ @@ -198,6 +294,7 @@ public Type getType() { @Override public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.field("repository", snapshot.getRepository()) + .field("snapshot_uuid", snapshot.getSnapshotId().getUUID()) .field("snapshot", snapshot.getSnapshotId().getName()) .field("version", version.toString()) .field("index", index); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 6a9a105b6c432..ddef6b77d4948 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -19,16 +19,21 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -36,6 +41,8 @@ import java.util.Collections; import java.util.List; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + /** * {@link ShardRouting} immutably encapsulates information about shard * indexRoutings like id, state, version, etc. @@ -47,6 +54,20 @@ public final class ShardRouting implements Writeable, ToXContentObject { */ public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1; + /** + * XContent Fields + */ + static final String STATE = "state"; + static final String PRIMARY = "primary"; + static final String NODE = "node"; + static final String RELOCATING_NODE = "relocating_node"; + static final String SHARD = "shard"; + static final String INDEX = "index"; + static final String EXPECTED_SHARD_SIZE_IN_BYTES = "expected_shard_size_in_bytes"; + static final String RECOVERY_SOURCE = "recovery_source"; + static final String ALLOCATION_ID = "allocation_id"; + static final String UNASSIGNED_INFO = "unassigned_info"; + private final ShardId shardId; private final String currentNodeId; private final String relocatingNodeId; @@ -64,8 +85,9 @@ public final class ShardRouting implements Writeable, ToXContentObject { * A constructor to internally create shard routing instances, note, the internal flag should only be set to true * by either this class or tests. Visible for testing. */ - ShardRouting(ShardId shardId, String currentNodeId, - String relocatingNodeId, boolean primary, ShardRoutingState state, RecoverySource recoverySource, + ShardRouting(ShardId shardId, @Nullable String currentNodeId, + @Nullable String relocatingNodeId, boolean primary, + @Nullable ShardRoutingState state, RecoverySource recoverySource, UnassignedInfo unassignedInfo, AllocationId allocationId, long expectedShardSize) { this.shardId = shardId; this.currentNodeId = currentNodeId; @@ -615,20 +637,20 @@ public String shortSummary() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject() - .field("state", state()) - .field("primary", primary()) - .field("node", currentNodeId()) - .field("relocating_node", relocatingNodeId()) - .field("shard", id()) - .field("index", getIndexName()); + .field(STATE, state()) + .field(PRIMARY, primary()) + .field(NODE, currentNodeId()) + .field(RELOCATING_NODE, relocatingNodeId()) + .field(SHARD, id()) + .field(INDEX, getIndexName()); if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { - builder.field("expected_shard_size_in_bytes", expectedShardSize); + builder.field(EXPECTED_SHARD_SIZE_IN_BYTES, expectedShardSize); } if (recoverySource != null) { - builder.field("recovery_source", recoverySource); + builder.field(RECOVERY_SOURCE, recoverySource); } if (allocationId != null) { - builder.field("allocation_id"); + builder.field(ALLOCATION_ID); allocationId.toXContent(builder, params); } if (unassignedInfo != null) { @@ -637,6 +659,101 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } + + public static ShardRouting fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentLocation startingLocation = parser.getTokenLocation(); + ShardRoutingState state = null; + Boolean isPrimary = null; + String nodeId = null; + String relocatingNodeid = null; + Integer shardId = null; + String indexName = null; + long expectedShardSizeInBytes = -1; + RecoverySource recoverySource = null; + AllocationId allocationId = null; + UnassignedInfo unassignedInfo = null; + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); + String fieldName = parser.currentName(); + Token currentToken = parser.nextToken(); // Move to value of the field + switch (fieldName) { + case STATE: + if (currentToken == Token.VALUE_STRING) { + state = ShardRoutingState.valueOf(parser.text()); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } + break; + case PRIMARY: + ensureExpectedToken(Token.VALUE_BOOLEAN, currentToken, parser::getTokenLocation); + isPrimary = parser.booleanValue(); + break; + case NODE: + if (currentToken == Token.VALUE_STRING) { + nodeId = parser.text(); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } + break; + case RELOCATING_NODE: + if (currentToken == Token.VALUE_STRING) { + relocatingNodeid = parser.text(); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } + break; + case SHARD: + ensureExpectedToken(Token.VALUE_NUMBER, currentToken, parser::getTokenLocation); + shardId = parser.intValue(); + break; + case INDEX: + ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); + indexName = parser.text(); + break; + case EXPECTED_SHARD_SIZE_IN_BYTES: + ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); + expectedShardSizeInBytes = parser.longValue(); + break; + case RECOVERY_SOURCE: + recoverySource = RecoverySource.fromXContent(parser); + break; + case ALLOCATION_ID: + allocationId = AllocationId.fromXContent(parser); + break; + case UNASSIGNED_INFO: + unassignedInfo = UnassignedInfo.fromXContent(parser); + break; + default: + parser.skipChildren(); // Else skip the whole tree + break; + + } + } + if (state != null && + isPrimary != null && + shardId != null && + indexName != null) { + return + new ShardRouting( + new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), + nodeId, + relocatingNodeid, + isPrimary, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSizeInBytes + ); + } else { + throw new ParsingException(startingLocation, "Unable to construct ShardRouting information from JSON"); + } + } + /** * Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING} * shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned. diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index a543f4c3d3b3e..dbc36de37dfe4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -34,13 +35,18 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import java.io.IOException; import java.util.Locale; import java.util.Objects; +import org.joda.time.DateTime; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Holds additional information as to why the shard is in unassigned state. @@ -49,6 +55,8 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable { public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime"); + private static final String MSG_DELIMITER = ","; + public static final Setting INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING = Setting.positiveTimeSetting("index.unassigned.node_left.delayed_timeout", TimeValue.timeValueMinutes(1), Property.Dynamic, Property.IndexScope); @@ -347,7 +355,7 @@ public String getDetails() { if (message == null) { return null; } - return message + (failure == null ? "" : ", failure " + ExceptionsHelper.detailedMessage(failure)); + return message + (failure == null ? "" : MSG_DELIMITER + " failure " + ExceptionsHelper.detailedMessage(failure)); } /** @@ -431,21 +439,96 @@ public String toString() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("unassigned_info"); - builder.field("reason", reason); - builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)); + builder.field(Fields.REASON, reason); + builder.field(Fields.AT, DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)); if (failedAllocations > 0) { - builder.field("failed_attempts", failedAllocations); + builder.field(Fields.FAILED_ATTEMPTS, failedAllocations); } - builder.field("delayed", delayed); + builder.field(Fields.DELAYED, delayed); String details = getDetails(); if (details != null) { - builder.field("details", details); + builder.field(Fields.DETAILS, details); } - builder.field("allocation_status", lastAllocationStatus.value()); + builder.field(Fields.ALLOCATION_STATUS, lastAllocationStatus.value()); builder.endObject(); return builder; } + public static UnassignedInfo fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentLocation startingLocation = parser.getTokenLocation(); + Reason reason = null; + String message = null; + // The exception always remains null as constructing an exception from 'details' + // is too much work + Exception failure = null; + int failedAllocations = 0; + // See UnassignedInfo(StreamInput in) constructor for details on why we reset the time here + Long unassignedTimeNanos = System.nanoTime(); + Long unassignedTimeMillis = null; + Boolean delayed = null; + AllocationStatus allocationStatus = null; + + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, t, parser::getTokenLocation); + String fieldName = parser.currentName(); + t = parser.nextToken(); + switch (fieldName) { + case Fields.REASON: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + reason = Reason.valueOf(parser.text().toUpperCase(Locale.ROOT)); + break; + case Fields.AT: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + DateTime dt = DATE_TIME_FORMATTER.parser().parseDateTime(parser.text()); + unassignedTimeMillis = dt.getMillis(); + break; + case Fields.FAILED_ATTEMPTS: + ensureExpectedToken(Token.VALUE_NUMBER, t, parser::getTokenLocation); + failedAllocations = parser.intValue(); + break; + case Fields.DELAYED: + ensureExpectedToken(Token.VALUE_BOOLEAN, t, parser::getTokenLocation); + delayed = parser.booleanValue(); + break; + case Fields.DETAILS: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + // We ignore the exception but take the message out + // This only works if the message itself did not contain the delimiter + // The length of the resulting array from split can never be smaller than 1 + // The pattern MSG_DELIMITER is applied limit-1 times which serves our purpose + message = parser.text().split(MSG_DELIMITER, 2)[0]; + break; + case Fields.ALLOCATION_STATUS: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + allocationStatus = AllocationStatus.valueOf(parser.text().toUpperCase(Locale.ROOT)); + break; + default: + parser.skipChildren(); // else skip the whole tree with this fieldname + break; + } + } + if (reason != null && + unassignedTimeMillis != null && + delayed != null && + allocationStatus != null + ) { + return new UnassignedInfo(reason, message, null, failedAllocations, unassignedTimeNanos, + unassignedTimeMillis, delayed, allocationStatus); + } else { + throw new ParsingException(startingLocation, "Unable to construct UnassignedInfo from JSON"); + } + } + + static final class Fields { + public static final String REASON = "reason"; + public static final String AT = "at"; + public static final String FAILED_ATTEMPTS = "failed_attempts"; + public static final String DELAYED = "delayed"; + public static final String DETAILS = "details"; + public static final String ALLOCATION_STATUS = "allocation_status"; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java index 04564943ca790..109e0eb7c2721 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -41,8 +41,11 @@ public class ShardsSyncedFlushResult implements Streamable { private ShardId shardId; // some shards may be unassigned, so we need this as state private int totalShards; + // we will use lazy initialization for this if not getting from XContent + private int successfulShards; private ShardsSyncedFlushResult() { + this.successfulShards = -1; } public ShardId getShardId() { @@ -58,6 +61,7 @@ public ShardsSyncedFlushResult(ShardId shardId, int totalShards, String failureR this.shardResponses = emptyMap(); this.shardId = shardId; this.totalShards = totalShards; + this.successfulShards = -1; } /** @@ -69,6 +73,32 @@ public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, this.syncId = syncId; this.totalShards = totalShards; this.shardId = shardId; + this.successfulShards = -1; + } + + /** + * failure constructor for XContent deserialization + */ + public ShardsSyncedFlushResult(ShardId shardId, int totalShards, int successfulShards, String failureReason) { + this.syncId = null; + this.failureReason = failureReason; + this.shardResponses = emptyMap(); + this.shardId = shardId; + this.totalShards = totalShards; + this.successfulShards = successfulShards; + } + + /** + * success contructor for XContent deserialization + */ + public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, int successfulShards, + Map shardResponses) { + this.failureReason = null; + this.shardResponses = unmodifiableMap(new HashMap<>(shardResponses)); + this.syncId = syncId; + this.totalShards = totalShards; + this.shardId = shardId; + this.successfulShards = successfulShards; } /** @@ -101,13 +131,16 @@ public int totalShards() { * @return total number of successful shards */ public int successfulShards() { - int i = 0; - for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) { - if (result.success()) { - i++; + if (this.successfulShards < 0) { + int i = 0; + for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) { + if (result.success()) { + i++; + } } + this.successfulShards = i; } - return i; + return this.successfulShards; } /** diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java new file mode 100644 index 0000000000000..b35e205176e7e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.flush; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; +import org.elasticsearch.test.ESTestCase; + +public abstract class AbstractSyncedFlushTest extends ESTestCase { + protected static class TestPlan { + public SyncedFlushResponse.ShardCounts totalCounts; + public Map countsPerIndex = new HashMap<>(); + public ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); + public SyncedFlushResponse result; + } + + protected TestPlan createTestPlan() { + final TestPlan testPlan = new TestPlan(); + final Map> indicesResults = new HashMap<>(); + final int indexCount = randomIntBetween(1, 10); + int totalShards = 0; + int totalSuccesful = 0; + int totalFailed = 0; + for (int i = 0; i < indexCount; i++) { + final String index = "index_" + i; + int shards = randomIntBetween(1, 4); + int replicas = randomIntBetween(0, 2); + int successful = 0; + int failed = 0; + int failures = 0; + List shardsResults = new ArrayList<>(); + for (int shard = 0; shard < shards; shard++) { + final ShardId shardId = new ShardId(index, "_na_", shard); + if (randomInt(5) < 2) { + // total shard failure + failed += replicas + 1; + failures++; + shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); + } else { + Map shardResponses = new HashMap<>(); + for (int copy = 0; copy < replicas + 1; copy++) { + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + index, shard, "node_" + shardId + "_" + copy, null, + copy == 0, ShardRoutingState.STARTED + ); + if (randomInt(5) < 2) { + // shard copy failure + failed++; + failures++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); + } else { + successful++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); + } + } + shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); + } + } + indicesResults.put(index, shardsResults); + testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); + testPlan.expectedFailuresPerIndex.put(index, failures); + totalFailed += failed; + totalShards += shards * (replicas + 1); + totalSuccesful += successful; + } + testPlan.result = new SyncedFlushResponse(indicesResults); + testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccesful, totalFailed); + return testPlan; + } + + public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) { + if (first == null) { + assertNull(second); + } else { + assertNotNull(second); + assertEquals(first.successful, second.successful); + assertEquals(first.failed, second.failed); + assertEquals(first.total, second.total); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java new file mode 100644 index 0000000000000..6052c77786254 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.flush; + +import java.io.IOException; +import java.util.Map; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; + +public class SyncedFlushResponseTests extends AbstractSyncedFlushTest { + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + TestPlan plan = createTestPlan(); + SyncedFlushResponse response = plan.result; + assertNotNull(response); + builder.startObject(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser); + assertNotNull(parsedResponse); + assertShardCounts(response.shardCounts, parsedResponse.shardCounts); + for (Map.Entry entry: response.getResponsePerIndex().entrySet()) { + String index = entry.getKey(); + // assertShardCounts(entry.getValue().shardCounts, parsedResponse.shardCountsPerIndex.get(index)); + SyncedFlushResponse.FlushSyncedResponsePerIndex responseResult = entry.getValue(); + SyncedFlushResponse.FlushSyncedResponsePerIndex parsedResult = parsedResponse.responsePerIndex.get(index); + assertNotNull(responseResult); + assertNotNull(parsedResult); + assertShardCounts(responseResult.shardCounts, parsedResult.shardCounts); + assertEquals(responseResult.shardFailures.size(), parsedResult.shardFailures.size()); + for (Map.Entry failureEntry: responseResult.shardFailures.entrySet()) { + ShardId id = failureEntry.getKey(); + SyncedFlushResponse.ShardFailure responseShardFailure = failureEntry.getValue(); + SyncedFlushResponse.ShardFailure parsedShardFailure = parsedResult.shardFailures.get(id); + assertNotNull(parsedShardFailure); + assertEquals(responseShardFailure.successfulCopies, parsedShardFailure.successfulCopies); + assertEquals(responseShardFailure.totalCopies, parsedShardFailure.totalCopies); + assertEquals(responseShardFailure.failedCopies, parsedShardFailure.failedCopies); + // We skip shard routing information here. + // Separate tests for shard routing verification exist in + // org.elasticsearch.cluster.routing.ShardRoutingTests + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java index 7040c92ec1d27..2bb100b4b5633 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java @@ -19,23 +19,15 @@ package org.elasticsearch.action.admin.indices.flush; -import com.carrotsearch.hppc.ObjectIntHashMap; -import com.carrotsearch.hppc.ObjectIntMap; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse.ShardCounts; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,14 +35,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -public class SyncedFlushUnitTests extends ESTestCase { - - private static class TestPlan { - public SyncedFlushResponse.ShardCounts totalCounts; - public Map countsPerIndex = new HashMap<>(); - public ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); - public SyncedFlushResponse result; - } +public class SyncedFlushUnitTests extends AbstractSyncedFlushTest { public void testIndicesSyncedFlushResult() throws IOException { final TestPlan testPlan = createTestPlan(); @@ -132,56 +117,4 @@ private void assertShardCount(String name, Map header, ShardCoun assertThat(name + " has unexpected failed count", (Integer) header.get("failed"), equalTo(expectedCounts.failed)); } - protected TestPlan createTestPlan() { - final TestPlan testPlan = new TestPlan(); - final Map> indicesResults = new HashMap<>(); - final int indexCount = randomIntBetween(1, 10); - int totalShards = 0; - int totalSuccesful = 0; - int totalFailed = 0; - for (int i = 0; i < indexCount; i++) { - final String index = "index_" + i; - int shards = randomIntBetween(1, 4); - int replicas = randomIntBetween(0, 2); - int successful = 0; - int failed = 0; - int failures = 0; - List shardsResults = new ArrayList<>(); - for (int shard = 0; shard < shards; shard++) { - final ShardId shardId = new ShardId(index, "_na_", shard); - if (randomInt(5) < 2) { - // total shard failure - failed += replicas + 1; - failures++; - shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); - } else { - Map shardResponses = new HashMap<>(); - for (int copy = 0; copy < replicas + 1; copy++) { - final ShardRouting shardRouting = TestShardRouting.newShardRouting(index, shard, "node_" + shardId + "_" + copy, null, - copy == 0, ShardRoutingState.STARTED); - if (randomInt(5) < 2) { - // shard copy failure - failed++; - failures++; - shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); - } else { - successful++; - shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); - } - } - shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); - } - } - indicesResults.put(index, shardsResults); - testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); - testPlan.expectedFailuresPerIndex.put(index, failures); - totalFailed += failed; - totalShards += shards * (replicas + 1); - totalSuccesful += successful; - } - testPlan.result = new SyncedFlushResponse(indicesResults); - testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccesful, totalFailed); - return testPlan; - } - } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java index 1929c15f7d5ce..cc82ee35727b1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java @@ -19,7 +19,13 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -38,6 +44,25 @@ public void testSerialization() throws IOException { assertEquals(recoverySource, serializedRecoverySource); } + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + RecoverySource original = TestShardRouting.randomRecoverySource(); + assertNotNull(original); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + original.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); // Move it to the first token + RecoverySource deserealized = RecoverySource.fromXContent(parser); + assertEquals(original.getType(), deserealized.getType()); + assertEquals(original, deserealized); + } + public void testRecoverySourceTypeOrder() { assertEquals(RecoverySource.Type.EMPTY_STORE.ordinal(), 0); assertEquals(RecoverySource.Type.EXISTING_STORE.ordinal(), 1); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index f87f918d99ecc..2d7d57482c56a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -21,6 +21,12 @@ import org.elasticsearch.Version; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotId; @@ -233,4 +239,32 @@ public void testExpectedSize() throws IOException { } } } + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + ShardRouting routing = randomShardRouting("index", randomInt(5)); + assertNotNull(routing); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + routing.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); + ShardRouting deserializedRouting = ShardRouting.fromXContent(parser); + assertNotNull(deserializedRouting); + assertEquals(routing.state(), deserializedRouting.state()); + assertEquals(routing.primary(), deserializedRouting.primary()); + assertEquals(routing.currentNodeId(), deserializedRouting.currentNodeId()); + assertEquals(routing.relocatingNodeId(), deserializedRouting.relocatingNodeId()); + assertEquals(routing.id(), deserializedRouting.id()); // Check if shardId is equals + assertEquals(routing.getIndexName(), deserializedRouting.getIndexName()); + assertEquals(routing.getExpectedShardSize(), deserializedRouting.getExpectedShardSize()); + assertEquals(routing.allocationId(), deserializedRouting.allocationId()); + // We skip RecoverySource and UnassignedInfo here. Will test the serialization for them in + // a different class + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index d8f7f6552f908..10d2907ae07b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -33,10 +33,16 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; @@ -78,11 +84,7 @@ public void testReasonOrdinalOrder() { } public void testSerialization() throws Exception { - UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); - UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), - System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + UnassignedInfo meta = createRandom(); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); out.close(); @@ -95,6 +97,41 @@ public void testSerialization() throws Exception { assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); } + public void testXContentSerialization() throws Exception { + final XContentType xContentType = randomFrom(XContentType.values()); + UnassignedInfo original = createRandom(); + assertNotNull(original); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + builder.startObject(); + original.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); // move to the outer object + parser.nextToken(); // move to the field name + assertEquals(XContentParser.Token.FIELD_NAME, parser.currentToken()); + parser.nextToken(); // move to inner object object and now start parsing + UnassignedInfo deserialized = UnassignedInfo.fromXContent(parser); + assertThat(deserialized.getReason(), equalTo(original.getReason())); + assertThat(deserialized.getUnassignedTimeInMillis(), equalTo(original.getUnassignedTimeInMillis())); + assertThat(deserialized.getMessage(), equalTo(original.getMessage())); + assertThat(deserialized.getDetails(), equalTo(original.getDetails())); + assertThat(deserialized.getNumFailedAllocations(), equalTo(original.getNumFailedAllocations())); + } + + private UnassignedInfo createRandom() { + UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); + return reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? + new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), + System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): + new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + } + public void testIndexCreated() { MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))