diff --git a/CHANGELOG.md b/CHANGELOG.md
index d3dfe121a8522..9a07931f2c3c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
+- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))
### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
diff --git a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java
index ae4f4c65b28d2..06f6fe46ffbf8 100644
--- a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java
+++ b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java
@@ -431,6 +431,13 @@ public enum RestStatus {
* next-hop server.
*/
EXPECTATION_FAILED(417),
+ /**
+ * Any attempt to brew coffee with a teapot should result in the error code "418 I'm a teapot". The resulting
+ * entity body MAY be short and stout.
+ *
+ * @see I'm a teapot!
+ */
+ I_AM_A_TEAPOT(418),
/**
* The request was directed at a server that is not able to produce a response. This can be sent by a server
* that is not configured to produce responses for the combination of scheme and authority that are included
@@ -559,4 +566,8 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper
public static RestStatus fromCode(int code) {
return CODE_TO_STATUS.get(code);
}
+
+ public static Boolean isValidRestCode(int code) {
+ return fromCode(code) != null;
+ }
}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml
index 1f1f42890355e..b0ce51902a7cc 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml
@@ -138,6 +138,34 @@
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery
+---
+"Metric - indexing doc_status":
+ - skip:
+ features: [arbitrary_key]
+ - do:
+ nodes.info: {}
+ - set:
+ nodes._arbitrary_key_: node_id
+
+ - do:
+ nodes.stats: { metric: indices, index_metric: indexing }
+
+ - is_false: nodes.$node_id.indices.docs
+ - is_false: nodes.$node_id.indices.store
+ - is_true: nodes.$node_id.indices.indexing
+ - is_true: nodes.$node_id.indices.indexing.doc_status
+ - is_false: nodes.$node_id.indices.get
+ - is_false: nodes.$node_id.indices.search
+ - is_false: nodes.$node_id.indices.merges
+ - is_false: nodes.$node_id.indices.refresh
+ - is_false: nodes.$node_id.indices.flush
+ - is_false: nodes.$node_id.indices.warmer
+ - is_false: nodes.$node_id.indices.query_cache
+ - is_false: nodes.$node_id.indices.fielddata
+ - is_false: nodes.$node_id.indices.completion
+ - is_false: nodes.$node_id.indices.segments
+ - is_false: nodes.$node_id.indices.translog
+ - is_false: nodes.$node_id.indices.recovery
---
"Metric - recovery":
diff --git a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java
new file mode 100644
index 0000000000000..1e58dbaed7f65
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java
@@ -0,0 +1,107 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.nodestats;
+
+import org.hamcrest.MatcherAssert;
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+import org.opensearch.index.shard.IndexingStats;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.OpenSearchIntegTestCase.Scope;
+import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.equalTo;
+
+@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
+public class NodeStatsIT extends OpenSearchIntegTestCase {
+
+ private static final String INDEX = "test_index";
+ private static final Map expectedDocStatusCounter;
+
+ static {
+ expectedDocStatusCounter = new HashMap<>();
+ expectedDocStatusCounter.put(200, new AtomicLong(0));
+ expectedDocStatusCounter.put(201, new AtomicLong(0));
+ expectedDocStatusCounter.put(404, new AtomicLong(0));
+ }
+
+ public void testNodeIndicesStatsBulk() {
+ int sizeOfIndexRequests = scaledRandomIntBetween(10, 20);
+ int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);
+ int sizeOfNotFountRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);
+
+ BulkRequest bulkRequest = new BulkRequest();
+
+ for (int i = 0; i < sizeOfIndexRequests; ++i) {
+ bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(Requests.INDEX_CONTENT_TYPE, "field", "value"));
+ }
+
+ BulkResponse response = client().bulk(bulkRequest).actionGet();
+
+ MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
+ MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests));
+
+ for (BulkItemResponse itemResponse : response.getItems()) {
+ expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet();
+ }
+
+ refresh(INDEX);
+ bulkRequest.requests().clear();
+
+ for (int i = 0; i < sizeOfDeleteRequests; ++i) {
+ bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i)));
+ }
+ for (int i = 0; i < sizeOfNotFountRequests; ++i) {
+ bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i)));
+ }
+
+ response = client().bulk(bulkRequest).actionGet();
+
+ MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
+ MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFountRequests));
+
+ for (BulkItemResponse itemResponse : response.getItems()) {
+ expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet();
+ }
+
+ refresh(INDEX);
+
+ NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
+ IndexingStats.Stats stats = nodesStatsResponse.getNodes().get(0).getIndices().getIndexing().getTotal();
+
+ MatcherAssert.assertThat(stats.getIndexCount(), greaterThan(0L));
+ MatcherAssert.assertThat(stats.getIndexTime().duration(), greaterThan(0L));
+ MatcherAssert.assertThat(stats.getIndexCurrent(), notNullValue());
+ MatcherAssert.assertThat(stats.getIndexFailedCount(), notNullValue());
+ MatcherAssert.assertThat(stats.getDeleteCount(), greaterThan(0L));
+ MatcherAssert.assertThat(stats.getDeleteTime().duration(), greaterThan(0L));
+ MatcherAssert.assertThat(stats.getDeleteCurrent(), notNullValue());
+ MatcherAssert.assertThat(stats.getNoopUpdateCount(), notNullValue());
+ MatcherAssert.assertThat(stats.isThrottled(), notNullValue());
+ MatcherAssert.assertThat(stats.getThrottleTime(), notNullValue());
+
+ Map docStatusCounter = stats.getDocStatusStats().getDocStatusCounter();
+
+ for (Integer key : docStatusCounter.keySet()) {
+ MatcherAssert.assertThat(docStatusCounter.get(key).longValue(), equalTo(expectedDocStatusCounter.get(key).longValue()));
+ }
+ }
+
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java
index 5a3a34e9a2ebe..7942594bf8652 100644
--- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java
+++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java
@@ -264,6 +264,43 @@ public CommonStats(StreamInput in) throws IOException {
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
}
+ // public: visible for testing
+ public CommonStats(
+ DocsStats docs,
+ StoreStats store,
+ IndexingStats indexing,
+ GetStats get,
+ SearchStats search,
+ MergeStats merge,
+ RefreshStats refresh,
+ FlushStats flush,
+ WarmerStats warmer,
+ QueryCacheStats queryCache,
+ FieldDataStats fieldData,
+ CompletionStats completion,
+ SegmentsStats segments,
+ TranslogStats translog,
+ RequestCacheStats requestCache,
+ RecoveryStats recoveryStats
+ ) {
+ this.docs = docs;
+ this.store = store;
+ this.indexing = indexing;
+ this.get = get;
+ this.search = search;
+ this.merge = merge;
+ this.refresh = refresh;
+ this.flush = flush;
+ this.warmer = warmer;
+ this.queryCache = queryCache;
+ this.fieldData = fieldData;
+ this.completion = completion;
+ this.segments = segments;
+ this.translog = translog;
+ this.requestCache = requestCache;
+ this.recoveryStats = recoveryStats;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(docs);
diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
index f6ca9022a5bff..d55d03101bd94 100644
--- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java
@@ -79,6 +79,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
+import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
@@ -129,6 +130,7 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest = request.request();
- responses.set(
+ final DocWriteRequest> docWriteRequest = request.request();
+ final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
- new BulkItemResponse(
- request.id(),
- docWriteRequest.opType(),
- new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
- )
+ docWriteRequest.opType(),
+ new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
+
+ indicesService.incrementDocStatusCounter(bulkItemResponse.status());
+ responses.set(request.id(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index d70ea16cf5fdd..8d2cda829c768 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -261,6 +261,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
IndicesService.CLUSTER_REPLICATION_TYPE_SETTING,
+ IndicesService.INDEXING_DOC_STATUS_KEYS_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
Metadata.SETTING_READ_ONLY_SETTING,
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java
index f45417a20036e..88cd118dfe465 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java
@@ -44,6 +44,9 @@
import java.io.IOException;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Tracks indexing statistics
@@ -59,6 +62,74 @@ public class IndexingStats implements Writeable, ToXContentFragment {
*/
public static class Stats implements Writeable, ToXContentFragment {
+ /**
+ * Tracks item level rest status codes during indexing
+ *
+ * @opensearch.internal
+ */
+ public static class DocStatusStats implements Writeable, ToXContentFragment {
+
+ private final Map docStatusCounter;
+
+ public DocStatusStats() {
+ this(new ConcurrentHashMap<>());
+ }
+
+ // public: visible for testing
+ public DocStatusStats(Map docStatusCounter) {
+ this.docStatusCounter = docStatusCounter;
+ }
+
+ public DocStatusStats(StreamInput in) throws IOException {
+ int size = in.readInt();
+ docStatusCounter = new ConcurrentHashMap<>();
+
+ for (int i = 0; i < size; ++i) {
+ docStatusCounter.put(in.readInt(), new AtomicLong(in.readLong()));
+ }
+ }
+
+ public void add(DocStatusStats stats) {
+ for (Map.Entry entry : stats.docStatusCounter.entrySet()) {
+ int k = entry.getKey();
+ AtomicLong v = entry.getValue();
+
+ docStatusCounter.computeIfAbsent(k, x -> new AtomicLong(0));
+ docStatusCounter.get(k).addAndGet(v.longValue());
+ }
+ }
+
+ public void inc(int status) {
+ docStatusCounter.computeIfAbsent(status, x -> new AtomicLong(0));
+ docStatusCounter.get(status).incrementAndGet();
+ }
+
+ public Map getDocStatusCounter() {
+ return docStatusCounter;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(Fields.DOC_STATUS);
+
+ for (Map.Entry entry : docStatusCounter.entrySet()) {
+ builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue());
+ }
+
+ return builder.endObject();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeInt(docStatusCounter.size());
+
+ for (Map.Entry entry : docStatusCounter.entrySet()) {
+ out.writeInt(entry.getKey());
+ out.writeLong(entry.getValue().longValue());
+ }
+ }
+ }
+
private long indexCount;
private long indexTimeInMillis;
private long indexCurrent;
@@ -70,7 +141,11 @@ public static class Stats implements Writeable, ToXContentFragment {
private long throttleTimeInMillis;
private boolean isThrottled;
- Stats() {}
+ private final DocStatusStats docStatusStats;
+
+ Stats() {
+ docStatusStats = new DocStatusStats();
+ }
public Stats(StreamInput in) throws IOException {
indexCount = in.readVLong();
@@ -83,6 +158,12 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();
+
+ if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
+ docStatusStats = in.readOptionalWriteable(DocStatusStats::new);
+ } else {
+ docStatusStats = null;
+ }
}
public Stats(
@@ -96,6 +177,35 @@ public Stats(
long noopUpdateCount,
boolean isThrottled,
long throttleTimeInMillis
+ ) {
+ this(
+ indexCount,
+ indexTimeInMillis,
+ indexCurrent,
+ indexFailedCount,
+ deleteCount,
+ deleteTimeInMillis,
+ deleteCurrent,
+ noopUpdateCount,
+ isThrottled,
+ throttleTimeInMillis,
+ new DocStatusStats()
+ );
+ }
+
+ // public: visible for testing
+ public Stats(
+ long indexCount,
+ long indexTimeInMillis,
+ long indexCurrent,
+ long indexFailedCount,
+ long deleteCount,
+ long deleteTimeInMillis,
+ long deleteCurrent,
+ long noopUpdateCount,
+ boolean isThrottled,
+ long throttleTimeInMillis,
+ DocStatusStats docStatusStats
) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
@@ -107,6 +217,7 @@ public Stats(
this.noopUpdateCount = noopUpdateCount;
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
+ this.docStatusStats = docStatusStats;
}
public void add(Stats stats) {
@@ -124,6 +235,10 @@ public void add(Stats stats) {
if (isThrottled != stats.isThrottled) {
isThrottled = true; // When combining if one is throttled set result to throttled.
}
+
+ if (getDocStatusStats() != null) {
+ getDocStatusStats().add(stats.getDocStatusStats());
+ }
}
/**
@@ -193,6 +308,10 @@ public long getNoopUpdateCount() {
return noopUpdateCount;
}
+ public DocStatusStats getDocStatusStats() {
+ return docStatusStats;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
@@ -206,6 +325,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);
+ if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
+ out.writeOptionalWriteable(docStatusStats);
+ }
}
@Override
@@ -223,6 +345,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.IS_THROTTLED, isThrottled);
builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());
+
+ if (getDocStatusStats() != null) {
+ getDocStatusStats().toXContent(builder, params);
+ }
+
return builder;
}
}
@@ -294,6 +421,7 @@ static final class Fields {
static final String IS_THROTTLED = "is_throttled";
static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis";
static final String THROTTLED_TIME = "throttle_time";
+ static final String DOC_STATUS = "doc_status";
}
@Override
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 0ffad8ce65b7a..64fbde7b188c2 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -150,6 +150,7 @@
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.core.rest.RestStatus;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.internal.AliasFilter;
@@ -283,6 +284,47 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);
+ private static final List INDEXING_DOC_STATUS_DEFAULT_KEYS = List.of(
+ "200",
+ "201",
+ "400",
+ "401",
+ "403",
+ "404",
+ "405",
+ "409",
+ "412",
+ "429",
+ "500",
+ "502",
+ "503",
+ "504"
+ //200, 201, 400, 409, 413, 429, 500, 503, 504
+ );
+
+ private static String validateDocStatusKey(String key) {
+ int result;
+
+ try {
+ result = Integer.parseInt(key);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Illegal value for rest status code: " + key);
+ }
+
+ if (RestStatus.isValidRestCode(result)) {
+ return key;
+ } else {
+ throw new IllegalArgumentException("Illegal value for rest status code: " + key);
+ }
+ }
+
+ public static final Setting> INDEXING_DOC_STATUS_KEYS_SETTING = Setting.listSetting(
+ "cluster.doc_status_keys",
+ INDEXING_DOC_STATUS_DEFAULT_KEYS,
+ IndicesService::validateDocStatusKey,
+ Property.NodeScope
+ );
+
/**
* The node's settings.
*/
@@ -1012,6 +1054,14 @@ public IndicesQueryCache getIndicesQueryCache() {
return indicesQueryCache;
}
+ public void incrementDocStatusCounter(final RestStatus status) {
+ int code = status.getStatus();
+
+ if (INDEXING_DOC_STATUS_KEYS_SETTING.get(clusterService.getSettings()).contains(String.valueOf(code))) {
+ oldShardsStats.indexingStats.getTotal().getDocStatusStats().inc(code);
+ }
+ }
+
/**
* Statistics for old shards
*
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index cbf7032b50ca5..c367c6936182b 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -32,6 +32,7 @@
package org.opensearch.action.admin.cluster.node.stats;
+import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
@@ -42,6 +43,8 @@
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.http.HttpStats;
+import org.opensearch.index.shard.IndexingStats;
+import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.indices.breaker.CircuitBreakerStats;
import org.opensearch.ingest.IngestStats;
@@ -65,6 +68,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -83,6 +87,31 @@ public void testSerialization() throws IOException {
NodeStats deserializedNodeStats = new NodeStats(in);
assertEquals(nodeStats.getNode(), deserializedNodeStats.getNode());
assertEquals(nodeStats.getTimestamp(), deserializedNodeStats.getTimestamp());
+ if (nodeStats.getIndices() == null) {
+ assertNull(deserializedNodeStats.getIndices());
+ } else {
+ IndexingStats.Stats indexingStats = nodeStats.getIndices().getIndexing().getTotal();
+ IndexingStats.Stats deserializedIndexingStats = deserializedNodeStats.getIndices().getIndexing().getTotal();
+
+ assertEquals(indexingStats.getIndexCount(), deserializedIndexingStats.getIndexCount());
+ assertEquals(indexingStats.getIndexTime(), deserializedIndexingStats.getIndexTime());
+ assertEquals(indexingStats.getIndexCurrent(), deserializedIndexingStats.getIndexCurrent());
+ assertEquals(indexingStats.getIndexFailedCount(), deserializedIndexingStats.getIndexFailedCount());
+ assertEquals(indexingStats.getDeleteCount(), deserializedIndexingStats.getDeleteCount());
+ assertEquals(indexingStats.getDeleteCurrent(), deserializedIndexingStats.getDeleteCurrent());
+ assertEquals(indexingStats.getDeleteTime(), deserializedIndexingStats.getDeleteTime());
+ assertEquals(indexingStats.getNoopUpdateCount(), deserializedIndexingStats.getNoopUpdateCount());
+ assertEquals(indexingStats.isThrottled(), deserializedIndexingStats.isThrottled());
+ assertEquals(indexingStats.getThrottleTime(), deserializedIndexingStats.getThrottleTime());
+
+ Map indexingDocStatusCounter = indexingStats.getDocStatusStats().getDocStatusCounter();
+ Map deserializedDocStatusCounter = deserializedIndexingStats.getDocStatusStats()
+ .getDocStatusCounter();
+
+ for (Integer key : indexingDocStatusCounter.keySet()) {
+ assertEquals(indexingDocStatusCounter.get(key).longValue(), deserializedDocStatusCounter.get(key).longValue());
+ }
+ }
if (nodeStats.getOs() == null) {
assertNull(deserializedNodeStats.getOs());
} else {
@@ -722,7 +751,7 @@ public static NodeStats createNodeStats() {
return new NodeStats(
node,
randomNonNegativeLong(),
- null,
+ createNodeIndicesStats(),
osStats,
processStats,
jvmStats,
@@ -747,6 +776,40 @@ public static NodeStats createNodeStats() {
);
}
+ private static NodeIndicesStats createNodeIndicesStats() {
+ if (rarely()) {
+ return null;
+ }
+
+ Map docStatusCounter = new HashMap<>();
+ int size = randomInt(5);
+
+ for (int i = 0; i < size; ++i) {
+ docStatusCounter.put(randomInt(), new AtomicLong(randomLong()));
+ }
+
+ IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats(docStatusCounter);
+ IndexingStats.Stats stats = new IndexingStats.Stats(
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ Math.abs(randomInt()),
+ randomBoolean(),
+ Math.abs(randomInt()),
+ docStatusStats
+ );
+ IndexingStats indexingStats = new IndexingStats(stats);
+
+ return new NodeIndicesStats(
+ new CommonStats(null, null, indexingStats, null, null, null, null, null, null, null, null, null, null, null, null, null),
+ Collections.emptyMap()
+ );
+ }
+
private OperationStats getPipelineStats(List pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index 0846a5f8dec5c..79237216ed16a 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase(
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
+ null,
new SystemIndices(emptyMap())
) {
@Override
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
index 6a514b47e55a4..75928a179ad72 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
@@ -170,6 +170,7 @@ class TestTransportBulkAction extends TransportBulkAction {
SETTINGS,
new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
+ null,
new SystemIndices(emptyMap())
);
}
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java
index d53b860e6524a..580bc46765809 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java
@@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction {
new Resolver(),
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
new IndexingPressureService(Settings.EMPTY, clusterService),
+ null,
new SystemIndices(emptyMap())
);
}
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java
index 2361b69e9b82c..854b0478932bf 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java
@@ -278,6 +278,7 @@ static class TestTransportBulkAction extends TransportBulkAction {
indexNameExpressionResolver,
autoCreateIndex,
new IndexingPressureService(Settings.EMPTY, clusterService),
+ null,
new SystemIndices(emptyMap()),
relativeTimeProvider
);
diff --git a/server/src/test/java/org/opensearch/rest/RestStatusTests.java b/server/src/test/java/org/opensearch/rest/RestStatusTests.java
new file mode 100644
index 0000000000000..56aa386ecbaaf
--- /dev/null
+++ b/server/src/test/java/org/opensearch/rest/RestStatusTests.java
@@ -0,0 +1,24 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.rest;
+
+import org.opensearch.core.rest.RestStatus;
+import org.opensearch.test.OpenSearchTestCase;
+
+public class RestStatusTests extends OpenSearchTestCase {
+
+ public void testValidRestCode() {
+ assertTrue(RestStatus.isValidRestCode(418));
+ }
+
+ public void testInvalidRestCode() {
+ assertFalse(RestStatus.isValidRestCode(1999));
+ }
+
+}
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index e461e457bead1..988eeedce7263 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -1977,6 +1977,7 @@ public void onFailure(final Exception e) {
indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())),
new IndexingPressureService(settings, clusterService),
+ mock(IndicesService.class),
new SystemIndices(emptyMap())
)
);