diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f64dcf82bd4d..5887c0e6d6c85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -194,6 +194,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) - Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) - Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) +- [Query Insights] Query Insights Plugin Implementation ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903)) ### Deprecated diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index e9a6d798b8323..d3fb9f82c3715 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -141,6 +141,7 @@ configure([ project(":plugins:mapper-annotated-text"), project(":plugins:mapper-murmur3"), project(":plugins:mapper-size"), + project(":plugins:query-insights"), project(":plugins:repository-azure"), project(":plugins:repository-gcs"), project(":plugins:repository-hdfs"), diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle new file mode 100644 index 0000000000000..f457502794c6c --- /dev/null +++ b/plugins/query-insights/build.gradle @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ +apply plugin: 'opensearch.java-rest-test' +apply plugin: 'opensearch.internal-cluster-test' + +opensearchplugin { + description 'OpenSearch Query Insights Plugin.' + classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' +} + +dependencies { +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java new file mode 100644 index 0000000000000..c2bfd3810141d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchPlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for Query Insights. + */ +public class QueryInsightsPlugin extends Plugin implements ActionPlugin, SearchPlugin { + /** + * Default constructor + */ + public QueryInsightsPlugin() {} + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + return List.of(); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(); + } + + @Override + public List> getActions() { + return List.of(); + } + + @Override + public List> getSettings() { + return List.of(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java new file mode 100644 index 0000000000000..21b084e957298 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.util.List; + +/** + * Simple abstract class to export data collected by search query analyzers + *

+ * Mainly for use within the Query Insight framework + * + * @opensearch.internal + */ +public abstract class QueryInsightsExporter> { + private QueryInsightsExporterType type; + private String identifier; + + QueryInsightsExporter(QueryInsightsExporterType type, String identifier) { + this.type = type; + this.identifier = identifier; + } + + /** + * Export the data with the exporter. + * + * @param records the data to export + */ + public abstract void export(List records) throws Exception; + + public void setType(QueryInsightsExporterType type) { + this.type = type; + } + + public QueryInsightsExporterType getType() { + return type; + } + + public void setIdentifier(String identifier) { + this.identifier = identifier; + } + + public String getIdentifier() { + return identifier; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java new file mode 100644 index 0000000000000..92466411ac47c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import java.util.Locale; + +/** + * Types for the Query Insights Exporters + * + * @opensearch.internal + */ +public enum QueryInsightsExporterType { + /* local index exporter */ + LOCAL_INDEX("local_index"); + + private final String type; + + QueryInsightsExporterType(String type) { + this.type = type; + } + + public static QueryInsightsExporterType parse(String type) { + return valueOf(type.toUpperCase(Locale.ROOT)); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java new file mode 100644 index 0000000000000..6e615b58e8609 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporter.java @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Locale; +import java.util.Objects; + +/** + * Class to export data collected by search query analyzers to a local OpenSearch index + *

+ * Internal used within the Query Insight framework + * + * @opensearch.internal + */ +public class QueryInsightsLocalIndexExporter> extends QueryInsightsExporter { + private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class); + private static final int INDEX_TIMEOUT = 60; + + private final ClusterService clusterService; + private final Client client; + + /** The mapping for the local index that holds the data */ + private final InputStream localIndexMapping; + + public QueryInsightsLocalIndexExporter( + ClusterService clusterService, + Client client, + String localIndexName, + InputStream localIndexMapping + ) { + super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName); + this.clusterService = clusterService; + this.client = client; + this.localIndexMapping = localIndexMapping; + } + + /** + * Export the data to the predefined local OpenSearch Index + * + * @param records the data to export + * @throws IOException if an error occurs + */ + @Override + public synchronized void export(List records) throws IOException { + if (records.size() == 0) { + return; + } + if (checkIfIndexExists()) { + bulkRecord(records); + } else { + // local index not exist + initLocalIndex(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse response) { + if (response.isAcknowledged()) { + log.debug( + String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier()) + ); + try { + bulkRecord(records); + } catch (IOException e) { + log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e)); + } + } else { + log.error( + String.format( + Locale.ROOT, + "request to created local index %s for query insight not acknowledged.", + getIdentifier() + ) + ); + } + } + + @Override + public void onFailure(Exception e) { + log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e)); + } + }); + } + } + + /** + * Util function to check if a local OpenSearch Index exists + * + * @return boolean + */ + private boolean checkIfIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.getRoutingTable().hasIndex(this.getIdentifier()); + } + + /** + * Initialize the local OpenSearch Index for the exporter + * + * @param listener the listener to be notified upon completion + * @throws IOException if an error occurs + */ + private synchronized void initLocalIndex(ActionListener listener) throws IOException { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings()) + .settings(Settings.builder().put("index.hidden", false).build()); + client.admin().indices().create(createIndexRequest, listener); + } + + /** + * Drop the local OpenSearch Index created by the exporter + * + * @param listener the listener to be notified upon completion + * @throws IOException if an error occurs + */ + private synchronized void dropLocalIndex(ActionListener listener) throws IOException { + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getIdentifier()); + client.admin().indices().delete(deleteIndexRequest, listener); + } + + /** + * Get the index mapping of the local index + * + * @return String to represent the index mapping + * @throws IOException if an error occurs + */ + private String getIndexMappings() throws IOException { + return new String(Objects.requireNonNull(this.localIndexMapping).readAllBytes(), Charset.defaultCharset()); + } + + /** + * Bulk ingest the data into to the predefined local OpenSearch Index + * + * @param records the data to export + * @throws IOException if an error occurs + */ + private synchronized void bulkRecord(List records) throws IOException { + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT)); + for (T record : records) { + bulkRequest.add( + new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + client.bulk(bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { + log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier())); + } else { + log.error(String.format(Locale.ROOT, "error when ingesting data for %s", getIdentifier())); + } + } + + @Override + public void onFailure(Exception e) { + log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e)); + } + }); + } + + /** + * Index one document to the predefined local OpenSearch Index + * + * @param record the document to export + * @throws IOException if an error occurs + */ + private synchronized void indexRecord(T record) throws IOException { + IndexRequest indexRequest = new IndexRequest(getIdentifier()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT)); + + client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { + log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", getIdentifier())); + } else { + log.error(String.format(Locale.ROOT, "failed to index data for %s", getIdentifier())); + } + } + + @Override + public void onFailure(Exception e) { + log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", getIdentifier(), e)); + } + }); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java new file mode 100644 index 0000000000000..3ccbfc5c4cb24 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Exporters for Query Insights + */ +package org.opensearch.plugin.insights.core.exporter; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java new file mode 100644 index 0000000000000..1f970cf59f207 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; + +/** + * Service responsible for gathering, analyzing, storing and exporting data related to + * search queries, based on certain dimensions. + * + * @param The type of record that stores in the service + * @param The type of Collection that holds the aggregated data + * @param The type of exporter that exports the aggregated and processed data + * + * @opensearch.internal + */ +public abstract class QueryInsightsService, S extends Collection, E extends QueryInsightsExporter> + extends AbstractLifecycleComponent { + private static final Logger log = LogManager.getLogger(QueryInsightsService.class); + /** enable insight data collection */ + private boolean enableCollect; + + /** enable insight data export */ + private boolean enableExport; + + /** The internal store that holds the query insight data */ + @Nullable + protected S store; + + /** The exporter that exports the query insight data to certain sink */ + @Nullable + protected E exporter; + + /** The export interval of this exporter, default to 1 day */ + protected TimeValue exportInterval = QueryInsightsSettings.MIN_EXPORT_INTERVAL; + + /** The internal OpenSearch thread pool that execute async processing and exporting tasks*/ + protected final ThreadPool threadPool; + protected volatile Scheduler.Cancellable scheduledFuture; + + @Inject + public QueryInsightsService(ThreadPool threadPool, @Nullable S store, @Nullable E exporter) { + this.threadPool = threadPool; + this.store = store; + this.exporter = exporter; + } + + /** + * Ingest one record to the query insight store + * + * @param record the record to ingest + */ + protected void ingestQueryData(R record) { + if (this.store != null) { + this.store.add(record); + } + } + + /** + * Get all records that are in the query insight store, + * By default, return the records in sorted order. + * + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getQueryData() throws IllegalArgumentException { + if (!enableCollect) { + throw new IllegalArgumentException("Cannot get query data when query insight feature is not enabled."); + } + clearOutdatedData(); + List queries = new ArrayList<>(store); + queries.sort(Collections.reverseOrder()); + return queries; + } + + /** + * Clear all outdated data in the store + */ + public abstract void clearOutdatedData(); + + /** + * Restart the exporter with new config + */ + public abstract void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier); + + /** + * Clear all data in the store + */ + public void clearAllData() { + store.clear(); + } + + public void setEnableCollect(boolean enableCollect) { + this.enableCollect = enableCollect; + } + + public boolean getEnableCollect() { + return this.enableCollect; + } + + public void setEnableExport(boolean enableExport) { + this.enableExport = enableExport; + } + + public boolean getEnableExport() { + return this.enableExport; + } + + /** + * Start the Query Insight Service. + */ + @Override + protected void doStart() { + if (exporter != null && getEnableExport()) { + scheduledFuture = threadPool.scheduleWithFixedDelay(this::doExport, exportInterval, ThreadPool.Names.GENERIC); + } + } + + /** + * Stop the Query Insight Service + */ + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + if (exporter != null && getEnableExport()) { + doExport(); + } + } + } + + private void doExport() { + List storedData = getQueryData(); + try { + exporter.export(storedData); + log.debug(String.format(Locale.ROOT, "finish exporting query insight data to sink %s", storedData)); + } catch (Exception e) { + throw new RuntimeException(String.format(Locale.ROOT, "failed to export query insight data to sink, error: %s", e)); + } + } + + @Override + protected void doClose() {} + + public TimeValue getExportInterval() { + return exportInterval; + } + + /** + * Set the export interval for the exporter. + * + * @param interval export interval + */ + public void setExportInterval(TimeValue interval) { + this.exportInterval = interval; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java new file mode 100644 index 0000000000000..5068f28234f6d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Classes for Query Insights + */ +package org.opensearch.plugin.insights.core.service; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java new file mode 100644 index 0000000000000..04d1f9bfff7e1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of Query Insights + */ +package org.opensearch.plugin.insights; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecord.java new file mode 100644 index 0000000000000..0130d1551f4f9 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecord.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.action.search.SearchType; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; + +/** + * The Latency record stored in the Query Insight Framework + * + * @opensearch.internal + */ +public final class SearchQueryLatencyRecord extends SearchQueryRecord { + + private static final String PHASE_LATENCY_MAP = "phaseLatencyMap"; + private static final String TOOK = "tookInNs"; + + // latency info for each search phase + private final Map phaseLatencyMap; + + public SearchQueryLatencyRecord(final StreamInput in) throws IOException { + super(in); + this.phaseLatencyMap = in.readMap(StreamInput::readString, StreamInput::readLong); + this.setValue(in.readLong()); + } + + public SearchQueryLatencyRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final Map phaseLatencyMap, + final Long tookInNanos + ) { + super(timestamp, searchType, source, totalShards, indices, propertyMap, tookInNanos); + this.phaseLatencyMap = phaseLatencyMap; + } + + public Map getPhaseLatencyMap() { + return phaseLatencyMap; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP, this.getTimestamp()); + builder.field(SEARCH_TYPE, this.getSearchType()); + builder.field(SOURCE, this.getSource()); + builder.field(TOTAL_SHARDS, this.getTotalShards()); + builder.field(INDICES, this.getIndices()); + builder.field(PROPERTY_MAP, this.getPropertyMap()); + builder.field(PHASE_LATENCY_MAP, this.getPhaseLatencyMap()); + builder.field(TOOK, this.getValue()); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(getTimestamp()); + out.writeString(getSearchType().toString()); + out.writeString(getSource()); + out.writeInt(getTotalShards()); + out.writeStringArray(getIndices()); + out.writeMap(getPropertyMap()); + out.writeMap(phaseLatencyMap, StreamOutput::writeString, StreamOutput::writeLong); + out.writeLong(getValue()); + } + + public boolean equals(SearchQueryLatencyRecord other) { + if (!super.equals(other)) { + return false; + } + for (String key : phaseLatencyMap.keySet()) { + if (!other.getPhaseLatencyMap().containsKey(key)) { + return false; + } + if (!phaseLatencyMap.get(key).equals(other.getPhaseLatencyMap().get(key))) { + return false; + } + } + return true; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java new file mode 100644 index 0000000000000..9c99bc80c882a --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchType; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; + +/** + * Simple abstract class that represent record stored in the Query Insight Framework + * + * @param The value type associated with the record + * @opensearch.internal + */ +public abstract class SearchQueryRecord> + implements + Comparable>, + Writeable, + ToXContentObject { + + private static final Logger log = LogManager.getLogger(SearchQueryRecord.class); + protected static final String TIMESTAMP = "timestamp"; + protected static final String SEARCH_TYPE = "searchType"; + protected static final String SOURCE = "source"; + protected static final String TOTAL_SHARDS = "totalShards"; + protected static final String INDICES = "indices"; + protected static final String PROPERTY_MAP = "propertyMap"; + protected static final String VALUE = "value"; + + private final Long timestamp; + + private final SearchType searchType; + + private final String source; + + private final int totalShards; + + private final String[] indices; + + // TODO: add user-account which initialized the request in the future + private final Map propertyMap; + + private T value; + + public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { + this.timestamp = in.readLong(); + this.searchType = SearchType.fromString(in.readString().toLowerCase(Locale.ROOT)); + this.source = in.readString(); + this.totalShards = in.readInt(); + this.indices = in.readStringArray(); + this.propertyMap = in.readMap(); + } + + public SearchQueryRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final T value + ) { + this(timestamp, searchType, source, totalShards, indices, propertyMap); + this.value = value; + } + + public SearchQueryRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap + ) { + this.timestamp = timestamp; + this.searchType = searchType; + this.source = source; + this.totalShards = totalShards; + this.indices = indices; + this.propertyMap = propertyMap; + } + + /** + * The timestamp of the top query. + */ + public Long getTimestamp() { + return timestamp; + } + + /** + * The manner at which the search operation is executed. + */ + public SearchType getSearchType() { + return searchType; + } + + /** + * The search source that was executed by the query. + */ + public String getSource() { + return source; + } + + /** + * Total number of shards as part of the search query across all indices + */ + public int getTotalShards() { + return totalShards; + } + + /** + * The indices involved in the search query + */ + public String[] getIndices() { + return indices; + } + + /** + * Get the value of the query metric record + */ + public T getValue() { + return value; + } + + /** + * Set the value of the query metric record + */ + public void setValue(T value) { + this.value = value; + } + + /** + * Extra attributes and information about a search query + */ + public Map getPropertyMap() { + return propertyMap; + } + + @Override + public int compareTo(SearchQueryRecord otherRecord) { + return value.compareTo(otherRecord.getValue()); + } + + public boolean equals(SearchQueryRecord other) { + if (false == this.timestamp.equals(other.getTimestamp()) + && this.searchType.equals(other.getSearchType()) + && this.source.equals(other.getSource()) + && this.totalShards == other.getTotalShards() + && this.indices.length == other.getIndices().length + && this.propertyMap.size() == other.getPropertyMap().size() + && this.value.equals(other.getValue())) { + return false; + } + for (int i = 0; i < indices.length; i++) { + if (!indices[i].equals(other.getIndices()[i])) { + return false; + } + } + for (String key : propertyMap.keySet()) { + if (!other.getPropertyMap().containsKey(key)) { + return false; + } + if (!propertyMap.get(key).equals(other.getPropertyMap().get(key))) { + return false; + } + } + return true; + } + + @SuppressWarnings("unchecked") + private T castToValue(Object obj) throws ClassCastException { + try { + return (T) obj; + } catch (Exception e) { + log.error(String.format(Locale.ROOT, "error casting query insight record value, error: %s", e)); + throw e; + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP, timestamp); + builder.field(SEARCH_TYPE, searchType); + builder.field(SOURCE, source); + builder.field(TOTAL_SHARDS, totalShards); + builder.field(INDICES, indices); + builder.field(PROPERTY_MAP, propertyMap); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeString(searchType.toString()); + out.writeString(source); + out.writeInt(totalShards); + out.writeStringArray(indices); + out.writeMap(propertyMap); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java new file mode 100644 index 0000000000000..c59ec1550f54b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Data Models for Query Insight Records + */ +package org.opensearch.plugin.insights.rules.model; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java new file mode 100644 index 0000000000000..e271964a90fd7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.settings; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; + +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +/** + * Settings for Query Insights Plugin + * + * @opensearch.api + * @opensearch.experimental + */ +public class QueryInsightsSettings { + /** + * Default Values and Settings + */ + public static final TimeValue MAX_WINDOW_SIZE = new TimeValue(86400, TimeUnit.SECONDS); + public static final int MAX_N_SIZE = 100; + public static final TimeValue MIN_EXPORT_INTERVAL = new TimeValue(86400, TimeUnit.SECONDS);; + public static final String DEFAULT_LOCAL_INDEX_MAPPING = "mappings/top_n_queries_record.json"; + /** Default window size in seconds to keep the top N queries with latency data in query insight store */ + public static final int DEFAULT_WINDOW_SIZE = 60; + /** Default top N size to keep the data in query insight store */ + public static final int DEFAULT_TOP_N_SIZE = 3; + + /** + * Query Insights base uri + */ + public static final String PLUGINS_BASE_URI = "/_insights"; + + /** + * Settings for Top Queries + * + */ + public static final String TOP_QUERIES_BASE_URI = PLUGINS_BASE_URI + "/top_queries"; + public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.top_n_queries"; + + public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency"; + /** + * Boolean setting for enabling top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Int setting to define the top n size for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".top_n_size", + DEFAULT_TOP_N_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Time setting to define the window size in seconds for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", + new TimeValue(DEFAULT_WINDOW_SIZE, TimeUnit.SECONDS), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Settings for exporters + */ + public static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter"; + + /** + * Boolean setting for enabling top queries by latency exporter + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for top queries by latency exporter type + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_TYPE = new Setting<>( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".type", + QueryInsightsExporterType.LOCAL_INDEX.name(), + QueryInsightsExporterType::parse, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting for top queries by latency exporter interval + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".interval", + MIN_EXPORT_INTERVAL, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Setting for identifier (e.g. index name for index exporter) top queries by latency exporter + * Default value is "top_queries_since_{current_timestamp}" + */ + public static final Setting TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER = Setting.simpleString( + TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".identifier", + "top_queries_since_" + System.currentTimeMillis(), + value -> { + if (value == null || value.length() == 0) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index name for [%s]", TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX + ".identifier") + ); + } + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default constructor + */ + public QueryInsightsSettings() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java new file mode 100644 index 0000000000000..f3152bbf966cb --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Settings for Query Insights Plugin + */ +package org.opensearch.plugin.insights.settings; diff --git a/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json b/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json new file mode 100644 index 0000000000000..3bdfa835e813e --- /dev/null +++ b/plugins/query-insights/src/main/resources/mappings/top_n_queries_record.json @@ -0,0 +1,40 @@ +{ + "_meta" : { + "schema_version": 1 + }, + "properties" : { + "timestamp" : { + "type" : "date" + }, + "search_type" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "total_shards" : { + "type" : "integer" + }, + "indices" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "phase_latency_map" : { + "type" : "object" + }, + "property_map" : { + "type" : "object" + }, + "value" : { + "type" : "long" + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java new file mode 100644 index 0000000000000..45255b055984f --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Assert; + +public class QueryInsightsPluginTests extends OpenSearchTestCase { + + public void testDummy() { + Assert.assertEquals(1, 1); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..e62adc18a22f9 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + for (int i = 0; i < countOfRecords; ++i) { + Map propertyMap = new HashMap<>(); + int countOfProperties = randomIntBetween(2, 5); + for (int j = 0; j < countOfProperties; ++j) { + propertyMap.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)); + } + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + records.add( + new SearchQueryLatencyRecord( + System.currentTimeMillis(), + SearchType.QUERY_THEN_FETCH, + "{\"size\":20}", + randomIntBetween(1, 100), + randomArray(1, 3, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)), + propertyMap, + phaseLatencyMap, + phaseLatencyMap.values().stream().mapToLong(x -> x).sum() + ) + ); + } + return records; + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder(List records1, List records2) { + Set set2 = new TreeSet<>(new LatencyRecordComparator()); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } + + static class LatencyRecordComparator implements Comparator { + @Override + public int compare(SearchQueryLatencyRecord record1, SearchQueryLatencyRecord record2) { + if (record1.equals(record2)) { + return 0; + } + return record1.compareTo(record2); + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java new file mode 100644 index 0000000000000..32400af61c360 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsLocalIndexExporterTests.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.exporter; + +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link QueryInsightsLocalIndexExporter}. + */ +public class QueryInsightsLocalIndexExporterTests extends OpenSearchTestCase { + private String LOCAL_INDEX_NAME = "top-queries"; + + public void testExportWhenIndexExists() throws IOException { + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 10); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .timeout(TimeValue.timeValueSeconds(60)); + for (SearchQueryLatencyRecord record : records) { + bulkRequest.add( + new IndexRequest(LOCAL_INDEX_NAME).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + + ClusterState clusterState = mock(ClusterState.class); + RoutingTable mockRoutingTable = mock(RoutingTable.class); + when(mockRoutingTable.hasIndex(anyString())).thenReturn(true); + when(clusterState.getRoutingTable()).thenReturn(mockRoutingTable); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + QueryInsightsLocalIndexExporter queryInsightLocalIndexExporter = new QueryInsightsLocalIndexExporter<>( + clusterService, + client, + LOCAL_INDEX_NAME, + null + ); + + queryInsightLocalIndexExporter.export(records); + + verify(client, times(1)).bulk( + argThat((BulkRequest request) -> request.requests().toString().equals(bulkRequest.requests().toString())), + any() + ); + } + + public void testConcurrentExportWhenIndexExists() throws IOException, InterruptedException { + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 10); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .timeout(TimeValue.timeValueSeconds(60)); + for (SearchQueryLatencyRecord record : records) { + bulkRequest.add( + new IndexRequest(LOCAL_INDEX_NAME).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + + ClusterState clusterState = mock(ClusterState.class); + RoutingTable mockRoutingTable = mock(RoutingTable.class); + when(mockRoutingTable.hasIndex(anyString())).thenReturn(true); + when(clusterState.getRoutingTable()).thenReturn(mockRoutingTable); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + int numBulk = 50; + Thread[] threads = new Thread[numBulk]; + Phaser phaser = new Phaser(numBulk + 1); + CountDownLatch countDownLatch = new CountDownLatch(numBulk); + + final List> queryInsightLocalIndexExporters = new ArrayList<>(); + for (int i = 0; i < numBulk; i++) { + queryInsightLocalIndexExporters.add(new QueryInsightsLocalIndexExporter<>(clusterService, client, LOCAL_INDEX_NAME, null)); + } + + for (int i = 0; i < numBulk; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsLocalIndexExporter thisExporter = queryInsightLocalIndexExporters.get(finalI); + try { + thisExporter.export(records); + } catch (IOException e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(client, times(numBulk)).bulk( + argThat((BulkRequest request) -> request.requests().toString().equals(bulkRequest.requests().toString())), + any() + ); + } + + public void testExportWhenIndexNotExists() throws IOException { + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 10); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .timeout(TimeValue.timeValueSeconds(60)); + for (SearchQueryLatencyRecord record : records) { + bulkRequest.add( + new IndexRequest(LOCAL_INDEX_NAME).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + + ClusterState clusterState = mock(ClusterState.class); + RoutingTable mockRoutingTable = mock(RoutingTable.class); + when(mockRoutingTable.hasIndex(anyString())).thenReturn(false); + when(clusterState.getRoutingTable()).thenReturn(mockRoutingTable); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + final int length = randomIntBetween(1, 1024); + ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]); + InputStreamStreamInput streamInput = new InputStreamStreamInput(is); + + QueryInsightsLocalIndexExporter queryInsightLocalIndexExporter = new QueryInsightsLocalIndexExporter<>( + clusterService, + client, + LOCAL_INDEX_NAME, + streamInput + ); + + queryInsightLocalIndexExporter.export(records); + + verify(indicesAdminClient, times(1)).create( + argThat((CreateIndexRequest request) -> request.index().equals(LOCAL_INDEX_NAME)), + any() + ); + } + + public void testConcurrentExportWhenIndexNotExists() throws IOException, InterruptedException { + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesAdminClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 10); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .timeout(TimeValue.timeValueSeconds(60)); + for (SearchQueryLatencyRecord record : records) { + bulkRequest.add( + new IndexRequest(LOCAL_INDEX_NAME).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + + ClusterState clusterState = mock(ClusterState.class); + RoutingTable mockRoutingTable = mock(RoutingTable.class); + when(mockRoutingTable.hasIndex(anyString())).thenReturn(false).thenReturn(true); + when(clusterState.getRoutingTable()).thenReturn(mockRoutingTable); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + int numBulk = 50; + Thread[] threads = new Thread[numBulk]; + Phaser phaser = new Phaser(numBulk + 1); + CountDownLatch countDownLatch = new CountDownLatch(numBulk); + + final int length = randomIntBetween(1, 1024); + ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]); + InputStreamStreamInput streamInput = new InputStreamStreamInput(is); + + final List> queryInsightLocalIndexExporters = new ArrayList<>(); + for (int i = 0; i < numBulk; i++) { + queryInsightLocalIndexExporters.add( + new QueryInsightsLocalIndexExporter<>(clusterService, client, LOCAL_INDEX_NAME, streamInput) + ); + } + + for (int i = 0; i < numBulk; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsLocalIndexExporter thisExporter = queryInsightLocalIndexExporters.get(finalI); + try { + thisExporter.export(records); + } catch (IOException e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(indicesAdminClient, times(1)).create( + argThat((CreateIndexRequest request) -> request.index().equals(LOCAL_INDEX_NAME)), + any() + ); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java new file mode 100644 index 0000000000000..e704e768a43e6 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Granular tests for the {@link SearchQueryLatencyRecord} class. + */ +public class SearchQueryLatencyRecordTests extends OpenSearchTestCase { + + /** + * Check that if the serialization, deserialization and equals functions are working as expected + */ + public void testSerializationAndEquals() throws Exception { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + List copiedRecords = new ArrayList<>(); + for (SearchQueryLatencyRecord record : records) { + copiedRecords.add(roundTripRecord(record)); + } + assertTrue(QueryInsightsTestUtils.checkRecordsEquals(records, copiedRecords)); + + } + + /** + * Serialize and deserialize a SearchQueryLatencyRecord. + * @param record A SearchQueryLatencyRecord to serialize. + * @return The deserialized, "round-tripped" record. + */ + private static SearchQueryLatencyRecord roundTripRecord(SearchQueryLatencyRecord record) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + record.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new SearchQueryLatencyRecord(in); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 96cea17ff4972..f738c182c06da 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -359,7 +359,7 @@ boolean isFinalReduce() { * request. When created through {@link #subSearchRequest(SearchRequest, String[], String, long, boolean)}, this method returns * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ - long getOrCreateAbsoluteStartMillis() { + public long getOrCreateAbsoluteStartMillis() { return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index eceac7204b196..383d9b5e82fe2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -22,7 +22,7 @@ * @opensearch.internal */ @InternalApi -class SearchRequestContext { +public class SearchRequestContext { private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; @@ -47,7 +47,7 @@ void updatePhaseTookMap(String phaseName, Long tookTime) { this.phaseTookMap.put(phaseName, tookTime); } - Map phaseTookMap() { + public Map phaseTookMap() { return phaseTookMap; } @@ -70,7 +70,7 @@ void setAbsoluteStartNanos(long absoluteStartNanos) { /** * Request start time in nanos */ - long getAbsoluteStartNanos() { + public long getAbsoluteStartNanos() { return absoluteStartNanos; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2a09cc084f79f..2acb35af667f0 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -31,21 +31,21 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - abstract void onPhaseStart(SearchPhaseContext context); + protected abstract void onPhaseStart(SearchPhaseContext context); - abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); + protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context); - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean isEnabled(SearchRequest searchRequest) { + protected boolean isEnabled(SearchRequest searchRequest) { return isEnabled(); } - boolean isEnabled() { + protected boolean isEnabled() { return enabled; } @@ -69,7 +69,7 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseStart(context); @@ -80,7 +80,7 @@ void onPhaseStart(SearchPhaseContext context) { } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseEnd(context, searchRequestContext); @@ -91,7 +91,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseFailure(context); @@ -102,7 +102,7 @@ void onPhaseFailure(SearchPhaseContext context) { } @Override - void onRequestStart(SearchRequestContext searchRequestContext) { + protected void onRequestStart(SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onRequestStart(searchRequestContext); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 7f25f9026f215..74e04d976cb1c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,19 +134,19 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} @Override - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 88d599a0dcdaa..ac32b08afb7f6 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -58,12 +58,12 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); phaseStats.current.dec(); phaseStats.total.inc(); @@ -71,7 +71,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 78c5ba4412c68..1cb336e18b12c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -119,13 +119,13 @@ public void testStandardListenerAndPerRequestListenerDisabled() { public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { return new SearchRequestOperationsListener() { @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} }; } }