Skip to content

Commit

Permalink
sync bug fixes from core to the plugin repo
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jul 16, 2024
1 parent 84e6b75 commit df933e3
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 19 deletions.
33 changes: 33 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# EditorConfig: http://editorconfig.org/

root = true

[*]
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_style = space

[*.gradle]
indent_size = 2

[*.groovy]
indent_size = 4

[*.java]
indent_size = 4

[*.json]
indent_size = 2

[*.py]
indent_size = 2

[*.sh]
indent_size = 2

[*.{yml,yaml}]
indent_size = 2

[*.{xsd,xml}]
indent_size = 4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
/**
* Constructor for QueryInsightsListener
*
* @param clusterService The Node's cluster service.
* @param clusterService The Node's cluster service.
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
Expand Down Expand Up @@ -91,7 +91,7 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
* @param enabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,23 @@ public int getTopNSize() {
* @param size the wanted top N size
*/
public void validateTopNSize(final int size) {
if (size > QueryInsightsSettings.MAX_N_SIZE) {
if (size < 1 || size > QueryInsightsSettings.MAX_N_SIZE) {
throw new IllegalArgumentException(
"Top N size setting for ["
+ metricType
+ "]"
+ " should be smaller than max top N size ["
+ " should be between 1 and "
+ QueryInsightsSettings.MAX_N_SIZE
+ "was ("
+ ", was ("
+ size
+ " > "
+ QueryInsightsSettings.MAX_N_SIZE
+ ")"
);
}
}

/**
* Set enable flag for the service
*
* @param enabled boolean
*/
public void setEnabled(final boolean enabled) {
Expand Down Expand Up @@ -251,7 +250,7 @@ public void validateExporterConfig(Settings settings) {
/**
* Get all top queries records that are in the current top n queries store
* Optionally include top N records from the last window.
*
* <p>
* By default, return the records in sorted order.
*
* @param includeLastWindow if the top N queries from the last window should be included
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@

package org.opensearch.plugin.insights.rules.model;

import org.apache.lucene.util.ArrayUtil;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* Valid attributes for a search query record
Expand Down Expand Up @@ -65,14 +72,78 @@ static Attribute readFromStream(final StreamInput in) throws IOException {
/**
* Write Attribute to a StreamOutput
*
* @param out the StreamOutput to write
* @param out the StreamOutput to write
* @param attribute the Attribute to write
* @throws IOException IOException
*/
static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException {
out.writeString(attribute.toString());
}

/**
* Write Attribute value to a StreamOutput
*
* @param out the StreamOutput to write
* @param attributeValue the Attribute value to write
*/
@SuppressWarnings("unchecked")
public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
if (attributeValue instanceof List) {
out.writeList((List<? extends Writeable>) attributeValue);
} else {
out.writeGenericValue(attributeValue);
}
}

/**
* Read attribute value from the input stream given the Attribute type
*
* @param in the {@link StreamInput} input to read
* @param attribute attribute type to differentiate between Source and others
* @return parse value
* @throws IOException IOException
*/
public static Object readAttributeValue(StreamInput in, Attribute attribute) throws IOException {
if (attribute == Attribute.TASK_RESOURCE_USAGES) {
return in.readList(TaskResourceInfo::readFromStream);
} else {
return in.readGenericValue();
}
}

/**
* Read attribute map from the input stream
*
* @param in the {@link StreamInput} to read
* @return parsed attribute map
* @throws IOException IOException
*/
public static Map<Attribute, Object> readAttributeMap(StreamInput in) throws IOException {
int size = readArraySize(in);
if (size == 0) {
return Collections.emptyMap();
}
Map<Attribute, Object> map = new HashMap<>(size);

for (int i = 0; i < size; i++) {
Attribute key = readFromStream(in);
Object value = readAttributeValue(in, key);
map.put(key, value);
}
return map;
}

private static int readArraySize(StreamInput in) throws IOException {
final int arraySize = in.readVInt();
if (arraySize > ArrayUtil.MAX_ARRAY_LENGTH) {
throw new IllegalStateException("array length must be <= to " + ArrayUtil.MAX_ARRAY_LENGTH + " but was: " + arraySize);
}
if (arraySize < 0) {
throw new NegativeArraySizeException("array size must be positive but was: " + arraySize);
}
return arraySize;
}

@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce
measurements = new HashMap<>();
in.readMap(MetricType::readFromStream, StreamInput::readGenericValue)
.forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o))));
this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue);
this.attributes = Attribute.readAttributeMap(in);
}

/**
Expand Down Expand Up @@ -132,7 +132,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten
public void writeTo(final StreamOutput out) throws IOException {
out.writeLong(timestamp);
out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue);
out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue);
out.writeMap(
attributes,
(stream, attribute) -> Attribute.writeTo(out, attribute),
(stream, attributeValue) -> Attribute.writeValueTo(out, attributeValue)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.Maps;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries;
Expand Down Expand Up @@ -80,6 +82,25 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int
attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100));
attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10)));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
attributes.put(
Attribute.TASK_RESOURCE_USAGES,
List.of(
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
),
new TaskResourceInfo(
randomAlphaOfLengthBetween(5, 10),
randomLongBetween(1, 1000),
randomLongBetween(1, 1000),
randomAlphaOfLengthBetween(5, 10),
new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000))
)
)
);

records.add(new SearchQueryRecord(timestamp, measurements, attributes));
timestamp += interval;
Expand Down Expand Up @@ -163,8 +184,8 @@ public static boolean checkRecordsEquals(List<SearchQueryRecord> records1, List<
return false;
} else if (value instanceof Map
&& !Maps.deepEquals((Map<Object, Object>) value, (Map<Object, Object>) attributes2.get(attribute))) {
return false;
}
return false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[] { "index-1", "index-2" };
String[] indices = new String[]{"index-1", "index-2"};

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel")
);

String[] indices = new String[] { "index-1", "index-2" };
String[] indices = new String[]{"index-1", "index-2"};

Map<String, Long> phaseLatencyMap = new HashMap<>();
phaseLatencyMap.put("expand", 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,22 @@ public void testSmallNSize() {
}

public void testValidateTopNSize() {
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); });
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1);
});
}

public void testValidateNegativeTopNSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateTopNSize(-1);
});
}

public void testGetTopQueriesWhenNotEnabled() {
topQueriesService.setEnabled(false);
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); });
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.getTopQueriesRecords(false);
});
}

public void testValidateWindowSize() {
Expand All @@ -90,8 +100,12 @@ public void testValidateWindowSize() {
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS));
});
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); });
assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); });
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS));
});
assertThrows(IllegalArgumentException.class, () -> {
topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES));
});
}

private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) {
Expand Down

0 comments on commit df933e3

Please sign in to comment.