Skip to content

Commit

Permalink
Move query categorization changes to plugin (#16)
Browse files Browse the repository at this point in the history
* Move query categorization changes to plugin

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Fix SearchSourceBuilder read/write test failures

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Fix tests

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Fix starting and stopping query insights service

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Unit tests for feature enable/disable and refactor logic

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

---------

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>
  • Loading branch information
deshsidd authored Jul 17, 2024
1 parent 2534e01 commit 811f4a5
Show file tree
Hide file tree
Showing 17 changed files with 1,088 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
import org.opensearch.plugin.insights.settings.QueryCategorizationSettings;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -49,7 +53,7 @@
/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin {
/**
* Default constructor
*/
Expand All @@ -67,10 +71,17 @@ public Collection<Object> createComponents(
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final MetricsRegistry metricsRegistry
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
threadPool,
client,
metricsRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down Expand Up @@ -119,7 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,24 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
* @param isCurrentMetricEnabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
this.queryInsightsService.enableCollection(metricType, enabled);
if (!enabled) {
// disable QueryInsightsListener only if all metrics collections are disabled now.
if (!queryInsightsService.isEnabled()) {
super.setEnabled(false);
this.queryInsightsService.stop();
public void setEnableTopQueries(final MetricType metricType, final boolean isCurrentMetricEnabled) {
boolean isTopNFeaturePreviouslyDisabled = !queryInsightsService.isTopNFeatureEnabled();
this.queryInsightsService.enableCollection(metricType, isCurrentMetricEnabled);
boolean isTopNFeatureCurrentlyDisabled = !queryInsightsService.isTopNFeatureEnabled();

if (isTopNFeatureCurrentlyDisabled) {
super.setEnabled(false);
if (!isTopNFeaturePreviouslyDisabled) {
queryInsightsService.checkAndStopQueryInsights();
}
} else {
super.setEnabled(true);
// restart QueryInsightsListener only if none of metrics collections is enabled before.
if (isAllMetricsDisabled) {
this.queryInsightsService.stop();
this.queryInsightsService.start();
if (isTopNFeaturePreviouslyDisabled) {
queryInsightsService.checkAndRestartQueryInsights();
}
}

}

@Override
Expand Down Expand Up @@ -176,7 +174,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

package org.opensearch.plugin.insights.core.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -29,13 +33,17 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
* Service responsible for gathering, analyzing, storing and exporting
* information related to search queries
*/
public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

/**
* The internal OpenSearch thread pool that executes async processing and exporting tasks
*/
Expand Down Expand Up @@ -67,15 +75,25 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;

/**
* Constructor of the QueryInsightsService
*
* @param clusterSettings OpenSearch cluster level settings
* @param threadPool The OpenSearch thread pool to run async tasks
* @param client OS client
* @param metricsRegistry OpenTelemetry metrics registry
*/
@Inject
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
public QueryInsightsService(
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry
) {
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -93,6 +111,10 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
(settings -> validateExporterConfig(type, settings))
);
}

this.searchQueryMetricsEnabled = clusterSettings.get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
clusterSettings.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}

/**
Expand Down Expand Up @@ -133,6 +155,14 @@ public void drainRecords() {
topQueriesServices.get(metricType).consumeRecords(records);
}
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.consumeRecords(records);
} catch (Exception e) {
logger.error("Error while trying to categorize the queries.", e);
}
}
}

/**
Expand Down Expand Up @@ -166,11 +196,20 @@ public boolean isCollectionEnabled(final MetricType metricType) {
}

/**
* Check if query insights service is enabled
* Check if any feature of Query Insights service is enabled, right now includes Top N and Categorization.
*
* @return if query insights service is enabled
*/
public boolean isEnabled() {
public boolean isAnyFeatureEnabled() {
    // Top N is consulted first; the search query metrics flag is only read when no Top N metric is enabled.
    if (isTopNFeatureEnabled()) {
        return true;
    }
    return isSearchQueryMetricsFeatureEnabled();
}

/**
* Check if top N enabled for any metric type
*
* @return if top N feature is enabled
*/
public boolean isTopNFeatureEnabled() {
for (MetricType t : MetricType.allMetricTypes()) {
if (isCollectionEnabled(t)) {
return true;
Expand All @@ -179,6 +218,33 @@ public boolean isEnabled() {
return false;
}

/**
 * Report whether the search query metrics feature is currently switched on.
 *
 * @return the current value of the search query metrics enabled flag
 */
public boolean isSearchQueryMetricsFeatureEnabled() {
    return searchQueryMetricsEnabled;
}

/**
 * Stop the query insights service, but only when no feature remains enabled.
 */
public void checkAndStopQueryInsights() {
    if (isAnyFeatureEnabled()) {
        // At least one feature is still enabled, so the service keeps running.
        return;
    }
    stop();
}

/**
 * Restart the query insights service (stop followed by start) when at least one feature is enabled.
 * Does nothing when every feature is disabled.
 */
public void checkAndRestartQueryInsights() {
    if (!isAnyFeatureEnabled()) {
        // Nothing is enabled, so there is nothing to (re)start.
        return;
    }
    stop();
    start();
}

/**
* Validate the window size config for a metricType
*
Expand Down Expand Up @@ -239,6 +305,32 @@ public void setExporter(final MetricType type, final Settings settings) {
}
}

/**
 * Update the flag controlling collection of search query categorization metrics and,
 * when the flag actually changes value, restart or stop the service accordingly.
 *
 * @param searchQueryMetricsEnabled new value of the flag
 */
public void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
    final boolean wasEnabled = isSearchQueryMetricsFeatureEnabled();
    this.searchQueryMetricsEnabled = searchQueryMetricsEnabled;

    if (wasEnabled == searchQueryMetricsEnabled) {
        // No transition: the lifecycle of the service is left untouched.
        return;
    }

    if (searchQueryMetricsEnabled) {
        // disabled -> enabled
        checkAndRestartQueryInsights();
    } else {
        // enabled -> disabled
        checkAndStopQueryInsights();
    }
}

/**
 * Accessor for the categorizer held by this service.
 *
 * @return the {@link SearchQueryCategorizer} instance stored in this service
 */
public SearchQueryCategorizer getSearchQueryCategorizer() {
    return searchQueryCategorizer;
}

/**
* Validate the exporter config for a metricType
*
Expand All @@ -253,7 +345,7 @@ public void validateExporterConfig(final MetricType type, final Settings setting

@Override
protected void doStart() {
if (isEnabled()) {
if (isAnyFeatureEnabled()) {
scheduledFuture = threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.service.categorizer;

import org.apache.lucene.search.BooleanClause;
import org.opensearch.common.SetOnce;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Class to traverse the QueryBuilder tree and capture the query shape.
 * <p>
 * Each visitor instance records the name of the single query builder it accepted, plus,
 * per {@link BooleanClause.Occur}, the list of child visitors created for that clause type.
 */
public final class QueryShapeVisitor implements QueryBuilderVisitor {
    /** Name of the query builder accepted by this visitor; may be set only once. */
    private final SetOnce<String> queryType = new SetOnce<>();
    /** Child visitors grouped by the clause type under which they were visited. */
    private final Map<BooleanClause.Occur, List<QueryShapeVisitor>> childVisitors = new EnumMap<>(BooleanClause.Occur.class);

    /**
     * Default constructor
     */
    public QueryShapeVisitor() {}

    /**
     * Record the name of the visited query builder.
     *
     * @param qb the query builder being visited
     */
    @Override
    public void accept(QueryBuilder qb) {
        queryType.set(qb.getName());
    }

    /**
     * Create the visitor used for the children attached under the given clause type.
     * Every query builder accepted by the returned visitor becomes a new child node of this visitor.
     *
     * @param occur clause type the children belong to
     * @return a visitor collecting the children for {@code occur}
     * @throws IllegalStateException if a child visitor was already requested for {@code occur}
     */
    @Override
    public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
        // Should get called once per Occur value
        if (childVisitors.containsKey(occur)) {
            throw new IllegalStateException("child visitor already called for " + occur);
        }
        final List<QueryShapeVisitor> childVisitorList = new ArrayList<>();
        QueryBuilderVisitor childVisitorWrapper = new QueryBuilderVisitor() {
            // Most recently accepted child; grandchildren are attached to it.
            QueryShapeVisitor currentChild;

            @Override
            public void accept(QueryBuilder qb) {
                currentChild = new QueryShapeVisitor();
                childVisitorList.add(currentChild);
                currentChild.accept(qb);
            }

            @Override
            public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
                if (currentChild == null) {
                    // Fail with a descriptive error instead of a bare NullPointerException.
                    throw new IllegalStateException("child visitor requested for " + occur + " before any query was accepted");
                }
                return currentChild.getChildVisitor(occur);
            }
        };
        childVisitors.put(occur, childVisitorList);
        return childVisitorWrapper;
    }

    /**
     * Convert query builder tree to json
     * <p>
     * The result has the form {@code {"type":"<name>","<occur>":[<child>, ...], ...}} where each
     * child is rendered recursively and occur keys are the lower-cased enum names.
     *
     * @return json query builder tree as a string
     */
    public String toJson() {
        StringBuilder outputBuilder = new StringBuilder("{\"type\":\"").append(queryType.get()).append("\"");
        for (Map.Entry<BooleanClause.Occur, List<QueryShapeVisitor>> entry : childVisitors.entrySet()) {
            // The key must be followed by ':' for the output to be valid JSON.
            outputBuilder.append(",\"").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append("\":[");
            boolean first = true;
            for (QueryShapeVisitor child : entry.getValue()) {
                if (!first) {
                    outputBuilder.append(",");
                }
                outputBuilder.append(child.toJson());
                first = false;
            }
            outputBuilder.append("]");
        }
        outputBuilder.append("}");
        return outputBuilder.toString();
    }

    /**
     * Pretty print the query builder tree
     *
     * @param indent prefix prepended to every line emitted for this node
     * @return Query builder tree as a pretty string
     */
    public String prettyPrintTree(String indent) {
        StringBuilder outputBuilder = new StringBuilder(indent).append(queryType.get()).append("\n");
        for (Map.Entry<BooleanClause.Occur, List<QueryShapeVisitor>> entry : childVisitors.entrySet()) {
            outputBuilder.append(indent).append(" ").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append(":\n");
            for (QueryShapeVisitor child : entry.getValue()) {
                outputBuilder.append(child.prettyPrintTree(indent + " "));
            }
        }
        return outputBuilder.toString();
    }
}
Loading

0 comments on commit 811f4a5

Please sign in to comment.