From d656e3db592f29466d35452867caa241f5429485 Mon Sep 17 00:00:00 2001 From: Rohit Ashiwal Date: Thu, 28 Sep 2023 18:33:41 +0530 Subject: [PATCH] Indexing: add Doc Status Counter (#8716) Currently, Opensearch returns a 200 OK response code for a Bulk API call, even though there can be partial/complete failures within the request E2E. This provides doc level stats with respect to the rest status code as 2xx, 4xx, 5xx etc. Signed-off-by: Rohit Ashiwal --- CHANGELOG.md | 1 + .../org/opensearch/core/rest/RestStatus.java | 9 + .../test/nodes.stats/11_indices_metrics.yml | 29 ++ .../org/opensearch/nodestats/NodeStatsIT.java | 276 ++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 47 ++- .../action/update/TransportUpdateAction.java | 25 +- .../opensearch/index/shard/IndexingStats.java | 123 +++++++- .../index/shard/InternalIndexingStats.java | 3 +- .../opensearch/indices/IndicesService.java | 10 + ...ActionIndicesThatCannotBeCreatedTests.java | 1 + .../bulk/TransportBulkActionIngestTests.java | 1 + .../action/bulk/TransportBulkActionTests.java | 3 + .../bulk/TransportBulkActionTookTests.java | 1 + .../org/opensearch/core/RestStatusTests.java | 91 ++++++ .../index/shard/IndexingStatsTests.java | 141 +++++++++ .../snapshots/SnapshotResiliencyTests.java | 1 + 16 files changed, 744 insertions(+), 18 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java create mode 100644 server/src/test/java/org/opensearch/core/RestStatusTests.java create mode 100644 server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 780db204aa1aa..b0b3ce261d517 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) - Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131)) - Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189)) +- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java index 313bc23bedc90..8441ce8b1b622 100644 --- a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java +++ b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java @@ -527,6 +527,15 @@ public int getStatus() { return status; } + /** + * Get category class of a rest status code. + * + * @return Integer representing class category of the concrete rest status code + */ + public int getStatusFamilyCode() { + return status / 100; + } + public static RestStatus readFrom(StreamInput in) throws IOException { return RestStatus.valueOf(in.readString()); } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml index 1f1f42890355e..3f79227ce64e8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml @@ -138,6 +138,35 @@ - is_false: nodes.$node_id.indices.translog - is_false: nodes.$node_id.indices.recovery +--- +"Metric - indexing doc_status": + - skip: + version: " - 2.99.99" + reason: "To be introduced in future release :: TODO: change if/when we backport to 2.x" + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: { metric: indices, index_metric: indexing } + + - is_false: nodes.$node_id.indices.docs + - is_false: nodes.$node_id.indices.store + - is_true: nodes.$node_id.indices.indexing + - is_true: nodes.$node_id.indices.indexing.doc_status + - is_false: nodes.$node_id.indices.get + - is_false: nodes.$node_id.indices.search + - is_false: nodes.$node_id.indices.merges + - is_false: nodes.$node_id.indices.refresh + - is_false: nodes.$node_id.indices.flush + - is_false: nodes.$node_id.indices.warmer + - is_false: nodes.$node_id.indices.query_cache + - is_false: nodes.$node_id.indices.fielddata + - is_false: nodes.$node_id.indices.completion + - is_false: nodes.$node_id.indices.segments + - is_false: nodes.$node_id.indices.translog + - is_false: nodes.$node_id.indices.recovery --- "Metric - recovery": diff --git a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java new file mode 100644 index 0000000000000..f270cb1399072 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java @@ -0,0 +1,276 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.nodestats; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.hamcrest.MatcherAssert; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +@ClusterScope(scope = Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +public class NodeStatsIT extends OpenSearchIntegTestCase { + + private final DocStatusStats expectedDocStatusStats = new DocStatusStats(); + private static final String FIELD = "dummy_field"; + private static final String VALUE = "dummy_value"; + private static final Map SOURCE = singletonMap(FIELD, VALUE); + + public void testNodeIndicesStatsDocStatusStatsIndexBulk() { + { // Testing Index + final String INDEX = "test_index"; + final String ID = "id"; + { // Testing Normal Index + IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet(); + updateExpectedDocStatusCounter(response); + + MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertDocStatusStats(); + } + { // Testing Missing Alias + updateExpectedDocStatusCounter( + expectThrows( + IndexNotFoundException.class, + () -> client().index(new IndexRequest(INDEX).id("missing_alias").setRequireAlias(true).source(SOURCE)).actionGet() + ) + ); + assertDocStatusStats(); + } + { + // Test Missing Pipeline: Ingestion failure, not Indexing failure + expectThrows( + IllegalArgumentException.class, + () -> client().index(new IndexRequest(INDEX).id("missing_pipeline").setPipeline("missing").source(SOURCE)).actionGet() + ); + assertDocStatusStats(); + } + { // Testing Version Conflict + final String docId = "version_conflict"; + + updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet()); + updateExpectedDocStatusCounter( + expectThrows( + VersionConflictEngineException.class, + () -> client().index(new IndexRequest(INDEX).id(docId).source(SOURCE).setIfSeqNo(1L).setIfPrimaryTerm(99L)) + .actionGet() + ) + ); + assertDocStatusStats(); + } + } + { // Testing Bulk + final String INDEX = "bulk_index"; + + int sizeOfIndexRequests = scaledRandomIntBetween(10, 20); + int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + int sizeOfNotFoundRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + + BulkRequest bulkRequest = new BulkRequest(); + + for (int i = 0; i < sizeOfIndexRequests; ++i) { + bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(SOURCE)); + } + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + updateExpectedDocStatusCounter(itemResponse.getResponse()); + } + + refresh(INDEX); + bulkRequest.requests().clear(); + + for (int i = 0; i < sizeOfDeleteRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i))); + } + for (int i = 0; i < sizeOfNotFoundRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i))); + } + + response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFoundRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + updateExpectedDocStatusCounter(itemResponse.getResponse()); + } + + refresh(INDEX); + assertDocStatusStats(); + } + } + + public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() { + { // Testing Create + final String INDEX = "create_index"; + final String ID = "id"; + { // Testing Creation + IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE).create(true)).actionGet(); + updateExpectedDocStatusCounter(response); + + MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertDocStatusStats(); + } + { // Testing Version Conflict + final String docId = "version_conflict"; + + updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet()); + updateExpectedDocStatusCounter( + expectThrows( + VersionConflictEngineException.class, + () -> client().index(new IndexRequest(INDEX).id(docId).source(SOURCE).create(true)).actionGet() + ) + ); + assertDocStatusStats(); + } + } + { // Testing Delete + final String INDEX = "delete_index"; + final String ID = "id"; + { // Testing Deletion + IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet(); + updateExpectedDocStatusCounter(response); + + DeleteResponse deleteResponse = client().delete(new DeleteRequest(INDEX, ID)).actionGet(); + updateExpectedDocStatusCounter(deleteResponse); + + MatcherAssert.assertThat(response.getSeqNo(), greaterThanOrEqualTo(0L)); + MatcherAssert.assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED)); + assertDocStatusStats(); + } + { // Testing Non-Existing Doc + updateExpectedDocStatusCounter(client().delete(new DeleteRequest(INDEX, "does_not_exist")).actionGet()); + assertDocStatusStats(); + } + { // Testing Version Conflict + final String docId = "version_conflict"; + + updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet()); + updateExpectedDocStatusCounter( + expectThrows( + VersionConflictEngineException.class, + () -> client().delete(new DeleteRequest(INDEX, docId).setIfSeqNo(2L).setIfPrimaryTerm(99L)).actionGet() + ) + ); + + assertDocStatusStats(); + } + } + { // Testing Update + final String INDEX = "update_index"; + final String ID = "id"; + { // Testing Not Found + updateExpectedDocStatusCounter( + expectThrows( + DocumentMissingException.class, + () -> client().update(new UpdateRequest(INDEX, ID).doc(SOURCE)).actionGet() + ) + ); + assertDocStatusStats(); + } + { // Testing NoOp Update + updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet()); + + UpdateResponse response = client().update(new UpdateRequest(INDEX, ID).doc(SOURCE)).actionGet(); + updateExpectedDocStatusCounter(response); + + MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.NOOP)); + assertDocStatusStats(); + } + { // Testing Update + final String UPDATED_VALUE = "updated_value"; + UpdateResponse response = client().update(new UpdateRequest(INDEX, ID).doc(singletonMap(FIELD, UPDATED_VALUE))).actionGet(); + updateExpectedDocStatusCounter(response); + + MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.UPDATED)); + assertDocStatusStats(); + } + { // Testing Missing Alias + updateExpectedDocStatusCounter( + expectThrows( + IndexNotFoundException.class, + () -> client().update(new UpdateRequest(INDEX, ID).setRequireAlias(true).doc(new IndexRequest().source(SOURCE))) + .actionGet() + ) + ); + assertDocStatusStats(); + } + { // Testing Version Conflict + final String docId = "version_conflict"; + + updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet()); + updateExpectedDocStatusCounter( + expectThrows( + VersionConflictEngineException.class, + () -> client().update(new UpdateRequest(INDEX, docId).doc(SOURCE).setIfSeqNo(2L).setIfPrimaryTerm(99L)).actionGet() + ) + ); + assertDocStatusStats(); + } + } + } + + private void assertDocStatusStats() { + DocStatusStats docStatusStats = client().admin() + .cluster() + .prepareNodesStats() + .execute() + .actionGet() + .getNodes() + .get(0) + .getIndices() + .getIndexing() + .getTotal() + .getDocStatusStats(); + + assertTrue( + Arrays.equals( + docStatusStats.getDocStatusCounter(), + expectedDocStatusStats.getDocStatusCounter(), + Comparator.comparingLong(AtomicLong::longValue) + ) + ); + } + + private void updateExpectedDocStatusCounter(DocWriteResponse r) { + expectedDocStatusStats.inc(r.status()); + } + + private void updateExpectedDocStatusCounter(Exception e) { + expectedDocStatusStats.inc(ExceptionsHelper.status(e)); + } + +} diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index d8b7facc09df2..726ba7ba119af 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -78,7 +78,9 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; import org.opensearch.indices.IndexClosedException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; @@ -129,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List requests = entry.getValue(); @@ -632,8 +650,11 @@ public void onResponse(BulkShardResponse bulkShardResponse) { if (bulkItemResponse.getResponse() != null) { bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); } + + docStatusStats.inc(bulkItemResponse.status()); responses.set(bulkItemResponse.getItemId(), bulkItemResponse); } + if (counter.decrementAndGet() == 0) { finishHim(); } @@ -644,22 +665,24 @@ public void onFailure(Exception e) { // create failures for all relevant requests for (BulkItemRequest request : requests) { final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - DocWriteRequest docWriteRequest = request.request(); - responses.set( + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( request.id(), - new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ) + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) ); + + docStatusStats.inc(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); } + if (counter.decrementAndGet() == 0) { finishHim(); } } private void finishHim() { + indicesService.addDocStatusStats(docStatusStats); listener.onResponse( new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) ); @@ -766,6 +789,10 @@ void executeBulk( final AtomicArray responses, Map indicesThatCannotBeCreated ) { + /* + * We are not wrapping the listener here to capture the response codes for performance benefits. It will + * be saving us an iteration over the responses array + */ new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run(); } diff --git a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java index 95735f71a38e7..819112eb497f6 100644 --- a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java @@ -32,6 +32,7 @@ package org.opensearch.action.update; +import org.opensearch.ExceptionsHelper; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionRunnable; import org.opensearch.action.DocWriteRequest; @@ -62,11 +63,13 @@ import org.opensearch.core.common.io.stream.NotSerializableExceptionWrapper; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaType; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; import org.opensearch.indices.IndicesService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -154,10 +157,13 @@ public static void resolveAndValidateRouting(Metadata metadata, String concreteI @Override protected void doExecute(Task task, final UpdateRequest request, final ActionListener listener) { if (request.isRequireAlias() && (clusterService.state().getMetadata().hasAlias(request.index()) == false)) { - throw new IndexNotFoundException( + IndexNotFoundException e = new IndexNotFoundException( "[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true] and [" + request.index() + "] is not an alias", request.index() ); + + incDocStatusStats(e); + throw e; } // if we don't have a master, we don't have metadata, that's fine, let it find a cluster-manager using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { @@ -193,7 +199,10 @@ public void onFailure(Exception e) { } private void innerExecute(final Task task, final UpdateRequest request, final ActionListener listener) { - super.doExecute(task, request, listener); + super.doExecute(task, request, ActionListener.wrap(listener::onResponse, e -> { + incDocStatusStats(e); + listener.onFailure(e); + })); } @Override @@ -330,7 +339,13 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< shard.noopUpdate(); } } + + DocStatusStats stats = new DocStatusStats(); + stats.inc(RestStatus.OK); + + indicesService.addDocStatusStats(stats); listener.onResponse(update); + break; default: throw new IllegalStateException("Illegal result " + result.getResponseResult()); @@ -361,4 +376,10 @@ private void handleUpdateFailureWithRetry( } listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); } + + private void incDocStatusStats(final Exception e) { + DocStatusStats stats = new DocStatusStats(); + stats.inc(ExceptionsHelper.status(e)); + indicesService.addDocStatusStats(stats); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java index 8953ef38da51b..f1abea81a6511 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java @@ -37,6 +37,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * Tracks indexing statistics @@ -59,6 +61,89 @@ public class IndexingStats implements Writeable, ToXContentFragment { */ public static class Stats implements Writeable, ToXContentFragment { + /** + * Tracks item level rest category class codes during indexing + * + * @opensearch.internal + */ + public static class DocStatusStats implements Writeable, ToXContentFragment { + + final AtomicLong[] docStatusCounter; + + public DocStatusStats() { + docStatusCounter = new AtomicLong[5]; + for (int i = 0; i < docStatusCounter.length; ++i) { + docStatusCounter[i] = new AtomicLong(0); + } + } + + public DocStatusStats(StreamInput in) throws IOException { + docStatusCounter = in.readArray(i -> new AtomicLong(i.readLong()), AtomicLong[]::new); + + assert docStatusCounter.length == 5 : "Length of incoming array should be 5! Got " + docStatusCounter.length; + } + + /** + * Increment counter for status + * + * @param status {@link RestStatus} + */ + public void inc(final RestStatus status) { + add(status, 1L); + } + + /** + * Increment counter for status by count + * + * @param status {@link RestStatus} + * @param delta The value to add + */ + void add(final RestStatus status, final long delta) { + docStatusCounter[status.getStatusFamilyCode() - 1].addAndGet(delta); + } + + /** + * Accumulate stats from the passed Object + * + * @param stats Instance storing {@link DocStatusStats} + */ + public void add(final DocStatusStats stats) { + if (null == stats) { + return; + } + + for (int i = 0; i < docStatusCounter.length; ++i) { + docStatusCounter[i].addAndGet(stats.docStatusCounter[i].longValue()); + } + } + + public AtomicLong[] getDocStatusCounter() { + return docStatusCounter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.DOC_STATUS); + + for (int i = 0; i < docStatusCounter.length; ++i) { + long value = docStatusCounter[i].longValue(); + + if (value > 0) { + String key = i + 1 + "xx"; + builder.field(key, value); + } + } + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeArray((o, v) -> o.writeLong(v.longValue()), docStatusCounter); + } + + } + private long indexCount; private long indexTimeInMillis; private long indexCurrent; @@ -69,8 +154,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long noopUpdateCount; private long throttleTimeInMillis; private boolean isThrottled; + private final DocStatusStats docStatusStats; - Stats() {} + Stats() { + docStatusStats = new DocStatusStats(); + } public Stats(StreamInput in) throws IOException { indexCount = in.readVLong(); @@ -83,6 +171,12 @@ public Stats(StreamInput in) throws IOException { noopUpdateCount = in.readVLong(); isThrottled = in.readBoolean(); throttleTimeInMillis = in.readLong(); + + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + docStatusStats = in.readOptionalWriteable(DocStatusStats::new); + } else { + docStatusStats = null; + } } public Stats( @@ -95,7 +189,8 @@ public Stats( long deleteCurrent, long noopUpdateCount, boolean isThrottled, - long throttleTimeInMillis + long throttleTimeInMillis, + DocStatusStats docStatusStats ) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; @@ -107,6 +202,7 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; + this.docStatusStats = docStatusStats; } public void add(Stats stats) { @@ -121,8 +217,10 @@ public void add(Stats stats) { noopUpdateCount += stats.noopUpdateCount; throttleTimeInMillis += stats.throttleTimeInMillis; - if (isThrottled != stats.isThrottled) { - isThrottled = true; // When combining if one is throttled set result to throttled. + isThrottled |= stats.isThrottled; // When combining if one is throttled set result to throttled. + + if (getDocStatusStats() != null) { + getDocStatusStats().add(stats.getDocStatusStats()); } } @@ -193,6 +291,10 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + public DocStatusStats getDocStatusStats() { + return docStatusStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); @@ -206,6 +308,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isThrottled); out.writeLong(throttleTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(docStatusStats); + } } @Override @@ -223,8 +328,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.IS_THROTTLED, isThrottled); builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); + + if (getDocStatusStats() != null) { + getDocStatusStats().toXContent(builder, params); + } + return builder; } + } private final Stats totalStats; @@ -279,7 +390,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par * * @opensearch.internal */ - static final class Fields { + private static final class Fields { static final String INDEXING = "indexing"; static final String INDEX_TOTAL = "index_total"; static final String INDEX_TIME = "index_time"; @@ -294,6 +405,7 @@ static final class Fields { static final String IS_THROTTLED = "is_throttled"; static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis"; static final String THROTTLED_TIME = "throttle_time"; + static final String DOC_STATUS = "doc_status"; } @Override @@ -303,4 +415,5 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + } diff --git a/server/src/main/java/org/opensearch/index/shard/InternalIndexingStats.java b/server/src/main/java/org/opensearch/index/shard/InternalIndexingStats.java index d7e15dd3e40f5..55b65bb4be6d8 100644 --- a/server/src/main/java/org/opensearch/index/shard/InternalIndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/InternalIndexingStats.java @@ -154,7 +154,8 @@ IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) { deleteCurrent.count(), noopUpdates.count(), isThrottled, - TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis) + TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis), + new IndexingStats.Stats.DocStatusStats() ); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 6f4c51940ce87..a72142e65c5e8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -133,6 +133,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; +import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; @@ -1058,6 +1059,15 @@ public IndicesQueryCache getIndicesQueryCache() { return indicesQueryCache; } + /** + * Accumulate stats from the passed Object + * + * @param stats Instance storing {@link DocStatusStats} + */ + public void addDocStatusStats(final DocStatusStats stats) { + oldShardsStats.indexingStats.getTotal().getDocStatusStats().add(stats); + } + /** * Statistics for old shards * diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 9ef0f85893fc8..0f67eff26cbde 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase( Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ) { @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 5410d6a88a5b9..515f6eae28a34 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -171,6 +171,7 @@ class TestTransportBulkAction extends TransportBulkAction { SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 098bd8e8d8cfe..10cad6fb147a2 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -60,6 +60,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.SystemIndices; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -88,6 +89,7 @@ import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; public class TransportBulkActionTests extends OpenSearchTestCase { @@ -115,6 +117,7 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), + mock(IndicesService.class), new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 829eee45cac5b..852e3837e1e7a 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -280,6 +280,7 @@ static class TestTransportBulkAction extends TransportBulkAction { indexNameExpressionResolver, autoCreateIndex, new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()), relativeTimeProvider ); diff --git a/server/src/test/java/org/opensearch/core/RestStatusTests.java b/server/src/test/java/org/opensearch/core/RestStatusTests.java new file mode 100644 index 0000000000000..f8dba99aa8b60 --- /dev/null +++ b/server/src/test/java/org/opensearch/core/RestStatusTests.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.action.ShardOperationFailedException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.PriorityQueue; + +public class RestStatusTests extends OpenSearchTestCase { + + public void testStatusReturns200ForNoFailures() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = randomIntBetween(1, totalShards); + + assertEquals(RestStatus.OK, RestStatus.status(successfulShards, totalShards)); + } + + public void testStatusReturns503ForUnavailableShards() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = 0; + + assertEquals(RestStatus.SERVICE_UNAVAILABLE, RestStatus.status(successfulShards, totalShards)); + } + + public void testStatusReturnsFailureStatusWhenFailuresExist() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = 0; + + TestException[] failures = new TestException[totalShards]; + PriorityQueue heapOfFailures = new PriorityQueue<>((x, y) -> y.status().compareTo(x.status())); + + for (int i = 0; i < totalShards; ++i) { + /* + * Status here doesn't need to convey failure and is not as per rest + * contract. We're not testing the contract, but if status() returns + * the greatest rest code from the failures selection + */ + RestStatus status = randomFrom(RestStatus.values()); + TestException failure = new TestException(status); + + failures[i] = failure; + heapOfFailures.add(failure); + } + + assertEquals(heapOfFailures.peek().status(), RestStatus.status(successfulShards, totalShards, failures)); + } + + public void testSerialization() throws IOException { + final RestStatus status = randomFrom(RestStatus.values()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + RestStatus.writeTo(out, status); + + try (StreamInput in = out.bytes().streamInput()) { + RestStatus deserializedStatus = RestStatus.readFrom(in); + + assertEquals(status, deserializedStatus); + } + } + } + + private static class TestException extends ShardOperationFailedException { + TestException(final RestStatus status) { + super("super-idx", randomInt(), "gone-fishing", status, new Throwable("cake")); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new IOException("not implemented"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + throw new IOException("not implemented"); + } + } + +} diff --git a/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java b/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java new file mode 100644 index 0000000000000..acf482552c260 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.concurrent.atomic.AtomicLong; + +public class IndexingStatsTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + IndexingStats stats = createTestInstance(); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + stats.writeTo(out); + + try (StreamInput in = out.bytes().streamInput()) { + IndexingStats deserializedStats = new IndexingStats(in); + + if (stats.getTotal() == null) { + assertNull(deserializedStats.getTotal()); + return; + } + + IndexingStats.Stats totalStats = stats.getTotal(); + IndexingStats.Stats deserializedTotalStats = deserializedStats.getTotal(); + + assertEquals(totalStats.getIndexCount(), deserializedTotalStats.getIndexCount()); + assertEquals(totalStats.getIndexTime(), deserializedTotalStats.getIndexTime()); + assertEquals(totalStats.getIndexCurrent(), deserializedTotalStats.getIndexCurrent()); + assertEquals(totalStats.getIndexFailedCount(), deserializedTotalStats.getIndexFailedCount()); + assertEquals(totalStats.getDeleteCount(), deserializedTotalStats.getDeleteCount()); + assertEquals(totalStats.getDeleteTime(), deserializedTotalStats.getDeleteTime()); + assertEquals(totalStats.getDeleteCurrent(), deserializedTotalStats.getDeleteCurrent()); + assertEquals(totalStats.getNoopUpdateCount(), deserializedTotalStats.getNoopUpdateCount()); + assertEquals(totalStats.isThrottled(), deserializedTotalStats.isThrottled()); + assertEquals(totalStats.getThrottleTime(), deserializedTotalStats.getThrottleTime()); + + if (totalStats.getDocStatusStats() == null) { + assertNull(deserializedTotalStats.getDocStatusStats()); + return; + } + + IndexingStats.Stats.DocStatusStats docStatusStats = totalStats.getDocStatusStats(); + IndexingStats.Stats.DocStatusStats deserializedDocStatusStats = deserializedTotalStats.getDocStatusStats(); + + assertTrue( + Arrays.equals( + docStatusStats.getDocStatusCounter(), + deserializedDocStatusStats.getDocStatusCounter(), + Comparator.comparingLong(AtomicLong::longValue) + ) + ); + } + } + } + + public void testToXContentForIndexingStats() throws IOException { + IndexingStats stats = createTestInstance(); + IndexingStats.Stats totalStats = stats.getTotal(); + AtomicLong[] counter = totalStats.getDocStatusStats().getDocStatusCounter(); + + String expected = "{\"indexing\":{\"index_total\":" + + totalStats.getIndexCount() + + ",\"index_time_in_millis\":" + + totalStats.getIndexTime().getMillis() + + ",\"index_current\":" + + totalStats.getIndexCurrent() + + ",\"index_failed\":" + + totalStats.getIndexFailedCount() + + ",\"delete_total\":" + + totalStats.getDeleteCount() + + ",\"delete_time_in_millis\":" + + totalStats.getDeleteTime().getMillis() + + ",\"delete_current\":" + + totalStats.getDeleteCurrent() + + ",\"noop_update_total\":" + + totalStats.getNoopUpdateCount() + + ",\"is_throttled\":" + + totalStats.isThrottled() + + ",\"throttle_time_in_millis\":" + + totalStats.getThrottleTime().getMillis() + + ",\"doc_status\":{\"1xx\":" + + counter[0] + + ",\"2xx\":" + + counter[1] + + ",\"3xx\":" + + counter[2] + + ",\"4xx\":" + + counter[3] + + ",\"5xx\":" + + counter[4] + + "}}}"; + + XContentBuilder xContentBuilder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON); + xContentBuilder.startObject(); + xContentBuilder = stats.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + + assertEquals(expected, xContentBuilder.toString()); + } + + private IndexingStats createTestInstance() { + IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats(); + for (int i = 1; i < 6; ++i) { + docStatusStats.add(RestStatus.fromCode(i * 100), randomNonNegativeLong()); + } + + IndexingStats.Stats stats = new IndexingStats.Stats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomBoolean(), + randomNonNegativeLong(), + docStatusStats + ); + + return new IndexingStats(stats); + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 2b432906ee128..09f5c1bea1a5e 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2210,6 +2210,7 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), + mock(IndicesService.class), new SystemIndices(emptyMap()) ) );