Skip to content

Commit

Permalink
Increment latency, cpu and memory histograms for query/aggregation/so…
Browse files Browse the repository at this point in the history
…rt types

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>
  • Loading branch information
deshsidd committed Jul 17, 2024
1 parent 811f4a5 commit f29f530
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.plugin.insights.core.service.categorizer;

import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Collection;
import java.util.Map;

/**
* Increments the counters related to Aggregation Search Queries.
Expand All @@ -32,32 +34,34 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters

/**
* Increment aggregation related counters
*
* @param aggregatorFactories input aggregations
* @param measurements latency, cpu, memory measurements
*/
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories) {
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories, Map<MetricType, Number> measurements) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder);
incrementCountersRecursively(aggregationBuilder, measurements);
}
}

private void incrementCountersRecursively(AggregationBuilder aggregationBuilder) {
private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map<MetricType, Number> measurements) {
// Increment counters for the current aggregation
String aggregationType = aggregationBuilder.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, aggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, aggregationType), measurements);

// Recursively process sub-aggregations if any
Collection<AggregationBuilder> subAggregations = aggregationBuilder.getSubAggregations();
if (subAggregations != null && !subAggregations.isEmpty()) {
for (AggregationBuilder subAggregation : subAggregations) {
incrementCountersRecursively(subAggregation);
incrementCountersRecursively(subAggregation, measurements);
}
}

// Process pipeline aggregations
Collection<PipelineAggregationBuilder> pipelineAggregations = aggregationBuilder.getPipelineAggregations();
for (PipelineAggregationBuilder pipelineAggregation : pipelineAggregations) {
String pipelineAggregationType = pipelineAggregation.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType), measurements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.List;
import java.util.Map;

/**
* Class to categorize the search queries based on the type and increment the relevant counters.
Expand Down Expand Up @@ -64,50 +66,53 @@ public static SearchQueryCategorizer getInstance(MetricsRegistry metricsRegistry
}

/**
* Consume records and increment counters for the records
* Consume records and increment counters for the records including latency, cpu and memory histograms.
* @param records records to consume
*/
public void consumeRecords(List<SearchQueryRecord> records) {
for (SearchQueryRecord record : records) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
categorize(source);
categorize(record);
}
}

/**
* Increment categorizations counters for the given source search query
* @param source search query source
* Increment categorizations counters for the given source search query and
* also increment latency, cpu and memory related histograms.
* @param record search query source
*/
public void categorize(SearchSourceBuilder source) {
public void categorize(SearchQueryRecord record) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
Map<MetricType, Number> measurements = record.getMeasurements();

QueryBuilder topLevelQueryBuilder = source.query();
logQueryShape(topLevelQueryBuilder);
incrementQueryTypeCounters(topLevelQueryBuilder);
incrementQueryAggregationCounters(source.aggregations());
incrementQuerySortCounters(source.sorts());
incrementQueryTypeCounters(topLevelQueryBuilder, measurements);
incrementQueryAggregationCounters(source.aggregations(), measurements);
incrementQuerySortCounters(source.sorts(), measurements);
}

private void incrementQuerySortCounters(List<SortBuilder<?>> sorts) {
private void incrementQuerySortCounters(List<SortBuilder<?>> sorts, Map<MetricType, Number> measurements) {
if (sorts != null && sorts.size() > 0) {
for (SortBuilder<?> sortBuilder : sorts) {
String sortOrder = sortBuilder.order().toString();
searchQueryCounters.incrementSortCounter(1, Tags.create().addTag("sort_order", sortOrder));
searchQueryCounters.incrementSortCounter(1, Tags.create().addTag("sort_order", sortOrder), measurements);
}
}
}

private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations) {
private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations, Map<MetricType, Number> measurements) {
if (aggregations == null) {
return;
}

searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories());
searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories(), measurements);
}

private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder) {
private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map<MetricType, Number> measurements) {
if (topLevelQueryBuilder == null) {
return;
}
QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters);
QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters, measurements);
topLevelQueryBuilder.visit(searchQueryVisitor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.apache.lucene.search.BooleanClause;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.plugin.insights.rules.model.MetricType;

import java.util.Map;

/**
* Class to visit the query builder tree and also track the level information.
Expand All @@ -19,21 +22,23 @@
final class SearchQueryCategorizingVisitor implements QueryBuilderVisitor {
private final int level;
private final SearchQueryCounters searchQueryCounters;
private final Map<MetricType, Number> measurements;

public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters) {
this(searchQueryCounters, 0);
public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters, Map<MetricType, Number> measurements) {
this(searchQueryCounters, 0, measurements);
}

private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level) {
private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level, Map<MetricType, Number> measurements) {
this.searchQueryCounters = counters;
this.level = level;
this.measurements = measurements;
}

public void accept(QueryBuilder qb) {
searchQueryCounters.incrementCounter(qb, level);
searchQueryCounters.incrementCounter(qb, level, measurements);
}

public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
return new SearchQueryCategorizingVisitor(searchQueryCounters, level + 1);
return new SearchQueryCategorizingVisitor(searchQueryCounters, level + 1, measurements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.plugin.insights.core.service.categorizer;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

Expand All @@ -22,6 +24,7 @@
*/
public final class SearchQueryCounters {
private static final String LEVEL_TAG = "level";
private static final String TYPE_TAG = "type";
private static final String UNIT = "1";
private final MetricsRegistry metricsRegistry;
/**
Expand All @@ -36,6 +39,20 @@ public final class SearchQueryCounters {
* Counter for sort
*/
private final Counter sortCounter;

/**
* Histogram for latency per query type
*/
private final Histogram queryTypeLatencyHistogram;
/**
* Histogram for cpu per query type
*/
private final Histogram queryTypeCpuHistogram;
/**
* Histogram for memory per query type
*/
private final Histogram queryTypeMemoryHistogram;

private final Map<Class<? extends QueryBuilder>, Counter> queryHandlers;
/**
* Counter name to Counter object map
Expand Down Expand Up @@ -64,38 +81,61 @@ public SearchQueryCounters(MetricsRegistry metricsRegistry) {
"Counter for the number of top level sort search queries",
UNIT
);
this.queryTypeLatencyHistogram = metricsRegistry.createHistogram(
"search.query.type.latency.histogram",
"Histogram for the latency per query type",
UNIT
);
this.queryTypeCpuHistogram = metricsRegistry.createHistogram(
"search.query.type.cpu.histogram",
"Histogram for the cpu per query type",
UNIT
);
this.queryTypeMemoryHistogram = metricsRegistry.createHistogram(
"search.query.type.memory.histogram",
"Histogram for the memory per query type",
UNIT
);
this.queryHandlers = new HashMap<>();

}

/**
* Increment counter
* @param queryBuilder query builder
* @param level level of query builder, 0 being highest level
*/
public void incrementCounter(QueryBuilder queryBuilder, int level) {
public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricType, Number> measurements) {
String uniqueQueryCounterName = queryBuilder.getName();

Counter counter = nameToQueryTypeCounters.computeIfAbsent(uniqueQueryCounterName, k -> createQueryCounter(k));
counter.add(1, Tags.create().addTag(LEVEL_TAG, level));
incrementAllHistograms(Tags.create().addTag(LEVEL_TAG, level).addTag(TYPE_TAG, uniqueQueryCounterName), measurements);
}

/**
* Increment aggregate counter
* @param value value to increment
* @param tags tags
*/
public void incrementAggCounter(double value, Tags tags) {
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
aggCounter.add(value, tags);
incrementAllHistograms(tags, measurements);
}

/**
* Increment sort counter
* @param value value to increment
* @param tags tags
*/
public void incrementSortCounter(double value, Tags tags) {
public void incrementSortCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
sortCounter.add(value, tags);
incrementAllHistograms(tags, measurements);
}

private void incrementAllHistograms(Tags tags, Map<MetricType, Number> measurements) {
queryTypeLatencyHistogram.record(measurements.get(MetricType.LATENCY).doubleValue(), tags);
queryTypeCpuHistogram.record(measurements.get(MetricType.CPU).doubleValue(), tags);
queryTypeMemoryHistogram.record(measurements.get(MetricType.MEMORY).doubleValue(), tags);
}

/**
Expand Down Expand Up @@ -131,4 +171,28 @@ private Counter createQueryCounter(String counterName) {
);
return counter;
}

/**
* Get Query type latency histogram
* @return histogram
*/
public Histogram getQueryTypeLatencyHistogram() {
return queryTypeLatencyHistogram;
}

/**
* Get Query type cpu histogram
* @return histogram
*/
public Histogram getQueryTypeCpuHistogram() {
return queryTypeCpuHistogram;
}

/**
* Get Query type memory histogram
* @return histogram
*/
public Histogram getQueryTypeMemoryHistogram() {
return queryTypeMemoryHistogram;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ final public class QueryInsightsTestUtils {

public QueryInsightsTestUtils() {}

/**
* Returns list of randomly generated search query records.
* @param count number of records
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count) {
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0);
}

/**
* Returns list of randomly generated search query records.
* @param count number of records
* @param searchSourceBuilder source
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count, SearchSourceBuilder searchSourceBuilder) {
List<SearchQueryRecord> records = generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0);
for (SearchQueryRecord record : records) {
record.getAttributes().put(Attribute.SOURCE, searchSourceBuilder);
}
return records;
}

/**
* Creates a List of random Query Insight Records for testing purpose
*/
Expand Down
Loading

0 comments on commit f29f530

Please sign in to comment.