Skip to content

Commit

Permalink
refactor exporter, enrich response, add limits to windows
Browse files Browse the repository at this point in the history
  • Loading branch information
ansjcy committed Jan 17, 2024
1 parent f175179 commit b0fd5d2
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 115 deletions.
29 changes: 29 additions & 0 deletions plugins/query-insights/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

import org.opensearch.gradle.testclusters.RunTask
import org.opensearch.gradle.test.RestIntegTestTask

apply plugin: 'opensearch.java-rest-test'
apply plugin: 'opensearch.internal-cluster-test'
apply plugin: 'opensearch.testclusters'

opensearchplugin {
description 'OpenSearch Query Insights Plugin.'
Expand All @@ -18,3 +23,27 @@ opensearchplugin {

dependencies {
}

task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
}
tasks.named("check").configure { dependsOn(integTest) }

integTest {
// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}
testClusters.integTest {
testDistribution = "INTEG_TEST"

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
}

run {
useCluster testClusters.integTest
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;
Expand All @@ -21,11 +20,13 @@
* @opensearch.internal
*/
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
private QueryInsightsExporterType type;
private String identifier;

private boolean enabled = false;

/** The export interval of this exporter, default to 60 seconds */
private TimeValue exportInterval = TimeValue.timeValueSeconds(60);
QueryInsightsExporter(QueryInsightsExporterType type, String identifier) {
this.type = type;
this.identifier = identifier;
}

/**
* Export the data with the exporter.
Expand All @@ -34,24 +35,19 @@ public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
*/
public abstract void export(List<T> records) throws Exception;

public boolean getEnabled() {
return enabled;
public void setType(QueryInsightsExporterType type) {
this.type = type;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
public QueryInsightsExporterType getType() {
return type;
}

public TimeValue getExportInterval() {
return exportInterval;
public void setIdentifier(String identifier) {
this.identifier = identifier;
}

/**
* Set the export interval for the exporter.
*
* @param interval export interval
*/
public void setExportInterval(TimeValue interval) {
this.exportInterval = interval;
public String getIdentifier() {
return identifier;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import java.util.Locale;

/**
* Types for the Query Insights Exporters
*
* @opensearch.internal
*/
public enum QueryInsightsExporterType {
/* local index exporter */
LOCAL_INDEX("local_index");

private final String type;

QueryInsightsExporterType(String type) {
this.type = type;
}

public static QueryInsightsExporterType parse(String type) {
return valueOf(type.toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,29 @@
/**
* Class to export data collected by search query analyzers to a local OpenSearch index
* <p>
* Mainly for use within the Query Insight framework
* Internal used within the Query Insight framework
*
* @opensearch.internal
*/
public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightsExporter<T> {

private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class);
private static final int INDEX_TIMEOUT = 60;

private final ClusterService clusterService;
private final Client client;

/** The OpenSearch index name to export the data to */
private final String localIndexName;

/** The mapping for the local index that holds the data */
private final InputStream localIndexMapping;

public QueryInsightsLocalIndexExporter(
boolean enabled,
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping
) {
this.setEnabled(enabled);
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName);
this.clusterService = clusterService;
this.client = client;
this.localIndexName = localIndexName;
this.localIndexMapping = localIndexMapping;
}

Expand All @@ -91,7 +85,9 @@ public synchronized void export(List<T> records) throws IOException {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", localIndexName));
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
Expand All @@ -102,7 +98,7 @@ public void onResponse(CreateIndexResponse response) {
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
localIndexName
getIdentifier()
)
);
}
Expand All @@ -123,7 +119,7 @@ public void onFailure(Exception e) {
*/
private boolean checkIfIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(this.localIndexName);
return clusterState.getRoutingTable().hasIndex(this.getIdentifier());
}

/**
Expand All @@ -133,7 +129,7 @@ private boolean checkIfIndexExists() {
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping(getIndexMappings())
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
}
Expand All @@ -145,7 +141,7 @@ private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> lis
* @throws IOException if an error occurs
*/
private synchronized void dropLocalIndex(ActionListener<AcknowledgedResponse> listener) throws IOException {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.localIndexName);
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.getIdentifier());
client.admin().indices().delete(deleteIndexRequest, listener);
}

Expand All @@ -170,22 +166,22 @@ private synchronized void bulkRecord(List<T> records) throws IOException {
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT));
for (T record : records) {
bulkRequest.add(
new IndexRequest(localIndexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
client.bulk(bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier()));
} else {
log.error(String.format(Locale.ROOT, "error when ingesting data for %s", localIndexName));
log.error(String.format(Locale.ROOT, "error when ingesting data for %s", getIdentifier()));
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", localIndexName, e));
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e));
}
});
}
Expand All @@ -197,23 +193,23 @@ public void onFailure(Exception e) {
* @throws IOException if an error occurs
*/
private synchronized void indexRecord(T record) throws IOException {
IndexRequest indexRequest = new IndexRequest(localIndexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
IndexRequest indexRequest = new IndexRequest(getIdentifier()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT));

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", getIdentifier()));
} else {
log.error(String.format(Locale.ROOT, "failed to index data for %s", localIndexName));
log.error(String.format(Locale.ROOT, "failed to index data for %s", getIdentifier()));
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", localIndexName, e));
log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", getIdentifier(), e));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.util.Locale;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

Expand All @@ -42,20 +46,41 @@ public final class SearchQueryLatencyListener extends SearchRequestOperationsLis
@Inject
public SearchQueryLatencyListener(ClusterService clusterService, TopQueriesByLatencyService topQueriesByLatencyService) {
this.topQueriesByLatencyService = topQueriesByLatencyService;
this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
clusterService.getClusterSettings().addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, this::setEnabled);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_SIZE, this.topQueriesByLatencyService::setTopNSize);
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.topQueriesByLatencyService::setTopNSize,
this.topQueriesByLatencyService::validateTopNSize
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.topQueriesByLatencyService::setWindowSize,
this.topQueriesByLatencyService::validateWindowSize
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_TYPE, this.topQueriesByLatencyService::setExporterType);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_WINDOW_SIZE, this.topQueriesByLatencyService::setWindowSize);
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
this.topQueriesByLatencyService::setExportInterval,
this.topQueriesByLatencyService::validateExportInterval
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER, this.topQueriesByLatencyService::setExporterIdentifier);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED, this.topQueriesByLatencyService::setExporterEnabled);

this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

@Override
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
this.topQueriesByLatencyService.setEnabled(enabled);
this.topQueriesByLatencyService.setEnableCollect(enabled);
}

@Override
Expand Down Expand Up @@ -86,7 +111,8 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search
context.getNumShards(),
request.indices(),
new HashMap<>(),
searchRequestContext.phaseTookMap()
searchRequestContext.phaseTookMap(),
System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()
);
} catch (Exception e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
Expand Down
Loading

0 comments on commit b0fd5d2

Please sign in to comment.