diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle index f457502794c6c..e24c796abdfdb 100644 --- a/plugins/query-insights/build.gradle +++ b/plugins/query-insights/build.gradle @@ -8,8 +8,13 @@ * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ + +import org.opensearch.gradle.testclusters.RunTask +import org.opensearch.gradle.test.RestIntegTestTask + apply plugin: 'opensearch.java-rest-test' apply plugin: 'opensearch.internal-cluster-test' +apply plugin: 'opensearch.testclusters' opensearchplugin { description 'OpenSearch Query Insights Plugin.' @@ -18,3 +23,27 @@ opensearchplugin { dependencies { } + +task integTest(type: RestIntegTestTask) { + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath +} +tasks.named("check").configure { dependsOn(integTest) } + +integTest { + // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' + } +} +testClusters.integTest { + testDistribution = "INTEG_TEST" + + // This installs our plugin into the testClusters + plugin(project.tasks.bundlePlugin.archiveFile) +} + +run { + useCluster testClusters.integTest +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index c0fae822a4ee7..ad7a4787a2a8b 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -97,7 +97,11 @@ public List> getSettings() { // Settings for top N queries QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, - QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER ); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java index bd4cb3324c6b7..21b084e957298 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -8,7 +8,6 @@ package org.opensearch.plugin.insights.core.exporter; -import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import java.util.List; @@ -21,11 +20,13 @@ * @opensearch.internal */ public abstract class QueryInsightsExporter> { + private QueryInsightsExporterType type; + private String identifier; - private boolean enabled = false; - - /** The export interval of this exporter, default to 60 seconds */ - private TimeValue exportInterval = TimeValue.timeValueSeconds(60); + QueryInsightsExporter(QueryInsightsExporterType type, String identifier) { + this.type = type; + this.identifier = identifier; + } /** * Export the data with the exporter. @@ -34,24 +35,19 @@ public abstract class QueryInsightsExporter> { */ public abstract void export(List records) throws Exception; - public boolean getEnabled() { - return enabled; + public void setType(QueryInsightsExporterType type) { + this.type = type; } - public void setEnabled(boolean enabled) { - this.enabled = enabled; + public QueryInsightsExporterType getType() { + return type; } - public TimeValue getExportInterval() { - return exportInterval; + public void setIdentifier(String identifier) { + this.identifier = identifier; } - /** - * Set the export interval for the exporter. - * - * @param interval export interval - */ - public void setExportInterval(TimeValue interval) { - this.exportInterval = interval; + public String getIdentifier() { + return identifier; } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java new file mode 100644 index 0000000000000..92466411ac47c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import java.util.Locale; + +/** + * Types for the Query Insights Exporters + * + * @opensearch.internal + */ +public enum QueryInsightsExporterType { + /* local index exporter */ + LOCAL_INDEX("local_index"); + + private final String type; + + QueryInsightsExporterType(String type) { + this.type = type; + } + + public static QueryInsightsExporterType parse(String type) { + return valueOf(type.toUpperCase(Locale.ROOT)); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java index 1690d8bad7973..6e615b58e8609 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java @@ -40,35 +40,29 @@ /** * Class to export data collected by search query analyzers to a local OpenSearch index *

- * Mainly for use within the Query Insight framework + * Internal used within the Query Insight framework * * @opensearch.internal */ public class QueryInsightsLocalIndexExporter> extends QueryInsightsExporter { - private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class); private static final int INDEX_TIMEOUT = 60; private final ClusterService clusterService; private final Client client; - /** The OpenSearch index name to export the data to */ - private final String localIndexName; - /** The mapping for the local index that holds the data */ private final InputStream localIndexMapping; public QueryInsightsLocalIndexExporter( - boolean enabled, ClusterService clusterService, Client client, String localIndexName, InputStream localIndexMapping ) { - this.setEnabled(enabled); + super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName); this.clusterService = clusterService; this.client = client; - this.localIndexName = localIndexName; this.localIndexMapping = localIndexMapping; } @@ -91,7 +85,9 @@ public synchronized void export(List records) throws IOException { @Override public void onResponse(CreateIndexResponse response) { if (response.isAcknowledged()) { - log.debug(String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", localIndexName)); + log.debug( + String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier()) + ); try { bulkRecord(records); } catch (IOException e) { @@ -102,7 +98,7 @@ public void onResponse(CreateIndexResponse response) { String.format( Locale.ROOT, "request to created local index %s for query insight not acknowledged.", - localIndexName + getIdentifier() ) ); } @@ -123,7 +119,7 @@ public void onFailure(Exception e) { */ private boolean checkIfIndexExists() { ClusterState clusterState = clusterService.state(); - return clusterState.getRoutingTable().hasIndex(this.localIndexName); + return clusterState.getRoutingTable().hasIndex(this.getIdentifier()); } /** @@ -133,7 +129,7 @@ private boolean checkIfIndexExists() { * @throws IOException if an error occurs */ private synchronized void initLocalIndex(ActionListener listener) throws IOException { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping(getIndexMappings()) + CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings()) .settings(Settings.builder().put("index.hidden", false).build()); client.admin().indices().create(createIndexRequest, listener); } @@ -145,7 +141,7 @@ private synchronized void initLocalIndex(ActionListener lis * @throws IOException if an error occurs */ private synchronized void dropLocalIndex(ActionListener listener) throws IOException { - DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.localIndexName); + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getIdentifier()); client.admin().indices().delete(deleteIndexRequest, listener); } @@ -170,22 +166,22 @@ private synchronized void bulkRecord(List records) throws IOException { .timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT)); for (T record : records) { bulkRequest.add( - new IndexRequest(localIndexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) ); } client.bulk(bulkRequest, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { - log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", localIndexName)); + log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier())); } else { - log.error(String.format(Locale.ROOT, "error when ingesting data for %s", localIndexName)); + log.error(String.format(Locale.ROOT, "error when ingesting data for %s", getIdentifier())); } } @Override public void onFailure(Exception e) { - log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", localIndexName, e)); + log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e)); } }); } @@ -197,7 +193,7 @@ public void onFailure(Exception e) { * @throws IOException if an error occurs */ private synchronized void indexRecord(T record) throws IOException { - IndexRequest indexRequest = new IndexRequest(localIndexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + IndexRequest indexRequest = new IndexRequest(getIdentifier()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT)); @@ -205,15 +201,15 @@ private synchronized void indexRecord(T record) throws IOException { @Override public void onResponse(IndexResponse response) { if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { - log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", localIndexName)); + log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", getIdentifier())); } else { - log.error(String.format(Locale.ROOT, "failed to index data for %s", localIndexName)); + log.error(String.format(Locale.ROOT, "failed to index data for %s", getIdentifier())); } } @Override public void onFailure(Exception e) { - log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", localIndexName, e)); + log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", getIdentifier(), e)); } }); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java index 2bc29a1b11ae2..4a7a3ff9dc547 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java @@ -24,6 +24,10 @@ import java.util.Locale; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; @@ -42,20 +46,41 @@ public final class SearchQueryLatencyListener extends SearchRequestOperationsLis @Inject public SearchQueryLatencyListener(ClusterService clusterService, TopQueriesByLatencyService topQueriesByLatencyService) { this.topQueriesByLatencyService = topQueriesByLatencyService; - this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); - this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); - this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); clusterService.getClusterSettings().addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, this::setEnabled); clusterService.getClusterSettings() - .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_SIZE, this.topQueriesByLatencyService::setTopNSize); + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_SIZE, + this.topQueriesByLatencyService::setTopNSize, + this.topQueriesByLatencyService::validateTopNSize + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + this.topQueriesByLatencyService::setWindowSize, + this.topQueriesByLatencyService::validateWindowSize + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_TYPE, this.topQueriesByLatencyService::setExporterType); clusterService.getClusterSettings() - .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_WINDOW_SIZE, this.topQueriesByLatencyService::setWindowSize); + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL, + this.topQueriesByLatencyService::setExportInterval, + this.topQueriesByLatencyService::validateExportInterval + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER, this.topQueriesByLatencyService::setExporterIdentifier); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED, this.topQueriesByLatencyService::setExporterEnabled); + + this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); } @Override public void setEnabled(boolean enabled) { super.setEnabled(enabled); - this.topQueriesByLatencyService.setEnabled(enabled); + this.topQueriesByLatencyService.setEnableCollect(enabled); } @Override @@ -86,7 +111,8 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search context.getNumShards(), request.indices(), new HashMap<>(), - searchRequestContext.phaseTookMap() + searchRequestContext.phaseTookMap(), + System.nanoTime() - searchRequestContext.getAbsoluteStartNanos() ); } catch (Exception e) { log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 2c98cb8060070..1f970cf59f207 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -13,8 +13,11 @@ import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -37,7 +40,11 @@ public abstract class QueryInsightsService, S extends Collection, E extends QueryInsightsExporter> extends AbstractLifecycleComponent { private static final Logger log = LogManager.getLogger(QueryInsightsService.class); - private boolean enabled; + /** enable insight data collection */ + private boolean enableCollect; + + /** enable insight data export */ + private boolean enableExport; /** The internal store that holds the query insight data */ @Nullable @@ -47,9 +54,12 @@ public abstract class QueryInsightsService, S ext @Nullable protected E exporter; + /** The export interval of this exporter, default to 1 day */ + protected TimeValue exportInterval = QueryInsightsSettings.MIN_EXPORT_INTERVAL; + /** The internal OpenSearch thread pool that execute async processing and exporting tasks*/ - private final ThreadPool threadPool; - private volatile Scheduler.Cancellable scheduledFuture; + protected final ThreadPool threadPool; + protected volatile Scheduler.Cancellable scheduledFuture; @Inject public QueryInsightsService(ThreadPool threadPool, @Nullable S store, @Nullable E exporter) { @@ -77,7 +87,7 @@ protected void ingestQueryData(R record) { * @throws IllegalArgumentException if query insight is disabled in the cluster */ public List getQueryData() throws IllegalArgumentException { - if (!enabled) { + if (!enableCollect) { throw new IllegalArgumentException("Cannot get query data when query insight feature is not enabled."); } clearOutdatedData(); @@ -91,6 +101,11 @@ public List getQueryData() throws IllegalArgumentException { */ public abstract void clearOutdatedData(); + /** + * Restart the exporter with new config + */ + public abstract void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier); + /** * Clear all data in the store */ @@ -98,12 +113,20 @@ public void clearAllData() { store.clear(); } - public void setEnabled(boolean enabled) { - this.enabled = enabled; + public void setEnableCollect(boolean enableCollect) { + this.enableCollect = enableCollect; + } + + public boolean getEnableCollect() { + return this.enableCollect; + } + + public void setEnableExport(boolean enableExport) { + this.enableExport = enableExport; } - public boolean getEnabled() { - return this.enabled; + public boolean getEnableExport() { + return this.enableExport; } /** @@ -111,12 +134,8 @@ public boolean getEnabled() { */ @Override protected void doStart() { - if (exporter != null && exporter.getEnabled()) { - scheduledFuture = threadPool.scheduleWithFixedDelay( - this::doExportAndClear, - exporter.getExportInterval(), - ThreadPool.Names.GENERIC - ); + if (exporter != null && getEnableExport()) { + scheduledFuture = threadPool.scheduleWithFixedDelay(this::doExport, exportInterval, ThreadPool.Names.GENERIC); } } @@ -127,17 +146,16 @@ protected void doStart() { protected void doStop() { if (scheduledFuture != null) { scheduledFuture.cancel(); - if (exporter != null && exporter.getEnabled()) { - doExportAndClear(); + if (exporter != null && getEnableExport()) { + doExport(); } } } - private void doExportAndClear() { + private void doExport() { List storedData = getQueryData(); try { exporter.export(storedData); - clearAllData(); log.debug(String.format(Locale.ROOT, "finish exporting query insight data to sink %s", storedData)); } catch (Exception e) { throw new RuntimeException(String.format(Locale.ROOT, "failed to export query insight data to sink, error: %s", e)); @@ -146,4 +164,17 @@ private void doExportAndClear() { @Override protected void doClose() {} + + public TimeValue getExportInterval() { + return exportInterval; + } + + /** + * Set the export interval for the exporter. + * + * @param interval export interval + */ + public void setExportInterval(TimeValue interval) { + this.exportInterval = interval; + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java index 8434eef16740c..197e600ca9c8c 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java @@ -15,14 +15,20 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; import org.opensearch.plugin.insights.core.exporter.QueryInsightsLocalIndexExporter; import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.ThreadPool; import java.util.Locale; import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_LOCAL_INDEX_MAPPING; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_EXPORT_INTERVAL; + /** * Service responsible for gathering, analyzing, storing and exporting * top N queries with high latency data for search queries @@ -32,7 +38,7 @@ public class TopQueriesByLatencyService extends QueryInsightsService< SearchQueryLatencyRecord, PriorityBlockingQueue, - QueryInsightsLocalIndexExporter> { + QueryInsightsExporter> { private static final Logger log = LogManager.getLogger(TopQueriesByLatencyService.class); /** Default window size in seconds to keep the top N queries with latency data in query insight store */ @@ -41,13 +47,20 @@ public class TopQueriesByLatencyService extends QueryInsightsService< /** Default top N size to keep the data in query insight store */ public static final int DEFAULT_TOP_N_SIZE = 3; + private static final TimeValue delay = TimeValue.ZERO; + private int topNSize = DEFAULT_TOP_N_SIZE; private TimeValue windowSize = TimeValue.timeValueSeconds(DEFAULT_WINDOW_SIZE); + private final ClusterService clusterService; + private final Client client; + @Inject public TopQueriesByLatencyService(ThreadPool threadPool, ClusterService clusterService, Client client) { super(threadPool, new PriorityBlockingQueue<>(), null); + this.clusterService = clusterService; + this.client = client; } /** @@ -60,6 +73,7 @@ public TopQueriesByLatencyService(ThreadPool threadPool, ClusterService clusterS * @param indices The indices involved in the search query * @param propertyMap Extra attributes and information about a search query * @param phaseLatencyMap Contains phase level latency information in a search query + * @param tookInNanos Total search request took time in nanoseconds */ public void ingestQueryData( final Long timestamp, @@ -68,7 +82,8 @@ public void ingestQueryData( final int totalShards, final String[] indices, final Map propertyMap, - final Map phaseLatencyMap + final Map phaseLatencyMap, + final Long tookInNanos ) { if (timestamp <= 0) { log.error( @@ -90,13 +105,16 @@ public void ingestQueryData( ); return; } - super.ingestQueryData( - new SearchQueryLatencyRecord(timestamp, searchType, source, totalShards, indices, propertyMap, phaseLatencyMap) - ); - // remove top elements for fix sizing priority queue - if (this.store.size() > this.getTopNSize()) { - this.store.poll(); - } + this.threadPool.schedule(() -> { + super.ingestQueryData( + new SearchQueryLatencyRecord(timestamp, searchType, source, totalShards, indices, propertyMap, phaseLatencyMap, tookInNanos) + ); + // remove top elements for fix sizing priority queue + if (this.store.size() > this.getTopNSize()) { + this.store.poll(); + } + }, delay, ThreadPool.Names.GENERIC); + log.debug(String.format(Locale.ROOT, "successfully ingested: %s", this.store)); } @@ -109,16 +127,120 @@ public void setTopNSize(int size) { this.topNSize = size; } - public void setWindowSize(TimeValue windowSize) { - this.windowSize = windowSize; + public void validateTopNSize(int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE.getKey() + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } } public int getTopNSize() { return this.topNSize; } + public void setWindowSize(TimeValue windowSize) { + this.windowSize = windowSize; + } + + public void validateWindowSize(TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0) { + throw new IllegalArgumentException( + "Window size setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey() + + "]" + + " should be smaller than max window size [" + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "was (" + + windowSize + + " > " + + QueryInsightsSettings.MAX_WINDOW_SIZE + + ")" + ); + } + } + public TimeValue getWindowSize() { return this.windowSize; } + public void setExporterType(QueryInsightsExporterType type) { + resetExporter( + getEnableExport(), + type, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void setExporterEnabled(boolean enabled) { + super.setEnableExport(enabled); + resetExporter( + enabled, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void setExporterIdentifier(String identifier) { + resetExporter( + getEnableExport(), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + identifier + ); + } + + public void setExportInterval(TimeValue interval) { + super.setExportInterval(interval); + resetExporter( + getEnableExport(), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void validateExportInterval(TimeValue exportInterval) { + if (exportInterval.getSeconds() < MIN_EXPORT_INTERVAL.getSeconds()) { + throw new IllegalArgumentException( + "Export Interval setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL.getKey() + + "]" + + " should not be smaller than minimal export interval size [" + + MIN_EXPORT_INTERVAL + + "]" + + "was (" + + exportInterval + + " < " + + MIN_EXPORT_INTERVAL + + ")" + ); + } + } + + public void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier) { + this.stop(); + this.exporter = null; + + if (!enabled) { + return; + } + if (type.equals(QueryInsightsExporterType.LOCAL_INDEX)) { + this.exporter = new QueryInsightsLocalIndexExporter<>( + clusterService, + client, + identifier, + TopQueriesByLatencyService.class.getClassLoader().getResourceAsStream(DEFAULT_LOCAL_INDEX_MAPPING) + ); + } + this.start(); + } + } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java index efa89538551c7..37767dcee8fe5 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -34,13 +34,16 @@ public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; + private final int top_n_size; public TopQueriesResponse(StreamInput in) throws IOException { super(in); + top_n_size = in.readInt(); } - public TopQueriesResponse(ClusterName clusterName, List nodes, List failures) { + public TopQueriesResponse(ClusterName clusterName, List nodes, List failures, int top_n_size) { super(clusterName, nodes, failures); + this.top_n_size = top_n_size; } @Override @@ -51,6 +54,7 @@ protected List readNodesFrom(StreamInput in) throws IOException { @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { out.writeList(nodes); + out.writeLong(top_n_size); } @Override @@ -87,6 +91,7 @@ private void toClusterLevelResult(XContentBuilder builder, Params params, List { private static final String PHASE_LATENCY_MAP = "phaseLatencyMap"; + private static final String TOOK = "tookInNs"; // latency info for each search phase private final Map phaseLatencyMap; @@ -31,6 +32,7 @@ public final class SearchQueryLatencyRecord extends SearchQueryRecord { public SearchQueryLatencyRecord(final StreamInput in) throws IOException { super(in); this.phaseLatencyMap = in.readMap(StreamInput::readString, StreamInput::readLong); + this.setValue(in.readLong()); } public SearchQueryLatencyRecord( @@ -40,10 +42,10 @@ public SearchQueryLatencyRecord( final int totalShards, final String[] indices, final Map propertyMap, - final Map phaseLatencyMap + final Map phaseLatencyMap, + final Long tookInNanos ) { - super(timestamp, searchType, source, totalShards, indices, propertyMap, phaseLatencyMap.values().stream().mapToLong(x -> x).sum()); - + super(timestamp, searchType, source, totalShards, indices, propertyMap, tookInNanos); this.phaseLatencyMap = phaseLatencyMap; } @@ -61,13 +63,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(INDICES, this.getIndices()); builder.field(PROPERTY_MAP, this.getPropertyMap()); builder.field(PHASE_LATENCY_MAP, this.getPhaseLatencyMap()); + builder.field(TOOK, this.getValue()); return builder.endObject(); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + out.writeLong(getTimestamp()); + out.writeString(getSearchType().toString()); + out.writeString(getSource()); + out.writeInt(getTotalShards()); + out.writeStringArray(getIndices()); + out.writeMap(getPropertyMap()); out.writeMap(phaseLatencyMap, StreamOutput::writeString, StreamOutput::writeLong); + out.writeLong(getValue()); } public boolean equals(SearchQueryLatencyRecord other) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index ff079f4caa4cc..9c99bc80c882a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -42,7 +42,7 @@ public abstract class SearchQueryRecord> protected static final String PROPERTY_MAP = "propertyMap"; protected static final String VALUE = "value"; - protected final Long timestamp; + private final Long timestamp; private final SearchType searchType; @@ -64,7 +64,6 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce this.totalShards = in.readInt(); this.indices = in.readStringArray(); this.propertyMap = in.readMap(); - this.value = castToValue(in.readGenericValue()); } public SearchQueryRecord( @@ -202,7 +201,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TOTAL_SHARDS, totalShards); builder.field(INDICES, indices); builder.field(PROPERTY_MAP, propertyMap); - builder.field(VALUE, value); return builder.endObject(); } @@ -214,6 +212,5 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(totalShards); out.writeStringArray(indices); out.writeMap(propertyMap); - out.writeGenericValue(value); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java index 0048a989cb0dd..fa7996cc6f3f7 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -21,6 +21,7 @@ import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; @@ -70,7 +71,16 @@ protected TopQueriesResponse newResponse( List responses, List failures ) { - return new TopQueriesResponse(clusterService.getClusterName(), responses, failures); + if (topQueriesRequest.getMetricType() == TopQueriesRequest.Metric.LATENCY) { + return new TopQueriesResponse( + clusterService.getClusterName(), + responses, + failures, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE) + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", topQueriesRequest.getMetricType())); + } } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 04dd8a8f14ed5..343b5ba0e13a3 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -10,7 +10,9 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; +import java.util.Locale; import java.util.concurrent.TimeUnit; import static org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService.DEFAULT_TOP_N_SIZE; @@ -23,6 +25,13 @@ * @opensearch.experimental */ public class QueryInsightsSettings { + /** + * Default Values and Settings + */ + public static final TimeValue MAX_WINDOW_SIZE = new TimeValue(86400, TimeUnit.SECONDS); + public static final int MAX_N_SIZE = 100; + public static final TimeValue MIN_EXPORT_INTERVAL = new TimeValue(86400, TimeUnit.SECONDS);; + public static final String DEFAULT_LOCAL_INDEX_MAPPING = "mappings/top_n_queries_record.json"; /** * Query Insights base uri */ @@ -66,6 +75,60 @@ public class QueryInsightsSettings { Setting.Property.Dynamic ); + /** + * Settings for exporters + */ + public static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter"; + + /** + * Boolean setting for enabling top queries by latency exporter + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for top queries by latency exporter type + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_TYPE = new Setting<>( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".type", + QueryInsightsExporterType.LOCAL_INDEX.name(), + QueryInsightsExporterType::parse, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for top queries by latency exporter interval + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".interval", + MIN_EXPORT_INTERVAL, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Setting for identifier (e.g. index name for index exporter) top queries by latency exporter + * Default value is "top_queries_since_{current_timestamp}" + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER = Setting.simpleString( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".identifier", + "top_queries_since_" + System.currentTimeMillis(), + value -> { + if (value == null || value.length() == 0) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index name for [%s]", TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".identifier") + ); + } + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** * Default constructor */ diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 6d06b47f5788c..a0de894ace6a0 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -71,7 +71,8 @@ public static List generateQueryInsightRecords(int low randomIntBetween(1, 100), randomArray(1, 3, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)), propertyMap, - phaseLatencyMap + phaseLatencyMap, + phaseLatencyMap.values().stream().mapToLong(x -> x).sum() ) ); } @@ -104,7 +105,8 @@ public static TopQueries createTopQueries() { randomInt(), randomArray(1, 3, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)), propertyMap, - phaseLatencyMap + phaseLatencyMap, + phaseLatencyMap.values().stream().mapToLong(x -> x).sum() ) ); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java index 7a12e6751f1b7..32400af61c360 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java @@ -82,7 +82,6 @@ public void testExportWhenIndexExists() throws IOException { when(clusterService.state()).thenReturn(clusterState); QueryInsightsLocalIndexExporter queryInsightLocalIndexExporter = new QueryInsightsLocalIndexExporter<>( - true, clusterService, client, LOCAL_INDEX_NAME, @@ -135,9 +134,7 @@ public void testConcurrentExportWhenIndexExists() throws IOException, Interrupte final List> queryInsightLocalIndexExporters = new ArrayList<>(); for (int i = 0; i < numBulk; i++) { - queryInsightLocalIndexExporters.add( - new QueryInsightsLocalIndexExporter<>(true, clusterService, client, LOCAL_INDEX_NAME, null) - ); + queryInsightLocalIndexExporters.add(new QueryInsightsLocalIndexExporter<>(clusterService, client, LOCAL_INDEX_NAME, null)); } for (int i = 0; i < numBulk; i++) { @@ -199,7 +196,6 @@ public void testExportWhenIndexNotExists() throws IOException { InputStreamStreamInput streamInput = new InputStreamStreamInput(is); QueryInsightsLocalIndexExporter queryInsightLocalIndexExporter = new QueryInsightsLocalIndexExporter<>( - true, clusterService, client, LOCAL_INDEX_NAME, @@ -257,7 +253,7 @@ public void testConcurrentExportWhenIndexNotExists() throws IOException, Interru final List> queryInsightLocalIndexExporters = new ArrayList<>(); for (int i = 0; i < numBulk; i++) { queryInsightLocalIndexExporters.add( - new QueryInsightsLocalIndexExporter<>(true, clusterService, client, LOCAL_INDEX_NAME, streamInput) + new QueryInsightsLocalIndexExporter<>(clusterService, client, LOCAL_INDEX_NAME, streamInput) ); } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java index c7572c35bbe87..5228e0054c440 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.anyMap; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -53,6 +54,10 @@ public void testOnRequestEnd() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); @@ -91,7 +96,8 @@ public void testOnRequestEnd() { eq(numberOfShards), eq(indices), anyMap(), - eq(phaseLatencyMap) + eq(phaseLatencyMap), + anyLong() ); } @@ -107,6 +113,10 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); @@ -165,7 +175,8 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { eq(numberOfShards), eq(indices), anyMap(), - eq(phaseLatencyMap) + eq(phaseLatencyMap), + anyLong() ); } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java index e0a29ea66969e..86f01caad8e7d 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java @@ -13,11 +13,14 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.Node; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -30,9 +33,20 @@ */ public class TopQueriesByLatencyServiceTests extends OpenSearchTestCase { - public void testIngestQueryDataWithLargeWindow() { + private ThreadPool threadPool; + + @Before + public void setup() { + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "top n queries tests").build()); + } + + @After + public void shutdown() throws Exception { + terminate(threadPool); + } + + public void testIngestQueryDataWithLargeWindow() throws InterruptedException { final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); @@ -40,13 +54,17 @@ public void testIngestQueryDataWithLargeWindow() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); - topQueriesByLatencyService.setEnabled(true); + topQueriesByLatencyService.setEnableCollect(true); topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); @@ -58,15 +76,19 @@ public void testIngestQueryDataWithLargeWindow() { record.getTotalShards(), record.getIndices(), record.getPropertyMap(), - record.getPhaseLatencyMap() + record.getPhaseLatencyMap(), + record.getValue() ); } + Thread.sleep(1000); assertTrue(QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder(topQueriesByLatencyService.getQueryData(), records)); } public void testConcurrentIngestQueryDataWithLargeWindow() throws InterruptedException { final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); + final ThreadPool threadPool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default ingest tests").build() + ); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); @@ -74,13 +96,17 @@ public void testConcurrentIngestQueryDataWithLargeWindow() throws InterruptedExc clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); - topQueriesByLatencyService.setEnabled(true); + topQueriesByLatencyService.setEnableCollect(true); topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); @@ -100,7 +126,8 @@ public void testConcurrentIngestQueryDataWithLargeWindow() throws InterruptedExc records.get(finalI).getTotalShards(), records.get(finalI).getIndices(), records.get(finalI).getPropertyMap(), - records.get(finalI).getPhaseLatencyMap() + records.get(finalI).getPhaseLatencyMap(), + records.get(finalI).getValue() ); countDownLatch.countDown(); }); @@ -108,11 +135,12 @@ public void testConcurrentIngestQueryDataWithLargeWindow() throws InterruptedExc } phaser.arriveAndAwaitAdvance(); countDownLatch.await(); + Thread.sleep(1000); assertTrue(QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder(topQueriesByLatencyService.getQueryData(), records)); } - public void testSmallWindowClearOutdatedData() { + public void testSmallWindowClearOutdatedData() throws InterruptedException { final Client client = mock(Client.class); final ThreadPool threadPool = mock(ThreadPool.class); @@ -122,13 +150,17 @@ public void testSmallWindowClearOutdatedData() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); - topQueriesByLatencyService.setEnabled(true); + topQueriesByLatencyService.setEnableCollect(true); topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); topQueriesByLatencyService.setWindowSize(new TimeValue(-1)); @@ -140,15 +172,16 @@ public void testSmallWindowClearOutdatedData() { record.getTotalShards(), record.getIndices(), record.getPropertyMap(), - record.getPhaseLatencyMap() + record.getPhaseLatencyMap(), + record.getValue() ); } + Thread.sleep(1000); assertEquals(0, topQueriesByLatencyService.getQueryData().size()); } - public void testSmallNSize() { + public void testSmallNSize() throws InterruptedException { final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); Settings.Builder settingsBuilder = Settings.builder(); Settings settings = settingsBuilder.build(); @@ -156,13 +189,17 @@ public void testSmallNSize() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); - ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); - topQueriesByLatencyService.setEnabled(true); + topQueriesByLatencyService.setEnableCollect(true); topQueriesByLatencyService.setTopNSize(1); topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); @@ -174,9 +211,11 @@ public void testSmallNSize() { record.getTotalShards(), record.getIndices(), record.getPropertyMap(), - record.getPhaseLatencyMap() + record.getPhaseLatencyMap(), + record.getValue() ); } + Thread.sleep(1000); assertEquals(1, topQueriesByLatencyService.getQueryData().size()); } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java index 54c10d93419d2..11063cbedd248 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -28,7 +28,7 @@ public class TopQueriesResponseTests extends OpenSearchTestCase { public void testToXContent() throws Exception { TopQueries topQueries = QueryInsightsTestUtils.createTopQueries(); ClusterName clusterName = new ClusterName("test-cluster"); - TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>()); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10); TopQueriesResponse deserializedResponse = roundTripResponse(response); assertEquals(response.toString(), deserializedResponse.toString()); }