Skip to content

Commit

Permalink
Add metric to count search responses
Browse files Browse the repository at this point in the history
  • Loading branch information
jdconrad committed Jun 21, 2024
1 parent b8b7101 commit e635084
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,41 @@ public long buildTookInMillis() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
ActionListener<SearchResponse> loggingAndMetrics = listener.delegateFailureAndWrap((l, searchResponse) -> {
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis());
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
// Deduplicate failures by exception message and index
ShardOperationFailedException[] groupedFailures = ExceptionsHelper.groupBy(searchResponse.getShardFailures());
for (ShardOperationFailedException f : groupedFailures) {
boolean causeHas500Status = false;
if (f.getCause() != null) {
causeHas500Status = ExceptionsHelper.status(f.getCause()).getStatus() >= 500;
}
if ((f.status().getStatus() >= 500 || causeHas500Status)
&& ExceptionsHelper.isNodeOrShardUnavailableTypeException(f.getCause()) == false) {
logger.warn("TransportSearchAction shard failure (partial results response)", f);
ActionListener<SearchResponse> loggingAndMetrics = new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis());
SearchResponseMetrics.ResponseCountTotalStatus responseCountTotalStatus =
SearchResponseMetrics.ResponseCountTotalStatus.SUCCESS;
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
// Deduplicate failures by exception message and index
ShardOperationFailedException[] groupedFailures = ExceptionsHelper.groupBy(searchResponse.getShardFailures());
for (ShardOperationFailedException f : groupedFailures) {
boolean causeHas500Status = false;
if (f.getCause() != null) {
causeHas500Status = ExceptionsHelper.status(f.getCause()).getStatus() >= 500;
}
if ((f.status().getStatus() >= 500 || causeHas500Status)
&& ExceptionsHelper.isNodeOrShardUnavailableTypeException(f.getCause()) == false) {
logger.warn("TransportSearchAction shard failure (partial results response)", f);
responseCountTotalStatus = SearchResponseMetrics.ResponseCountTotalStatus.PARTIAL_FAILURE;
}
}
}
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus);
listener.onResponse(searchResponse);
} catch (Exception e) {
onFailure(e);
}
}
l.onResponse(searchResponse);
});

@Override
public void onFailure(Exception e) {
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE);
listener.onFailure(e);
}
};
executeRequest((SearchTask) task, searchRequest, loggingAndMetrics, AsyncSearchActionProvider::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,37 @@ public TransportSearchScrollAction(

@Override
protected void doExecute(Task task, SearchScrollRequest request, ActionListener<SearchResponse> listener) {
ActionListener<SearchResponse> loggingAndMetrics = listener.delegateFailureAndWrap((l, searchResponse) -> {
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis());
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
ShardOperationFailedException[] groupedFailures = ExceptionsHelper.groupBy(searchResponse.getShardFailures());
for (ShardOperationFailedException f : groupedFailures) {
Throwable cause = f.getCause() == null ? f : f.getCause();
if (ExceptionsHelper.status(cause).getStatus() >= 500
&& ExceptionsHelper.isNodeOrShardUnavailableTypeException(cause) == false) {
logger.warn("TransportSearchScrollAction shard failure (partial results response)", f);
ActionListener<SearchResponse> loggingAndMetrics = new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
searchResponseMetrics.recordTookTime(searchResponse.getTookInMillis());
SearchResponseMetrics.ResponseCountTotalStatus responseCountTotalStatus =
SearchResponseMetrics.ResponseCountTotalStatus.SUCCESS;
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
ShardOperationFailedException[] groupedFailures = ExceptionsHelper.groupBy(searchResponse.getShardFailures());
for (ShardOperationFailedException f : groupedFailures) {
Throwable cause = f.getCause() == null ? f : f.getCause();
if (ExceptionsHelper.status(cause).getStatus() >= 500
&& ExceptionsHelper.isNodeOrShardUnavailableTypeException(cause) == false) {
logger.warn("TransportSearchScrollAction shard failure (partial results response)", f);
responseCountTotalStatus = SearchResponseMetrics.ResponseCountTotalStatus.PARTIAL_FAILURE;
}
}
}
searchResponseMetrics.incrementResponseCount(responseCountTotalStatus);
listener.onResponse(searchResponse);
} catch (Exception e) {
listener.onFailure(e);
}
}
l.onResponse(searchResponse);
});

@Override
public void onFailure(Exception e) {
searchResponseMetrics.incrementResponseCount(SearchResponseMetrics.ResponseCountTotalStatus.FAILURE);
listener.onFailure(e);
}
};
try {
ParsedScrollId scrollId = parseScrollId(request.scrollId());
Runnable action = switch (scrollId.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,55 @@

package org.elasticsearch.rest.action.search;

import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Map;

public class SearchResponseMetrics {

public enum ResponseCountTotalStatus {
SUCCESS,
PARTIAL_FAILURE,
FAILURE
}

public static final String RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME = "status";

public static final String TOOK_DURATION_TOTAL_HISTOGRAM_NAME = "es.search_response.took_durations.histogram";
public static final String RESPONSE_COUNT_TOTAL_COUNTER_NAME = "es.search_response.response_count.counter";

private final LongHistogram tookDurationTotalMillisHistogram;
private final LongCounter responseCountTotalCounter;

public SearchResponseMetrics(MeterRegistry meterRegistry) {
this(
meterRegistry.registerLongHistogram(
TOOK_DURATION_TOTAL_HISTOGRAM_NAME,
"The SearchResponse.took durations in milliseconds, expressed as a histogram",
"millis"
),
meterRegistry.registerLongCounter(
RESPONSE_COUNT_TOTAL_COUNTER_NAME,
"The cumulative total of search responses with an attribute to describe"
+ "success, partial failure, or failure, expressed as a counter and an attribute",
"count"
)
);
}

private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram) {
private SearchResponseMetrics(LongHistogram tookDurationTotalMillisHistogram, LongCounter responseCountTotalCounter) {
this.tookDurationTotalMillisHistogram = tookDurationTotalMillisHistogram;
this.responseCountTotalCounter = responseCountTotalCounter;
}

public long recordTookTime(long tookTime) {
tookDurationTotalMillisHistogram.record(tookTime);
return tookTime;
}

public void incrementResponseCount(ResponseCountTotalStatus responseCountTotalStatus) {
responseCountTotalCounter.incrementBy(1L, Map.of(RESPONSE_COUNT_TOTAL_STATUS_ATTRIBUTE_NAME, responseCountTotalStatus));
}
}
Loading

0 comments on commit e635084

Please sign in to comment.