diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..5be7b26
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,33 @@
+# EditorConfig: http://editorconfig.org/
+
+root = true
+
+[*]
+charset = utf-8
+trim_trailing_whitespace = true
+insert_final_newline = true
+indent_style = space
+
+[*.gradle]
+indent_size = 2
+
+[*.groovy]
+indent_size = 4
+
+[*.java]
+indent_size = 4
+
+[*.json]
+indent_size = 2
+
+[*.py]
+indent_size = 2
+
+[*.sh]
+indent_size = 2
+
+[*.{yml,yaml}]
+indent_size = 2
+
+[*.{xsd,xml}]
+indent_size = 4
diff --git a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
index 03c4011..8f18676 100644
--- a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
+++ b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
@@ -52,7 +52,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
/**
* Constructor for QueryInsightsListener
*
- * @param clusterService The Node's cluster service.
+ * @param clusterService The Node's cluster service.
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
@@ -91,7 +91,7 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
* and query insights services.
*
* @param metricType {@link MetricType}
- * @param enabled boolean
+ * @param enabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
index 0814e66..a6b7b79 100644
--- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
+++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
@@ -136,17 +136,15 @@ public int getTopNSize() {
* @param size the wanted top N size
*/
public void validateTopNSize(final int size) {
- if (size > QueryInsightsSettings.MAX_N_SIZE) {
+ if (size < 1 || size > QueryInsightsSettings.MAX_N_SIZE) {
throw new IllegalArgumentException(
"Top N size setting for ["
+ metricType
+ "]"
- + " should be smaller than max top N size ["
+ + " should be between 1 and "
+ QueryInsightsSettings.MAX_N_SIZE
- + "was ("
+ + ", was ("
+ size
- + " > "
- + QueryInsightsSettings.MAX_N_SIZE
+ ")"
);
}
@@ -154,6 +152,7 @@ public void validateTopNSize(final int size) {
/**
* Set enable flag for the service
+ *
* @param enabled boolean
*/
public void setEnabled(final boolean enabled) {
@@ -251,7 +250,7 @@ public void validateExporterConfig(Settings settings) {
/**
* Get all top queries records that are in the current top n queries store
* Optionally include top N records from the last window.
- *
+ *
* By default, return the records in sorted order.
*
* @param includeLastWindow if the top N queries from the last window should be included
diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
index 9d341d8..9d9f3db 100644
--- a/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
+++ b/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
@@ -8,11 +8,18 @@
package org.opensearch.plugin.insights.rules.model;
+import org.apache.lucene.util.ArrayUtil;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Locale;
+import java.util.Map;
/**
* Valid attributes for a search query record
@@ -65,7 +72,7 @@ static Attribute readFromStream(final StreamInput in) throws IOException {
/**
* Write Attribute to a StreamOutput
*
- * @param out the StreamOutput to write
+ * @param out the StreamOutput to write
* @param attribute the Attribute to write
* @throws IOException IOException
*/
@@ -73,6 +80,70 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO
out.writeString(attribute.toString());
}
+ /**
+ * Write Attribute value to a StreamOutput
+ *
+ * @param out the StreamOutput to write
+ * @param attributeValue the Attribute value to write
+ */
+ @SuppressWarnings("unchecked")
+ public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
+ if (attributeValue instanceof List) {
+ out.writeList((List extends Writeable>) attributeValue);
+ } else {
+ out.writeGenericValue(attributeValue);
+ }
+ }
+
+ /**
+ * Read attribute value from the input stream given the Attribute type
+ *
+ * @param in the {@link StreamInput} input to read
+ * @param attribute attribute type to differentiate between Source and others
+ * @return parse value
+ * @throws IOException IOException
+ */
+ public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException {
+ if (attribute == Attribute.TASK_RESOURCE_USAGES) {
+ return in.readList(TaskResourceInfo::readFromStream);
+ } else {
+ return in.readGenericValue();
+ }
+ }
+
+ /**
+ * Read attribute map from the input stream
+ *
+ * @param in the {@link StreamInput} to read
+ * @return parsed attribute map
+ * @throws IOException IOException
+ */
+ public static Map readAttributeMap(StreamInput in) throws IOException {
+ int size = readArraySize(in);
+ if (size == 0) {
+ return Collections.emptyMap();
+ }
+ Map map = new HashMap<>(size);
+
+ for (int i = 0; i < size; i++) {
+ Attribute key = readFromStream(in);
+ Object value = readAttributeValue(in, key);
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ private static int readArraySize(StreamInput in) throws IOException {
+ final int arraySize = in.readVInt();
+ if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) {
+ throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize);
+ }
+ if (arraySize < 0) {
+ throw new NegativeArraySizeException("array size must be positive but was: " + arraySize);
+ }
+ return arraySize;
+ }
+
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
index e241a9e..4b8d7a4 100644
--- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
+++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
@@ -43,7 +43,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce
measurements = new HashMap<>();
in.readMap(MetricType::readFromStream, StreamInput::readGenericValue)
.forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o))));
- this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue);
+ this.attributes = Attribute.readAttributeMap(in);
}
/**
@@ -132,7 +132,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten
public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
- out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue);
+ out.writeMap(
+ attributes,
+ (stream, attribute) -> Attribute.writeTo(out, attribute),
+ (stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
+ );
}
/**
diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java
index 7fa4e98..9b97e5f 100644
--- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java
+++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java
@@ -12,6 +12,8 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.Maps;
+import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
+import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
@@ -80,6 +82,25 @@ public static List generateQueryInsightRecords(int lower, int
attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100));
attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10)));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
+ attributes.put(
+ Attribute.TASK_RESOURCE_USAGES,
+ List.of(
+ new TaskResourceInfo(
+ randomAlphaOfLengthBetween(5, 10),
+ randomLongBetween(1, 1000),
+ randomLongBetween(1, 1000),
+ randomAlphaOfLengthBetween(5, 10),
+ new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
+ ),
+ new TaskResourceInfo(
+ randomAlphaOfLengthBetween(5, 10),
+ randomLongBetween(1, 1000),
+ randomLongBetween(1, 1000),
+ randomAlphaOfLengthBetween(5, 10),
+ new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
+ )
+ )
+ );
records.add(new SearchQueryRecord(timestamp, measurements, attributes));
timestamp += interval;
@@ -163,8 +184,8 @@ public static boolean checkRecordsEquals(List records1, List<
return false;
} else if (value instanceof Map
&& !Maps.deepEquals((Map