Skip to content

Commit

Permalink
Indexing: add Doc status counter
Browse files Browse the repository at this point in the history
Currently, Opensearch returns a 200 OK response code for a Bulk API
call, even though there can be partial/complete failures within the
request E2E. Tracking these failures requires client to parse the
response on their side and make sense of them. But, a general idea
around trend in growth of different rest status codes at item level
can provide insights on how indexing engine is performing.

Signed-off-by: Rohit Ashiwal <rashiwal@amazon.com>
  • Loading branch information
r1walz committed Jul 17, 2023
1 parent ca74aac commit 877a5e2
Show file tree
Hide file tree
Showing 16 changed files with 472 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
11 changes: 11 additions & 0 deletions libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,13 @@ public enum RestStatus {
* next-hop server.
*/
EXPECTATION_FAILED(417),
/**
* Any attempt to brew coffee with a teapot should result in the error code "418 I'm a teapot". The resulting
* entity body MAY be short and stout.
*
* @see <a href="https://www.rfc-editor.org/rfc/rfc2324#section-2.3.2">I'm a teapot!</a>
*/
I_AM_A_TEAPOT(418),
/**
* The request was directed at a server that is not able to produce a response. This can be sent by a server
* that is not configured to produce responses for the combination of scheme and authority that are included
Expand Down Expand Up @@ -559,4 +566,8 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper
public static RestStatus fromCode(int code) {
return CODE_TO_STATUS.get(code);
}

public static Boolean isValidRestCode(int code) {
return fromCode(code) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,34 @@
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - indexing doc_status":
- skip:
features: [arbitrary_key]
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.stats: { metric: indices, index_metric: indexing }

- is_false: nodes.$node_id.indices.docs
- is_false: nodes.$node_id.indices.store
- is_true: nodes.$node_id.indices.indexing
- is_true: nodes.$node_id.indices.indexing.doc_status
- is_false: nodes.$node_id.indices.get
- is_false: nodes.$node_id.indices.search
- is_false: nodes.$node_id.indices.merges
- is_false: nodes.$node_id.indices.refresh
- is_false: nodes.$node_id.indices.flush
- is_false: nodes.$node_id.indices.warmer
- is_false: nodes.$node_id.indices.query_cache
- is_false: nodes.$node_id.indices.fielddata
- is_false: nodes.$node_id.indices.completion
- is_false: nodes.$node_id.indices.segments
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - recovery":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.nodestats;

import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Requests;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class NodeStatsIT extends OpenSearchIntegTestCase {

private static final String INDEX = "test_index";
private static final Map<Integer, AtomicLong> expectedDocStatusCounter;

static {
expectedDocStatusCounter = new HashMap<>();
expectedDocStatusCounter.put(200, new AtomicLong(0));
expectedDocStatusCounter.put(201, new AtomicLong(0));
expectedDocStatusCounter.put(404, new AtomicLong(0));
}

public void testNodeIndicesStatsBulk() {
int sizeOfIndexRequests = scaledRandomIntBetween(10, 20);
int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);
int sizeOfNotFountRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);

BulkRequest bulkRequest = new BulkRequest();

for (int i = 0; i < sizeOfIndexRequests; ++i) {
bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(Requests.INDEX_CONTENT_TYPE, "field", "value"));
}

BulkResponse response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet();
}

refresh(INDEX);
bulkRequest.requests().clear();

for (int i = 0; i < sizeOfDeleteRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i)));
}
for (int i = 0; i < sizeOfNotFountRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i)));
}

response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFountRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet();
}

refresh(INDEX);

NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
IndexingStats.Stats stats = nodesStatsResponse.getNodes().get(0).getIndices().getIndexing().getTotal();

MatcherAssert.assertThat(stats.getIndexCount(), greaterThan(0L));
MatcherAssert.assertThat(stats.getIndexTime().duration(), greaterThan(0L));
MatcherAssert.assertThat(stats.getIndexCurrent(), notNullValue());
MatcherAssert.assertThat(stats.getIndexFailedCount(), notNullValue());
MatcherAssert.assertThat(stats.getDeleteCount(), greaterThan(0L));
MatcherAssert.assertThat(stats.getDeleteTime().duration(), greaterThan(0L));
MatcherAssert.assertThat(stats.getDeleteCurrent(), notNullValue());
MatcherAssert.assertThat(stats.getNoopUpdateCount(), notNullValue());
MatcherAssert.assertThat(stats.isThrottled(), notNullValue());
MatcherAssert.assertThat(stats.getThrottleTime(), notNullValue());

Map<Integer, AtomicLong> docStatusCounter = stats.getDocStatusStats().getDocStatusCounter();

for (Integer key : docStatusCounter.keySet()) {
MatcherAssert.assertThat(docStatusCounter.get(key).longValue(), equalTo(expectedDocStatusCounter.get(key).longValue()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,43 @@ public CommonStats(StreamInput in) throws IOException {
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
}

// public: visible for testing
public CommonStats(
DocsStats docs,
StoreStats store,
IndexingStats indexing,
GetStats get,
SearchStats search,
MergeStats merge,
RefreshStats refresh,
FlushStats flush,
WarmerStats warmer,
QueryCacheStats queryCache,
FieldDataStats fieldData,
CompletionStats completion,
SegmentsStats segments,
TranslogStats translog,
RequestCacheStats requestCache,
RecoveryStats recoveryStats
) {
this.docs = docs;
this.store = store;
this.indexing = indexing;
this.get = get;
this.search = search;
this.merge = merge;
this.refresh = refresh;
this.flush = flush;
this.warmer = warmer;
this.queryCache = queryCache;
this.fieldData = fieldData;
this.completion = completion;
this.segments = segments;
this.translog = translog;
this.requestCache = requestCache;
this.recoveryStats = recoveryStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(docs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
Expand Down Expand Up @@ -129,6 +130,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;

@Inject
Expand All @@ -143,6 +145,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
) {
this(
Expand All @@ -156,6 +159,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
autoCreateIndex,
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
);
Expand All @@ -172,6 +176,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
Expand All @@ -187,6 +192,7 @@ public TransportBulkAction(
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressureService = indexingPressureService;
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -632,6 +638,8 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -644,15 +652,15 @@ public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
)
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
IndicesService.CLUSTER_REPLICATION_TYPE_SETTING,
IndicesService.INDEXING_DOC_STATUS_KEYS_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
Metadata.SETTING_READ_ONLY_SETTING,
Expand Down
Loading

0 comments on commit 877a5e2

Please sign in to comment.