Skip to content

Commit

Permalink
consume query level cpu and memory usage in query insights (opensearc…
Browse files Browse the repository at this point in the history
…h-project#13739)

* consume query level cpu and memory usage in query insights

Signed-off-by: Chenyang Ji <cyji@amazon.com>

* handle failed requests metrics in query insights

Signed-off-by: Chenyang Ji <cyji@amazon.com>

* refactor the code to make it more maintainable

Signed-off-by: Chenyang Ji <cyji@amazon.com>

---------

Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy authored Jun 10, 2024
1 parent 8b1a4b3 commit 04a417a
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))
- [Query Insights] Add cpu and memory metrics to top n queries ([#13739](https://github.com/opensearch-project/OpenSearch/pull/13739))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,15 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Locale;
import java.util.Set;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume
}
switch (type) {
case LOCAL_INDEX:
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
if (indexPattern.length() == 0) {
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchTask;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
Expand All @@ -25,13 +27,14 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;

/**
* The listener for query insights services.
Expand All @@ -46,6 +49,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
private static final Logger log = LogManager.getLogger(QueryInsightsListener.class);

private final QueryInsightsService queryInsightsService;
private final ClusterService clusterService;

/**
* Constructor for QueryInsightsListener
Expand All @@ -55,26 +59,32 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
*/
@Inject
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
);
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
// Setting endpoints set up for top n queries, including enabling top n queries, window size and top n size
// Expected metricTypes are Latency, CPU and Memory.
for (MetricType type : MetricType.allMetricTypes()) {
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(getTopNEnabledSetting(type), v -> this.setEnableTopQueries(type, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getTopNSizeSetting(type),
v -> this.queryInsightsService.setTopNSize(type, v),
v -> this.queryInsightsService.validateTopNSize(type, v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getTopNWindowSizeSetting(type),
v -> this.queryInsightsService.setWindowSize(type, v),
v -> this.queryInsightsService.validateWindowSize(type, v)
);

this.setEnableTopQueries(type, clusterService.getClusterSettings().get(getTopNEnabledSetting(type)));
this.queryInsightsService.validateTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type)));
this.queryInsightsService.setTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type)));
this.queryInsightsService.validateWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}
}

/**
Expand Down Expand Up @@ -124,6 +134,27 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
constructSearchQueryRecord(context, searchRequestContext);
}

@Override
public void onRequestFailure(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
constructSearchQueryRecord(context, searchRequestContext);
}

private void constructSearchQueryRecord(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
SearchTask searchTask = context.getTask();
List<TaskResourceInfo> tasksResourceUsages = searchRequestContext.getPhaseResourceUsage();
tasksResourceUsages.add(
new TaskResourceInfo(
searchTask.getAction(),
searchTask.getId(),
searchTask.getParentTaskId().getId(),
clusterService.localNode().getId(),
searchTask.getTotalResourceStats()
)
);

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Expand All @@ -133,12 +164,25 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
);
}
if (queryInsightsService.isCollectionEnabled(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
);
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand All @@ -154,4 +198,5 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
Expand All @@ -27,7 +29,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
* Service responsible for gathering, analyzing, storing and exporting
Expand Down Expand Up @@ -86,11 +88,13 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
enableCollect.put(metricType, false);
topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory));
}
clusterSettings.addSettingsUpdateConsumer(
TOP_N_LATENCY_EXPORTER_SETTINGS,
(settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)),
(settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings))
);
for (MetricType type : MetricType.allMetricTypes()) {
clusterSettings.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporter(type, settings)),
(settings -> validateExporterConfig(type, settings))
);
}
}

/**
Expand Down Expand Up @@ -177,6 +181,78 @@ public boolean isEnabled() {
return false;
}

/**
* Validate the window size config for a metricType
*
* @param type {@link MetricType}
* @param windowSize {@link TimeValue}
*/
public void validateWindowSize(final MetricType type, final TimeValue windowSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateWindowSize(windowSize);
}
}

/**
* Set window size for a metricType
*
* @param type {@link MetricType}
* @param windowSize {@link TimeValue}
*/
public void setWindowSize(final MetricType type, final TimeValue windowSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setWindowSize(windowSize);
}
}

/**
* Validate the top n size config for a metricType
*
* @param type {@link MetricType}
* @param topNSize top n size
*/
public void validateTopNSize(final MetricType type, final int topNSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateTopNSize(topNSize);
}
}

/**
* Set the top n size config for a metricType
*
* @param type {@link MetricType}
* @param topNSize top n size
*/
public void setTopNSize(final MetricType type, final int topNSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setTopNSize(topNSize);
}
}

/**
* Set the exporter config for a metricType
*
* @param type {@link MetricType}
* @param settings exporter settings
*/
public void setExporter(final MetricType type, final Settings settings) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setExporter(settings);
}
}

/**
* Validate the exporter config for a metricType
*
* @param type {@link MetricType}
* @param settings exporter settings
*/
public void validateExporterConfig(final MetricType type, final Settings settings) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateExporterConfig(settings);
}
}

@Override
protected void doStart() {
if (isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
Expand Down Expand Up @@ -218,10 +218,7 @@ public void setExporter(final Settings settings) {
if (settings.get(EXPORTER_TYPE) != null) {
SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) {
queryInsightsExporterFactory.updateExporter(
exporter,
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
);
queryInsightsExporterFactory.updateExporter(exporter, settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN));
} else {
try {
queryInsightsExporterFactory.closeExporter(this.exporter);
Expand All @@ -230,7 +227,7 @@ public void setExporter(final Settings settings) {
}
this.exporter = queryInsightsExporterFactory.createExporter(
SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)),
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN)
);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public enum Attribute {
* The node id for this request
*/
NODE_ID,
/**
* Tasks level resource usages in this request
*/
TASK_RESOURCE_USAGES,
/**
* Custom search request labels
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum MetricType implements Comparator<Number> {
/**
* JVM heap usage metric type
*/
JVM;
MEMORY;

/**
* Read a MetricType from a StreamInput
Expand Down Expand Up @@ -93,10 +93,9 @@ public static Set<MetricType> allMetricTypes() {
public int compare(final Number a, final Number b) {
switch (this) {
case LATENCY:
return Long.compare(a.longValue(), b.longValue());
case JVM:
case CPU:
return Double.compare(a.doubleValue(), b.doubleValue());
case MEMORY:
return Long.compare(a.longValue(), b.longValue());
}
return -1;
}
Expand All @@ -110,10 +109,9 @@ public int compare(final Number a, final Number b) {
Number parseValue(final Object o) {
switch (this) {
case LATENCY:
return (Long) o;
case JVM:
case CPU:
return (Double) o;
case MEMORY:
return (Long) o;
default:
return (Number) o;
}
Expand Down
Loading

0 comments on commit 04a417a

Please sign in to comment.