Skip to content

Commit

Permalink
refactor exporter, enrich response, add limits to windows
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jan 19, 2024
1 parent f175179 commit e2c15ba
Show file tree
Hide file tree
Showing 21 changed files with 911 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand All @@ -46,7 +45,7 @@
/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, SearchPlugin {
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
/**
* Default constructor
*/
Expand Down Expand Up @@ -97,7 +96,11 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;
Expand All @@ -21,11 +20,13 @@
* @opensearch.internal
*/
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
private QueryInsightsExporterType type;
private String identifier;

private boolean enabled = false;

/** The export interval of this exporter, default to 60 seconds */
private TimeValue exportInterval = TimeValue.timeValueSeconds(60);
QueryInsightsExporter(QueryInsightsExporterType type, String identifier) {
this.type = type;
this.identifier = identifier;
}

/**
* Export the data with the exporter.
Expand All @@ -34,24 +35,19 @@ public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
*/
public abstract void export(List<T> records) throws Exception;

public boolean getEnabled() {
return enabled;
public void setType(QueryInsightsExporterType type) {
this.type = type;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
public QueryInsightsExporterType getType() {
return type;
}

public TimeValue getExportInterval() {
return exportInterval;
public void setIdentifier(String identifier) {
this.identifier = identifier;
}

/**
* Set the export interval for the exporter.
*
* @param interval export interval
*/
public void setExportInterval(TimeValue interval) {
this.exportInterval = interval;
public String getIdentifier() {
return identifier;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import java.util.Locale;

/**
* Types for the Query Insights Exporters
*
* @opensearch.internal
*/
public enum QueryInsightsExporterType {
/* local index exporter */
LOCAL_INDEX("local_index");

private final String type;

QueryInsightsExporterType(String type) {
this.type = type;
}

public static QueryInsightsExporterType parse(String type) {
return valueOf(type.toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -40,35 +37,29 @@
/**
* Class to export data collected by search query analyzers to a local OpenSearch index
* <p>
* Mainly for use within the Query Insight framework
* Internal used within the Query Insight framework
*
* @opensearch.internal
*/
public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightsExporter<T> {

private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class);
private static final int INDEX_TIMEOUT = 60;

private final ClusterService clusterService;
private final Client client;

/** The OpenSearch index name to export the data to */
private final String localIndexName;

/** The mapping for the local index that holds the data */
private final InputStream localIndexMapping;

public QueryInsightsLocalIndexExporter(
boolean enabled,
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping
) {
this.setEnabled(enabled);
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName);
this.clusterService = clusterService;
this.client = client;
this.localIndexName = localIndexName;
this.localIndexMapping = localIndexMapping;
}

Expand All @@ -91,7 +82,9 @@ public synchronized void export(List<T> records) throws IOException {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", localIndexName));
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
Expand All @@ -102,7 +95,7 @@ public void onResponse(CreateIndexResponse response) {
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
localIndexName
getIdentifier()
)
);
}
Expand All @@ -123,7 +116,7 @@ public void onFailure(Exception e) {
*/
private boolean checkIfIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(this.localIndexName);
return clusterState.getRoutingTable().hasIndex(this.getIdentifier());
}

/**
Expand All @@ -133,22 +126,11 @@ private boolean checkIfIndexExists() {
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping(getIndexMappings())
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
}

/**
* Drop the local OpenSearch Index created by the exporter
*
* @param listener the listener to be notified upon completion
* @throws IOException if an error occurs
*/
private synchronized void dropLocalIndex(ActionListener<AcknowledgedResponse> listener) throws IOException {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.localIndexName);
client.admin().indices().delete(deleteIndexRequest, listener);
}

/**
* Get the index mapping of the local index
*
Expand All @@ -165,55 +147,34 @@ private String getIndexMappings() throws IOException {
* @param records the data to export
* @throws IOException if an error occurs
*/
private synchronized void bulkRecord(List<T> records) throws IOException {
private void bulkRecord(List<T> records) throws IOException {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT));
for (T record : records) {
bulkRequest.add(
new IndexRequest(localIndexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
client.bulk(bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", localIndexName));
} else {
log.error(String.format(Locale.ROOT, "error when ingesting data for %s", localIndexName));
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", localIndexName, e));
}
});
}

/**
* Index one document to the predefined local OpenSearch Index
*
* @param record the document to export
* @throws IOException if an error occurs
*/
private synchronized void indexRecord(T record) throws IOException {
IndexRequest indexRequest = new IndexRequest(localIndexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT));

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format(Locale.ROOT, "successfully indexed data for %s ", localIndexName));
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier()));
} else {
log.error(String.format(Locale.ROOT, "failed to index data for %s", localIndexName));
log.error(
String.format(
Locale.ROOT,
"error when ingesting data for %s, error: %s",
getIdentifier(),
response.buildFailureMessage()
)
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "failed to index data for %s, error: %s", localIndexName, e));
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.util.Locale;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

Expand All @@ -42,20 +46,41 @@ public final class SearchQueryLatencyListener extends SearchRequestOperationsLis
@Inject
public SearchQueryLatencyListener(ClusterService clusterService, TopQueriesByLatencyService topQueriesByLatencyService) {
this.topQueriesByLatencyService = topQueriesByLatencyService;
this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
clusterService.getClusterSettings().addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, this::setEnabled);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_SIZE, this.topQueriesByLatencyService::setTopNSize);
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.topQueriesByLatencyService::setTopNSize,
this.topQueriesByLatencyService::validateTopNSize
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.topQueriesByLatencyService::setWindowSize,
this.topQueriesByLatencyService::validateWindowSize
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_TYPE, this.topQueriesByLatencyService::setExporterType);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_WINDOW_SIZE, this.topQueriesByLatencyService::setWindowSize);
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
this.topQueriesByLatencyService::setExportInterval,
this.topQueriesByLatencyService::validateExportInterval
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER, this.topQueriesByLatencyService::setExporterIdentifier);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED, this.topQueriesByLatencyService::setExporterEnabled);

this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

@Override
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
this.topQueriesByLatencyService.setEnabled(enabled);
this.topQueriesByLatencyService.setEnableCollect(enabled);
}

@Override
Expand Down Expand Up @@ -86,7 +111,8 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search
context.getNumShards(),
request.indices(),
new HashMap<>(),
searchRequestContext.phaseTookMap()
searchRequestContext.phaseTookMap(),
System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()
);
} catch (Exception e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
Expand Down
Loading

0 comments on commit e2c15ba

Please sign in to comment.