Skip to content

Commit

Permalink
refactor the code to make it more maintainable
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jun 7, 2024
1 parent 0b18782 commit 2d19224
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Locale;
import java.util.Set;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume
}
switch (type) {
case LOCAL_INDEX:
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
if (indexPattern.length() == 0) {
throw new IllegalArgumentException("Empty index pattern configured for the exporter");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;

/**
* The listener for query insights services.
Expand All @@ -67,63 +61,30 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_CPU_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.CPU, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_CPU_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_CPU_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.CPU).validateWindowSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_MEMORY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.MEMORY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_MEMORY_QUERIES_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.MEMORY).validateWindowSize(v)
);
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.CPU, clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.CPU)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.CPU)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_CPU_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.MEMORY, clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.MEMORY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.MEMORY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_MEMORY_QUERIES_WINDOW_SIZE));
// Setting endpoints set up for top n queries, including enabling top n queries, window size and top n size
// Expected metricTypes are Latency, CPU and Memory.
for (MetricType type : MetricType.allMetricTypes()) {
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(getTopNEnabledSetting(type), v -> this.setEnableTopQueries(type, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getTopNSizeSetting(type),
v -> this.queryInsightsService.setTopNSize(type, v),
v -> this.queryInsightsService.validateTopNSize(type, v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
getTopNWindowSizeSetting(type),
v -> this.queryInsightsService.setWindowSize(type, v),
v -> this.queryInsightsService.validateWindowSize(type, v)
);

this.setEnableTopQueries(type, clusterService.getClusterSettings().get(getTopNEnabledSetting(type)));
this.queryInsightsService.validateTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type)));
this.queryInsightsService.setTopNSize(type, clusterService.getClusterSettings().get(getTopNSizeSetting(type)));
this.queryInsightsService.validateWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}
}

/**
Expand Down Expand Up @@ -175,6 +136,7 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {}
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
constructSearchQueryRecord(context, searchRequestContext);
}

@Override
public void onRequestFailure(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
constructSearchQueryRecord(context, searchRequestContext);
Expand Down Expand Up @@ -220,7 +182,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASKS_RESOURCE_USAGES, tasksResourceUsages);
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
Expand All @@ -27,7 +29,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
* Service responsible for gathering, analyzing, storing and exporting
Expand Down Expand Up @@ -86,11 +88,13 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
enableCollect.put(metricType, false);
topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory));
}
clusterSettings.addSettingsUpdateConsumer(
TOP_N_LATENCY_EXPORTER_SETTINGS,
(settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)),
(settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings))
);
for (MetricType type : MetricType.allMetricTypes()) {
clusterSettings.addSettingsUpdateConsumer(
getExporterSettings(type),
(settings -> setExporter(type, settings)),
(settings -> validateExporterConfig(type, settings))
);
}
}

/**
Expand Down Expand Up @@ -177,6 +181,78 @@ public boolean isEnabled() {
return false;
}

/**
* Validate the window size config for a metricType
*
* @param type {@link MetricType}
* @param windowSize {@link TimeValue}
*/
public void validateWindowSize(final MetricType type, final TimeValue windowSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateWindowSize(windowSize);
}
}

/**
* Set window size for a metricType
*
* @param type {@link MetricType}
* @param windowSize {@link TimeValue}
*/
public void setWindowSize(final MetricType type, final TimeValue windowSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setWindowSize(windowSize);
}
}

/**
* Validate the top n size config for a metricType
*
* @param type {@link MetricType}
* @param topNSize top n size
*/
public void validateTopNSize(final MetricType type, final int topNSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateTopNSize(topNSize);
}
}

/**
* Set the top n size config for a metricType
*
* @param type {@link MetricType}
* @param topNSize top n size
*/
public void setTopNSize(final MetricType type, final int topNSize) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setTopNSize(topNSize);
}
}

/**
* Set the exporter config for a metricType
*
* @param type {@link MetricType}
* @param settings exporter settings
*/
public void setExporter(final MetricType type, final Settings settings) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).setExporter(settings);
}
}

/**
* Validate the exporter config for a metricType
*
* @param type {@link MetricType}
* @param settings exporter settings
*/
public void validateExporterConfig(final MetricType type, final Settings settings) {
if (topQueriesServices.containsKey(type)) {
topQueriesServices.get(type).validateExporterConfig(settings);
}
}

@Override
protected void doStart() {
if (isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
Expand Down Expand Up @@ -218,10 +218,7 @@ public void setExporter(final Settings settings) {
if (settings.get(EXPORTER_TYPE) != null) {
SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) {
queryInsightsExporterFactory.updateExporter(
exporter,
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
);
queryInsightsExporterFactory.updateExporter(exporter, settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN));
} else {
try {
queryInsightsExporterFactory.closeExporter(this.exporter);
Expand All @@ -230,7 +227,7 @@ public void setExporter(final Settings settings) {
}
this.exporter = queryInsightsExporterFactory.createExporter(
SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)),
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN)
);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public enum Attribute {
/**
* Tasks level resource usages in this request
*/
TASKS_RESOURCE_USAGES,
TASK_RESOURCE_USAGES,
/**
* Custom search request labels
*/
Expand Down
Loading

0 comments on commit 2d19224

Please sign in to comment.