Skip to content

Commit

Permalink
Disable task consumers by default and update tests
Browse files Browse the repository at this point in the history
Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
  • Loading branch information
sruti1312 committed Mar 31, 2022
1 parent 25158dc commit e9ed69f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 41 deletions.
12 changes: 4 additions & 8 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,13 @@ logger.index_indexing_slowlog.additivity = false
######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.filePermissions = rw-r-----
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata

appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
Expand All @@ -217,14 +215,12 @@ appender.task_detailslog_rolling.strategy.max = 4
######## Task details log - old style pattern ####
appender.task_detailslog_rolling_old.type = RollingFile
appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old
appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_task_detailslog.log
appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log
appender.task_detailslog_rolling_old.filePermissions = rw-r-----
appender.task_detailslog_rolling_old.layout.type = PatternLayout
appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n

appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\
_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.policies.type = Policies
appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling_old.policies.size.size = 1GB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException {
Properties properties = new Properties();
properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES)));
assertThat(properties, is(notNullValue()));
assertThat(properties.entrySet(), hasSize(137));
assertThat(properties.entrySet(), hasSize(165));
assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout"));
assertThat(
properties.get("appender.deprecation_rolling.fileName"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ PercentilesConfig configOrDefault() {

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.array(valuesField.getPreferredName(), values);
if (valuesField != null) {
builder.array(valuesField.getPreferredName(), values);
}
builder.field(KEYED_FIELD.getPreferredName(), keyed);
builder = configOrDefault().toXContent(builder, params);
return builder;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class TaskManager implements ClusterStateApplier {

private static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "cluster.task.consumers";

private static final Setting<Boolean> TASK_RESOURCE_CONSUMERS_SETTINGS = Setting.boolSetting(
private static final Setting<Boolean> TASK_RESOURCE_CONSUMERS_SETTING = Setting.boolSetting(
TASK_RESOURCE_CONSUMERS_ATTRIBUTES,
true,
Setting.Property.Dynamic,
Expand Down Expand Up @@ -130,7 +130,7 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_SETTINGS.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_SETTING.get(settings);
this.taskResourceConsumer = new ArrayList<Consumer<Task>>() {
{
add(new TopNSearchTasksLogger(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public final class TaskDetailsLogMessage extends OpenSearchLogMessage {
TaskDetailsLogMessage(Task task) {
Expand All @@ -27,7 +26,7 @@ private static Map<String, Object> prepareMap(Task task) {
messageFields.put("type", task.getType());
messageFields.put("action", task.getAction());
messageFields.put("description", task.getDescription());
messageFields.put("start_time_millis", TimeUnit.NANOSECONDS.toMillis(task.getStartTime()));
messageFields.put("start_time_millis", task.getStartTime());
messageFields.put("parentTaskId", task.getParentTaskId());
messageFields.put("resource_stats", task.getResourceStats());
messageFields.put("metadata", task instanceof SearchShardTask ? ((SearchShardTask) task).getTaskMetadata() : null);
Expand All @@ -50,7 +49,7 @@ private static String message(Task task) {
.append(task.getDescription())
.append("], ")
.append("start_time_millis:[")
.append(TimeUnit.NANOSECONDS.toMillis(task.getStartTime()))
.append(task.getStartTime())
.append("], ")
.append("resource_stats:[")
.append(task.getResourceStats())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.Task;

import java.time.Instant;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
Expand All @@ -34,29 +35,30 @@ public class TopNSearchTasksLogger implements Consumer<Task> {

private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search");

// By default, logs top 10 memory expensive search request in the last 60 sec.
private static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTINGS = Setting.intSetting(
// number of memory expensive search tasks that are logged
private static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
LOG_TOP_QUERIES_SIZE,
10,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private static final Setting<Long> LOG_TOP_QUERIES_FREQUENCY_SETTINGS = Setting.longSetting(
// frequency in which memory expensive search tasks are logged
private static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
LOG_TOP_QUERIES_FREQUENCY,
60_000L,
TimeValue.timeValueSeconds(60L),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final int topQueriesSize;
private final long topQueriesLogFrequencyInMillis;
private final long topQueriesLogFrequencyInNanos;
private final Queue<Tuple<Long, Task>> topQueries;
private long lastReportedTimeInMillis = Instant.now().toEpochMilli();
private long lastReportedTimeInNanos = System.nanoTime();

public TopNSearchTasksLogger(Settings settings) {
this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTINGS.get(settings);
this.topQueriesLogFrequencyInMillis = LOG_TOP_QUERIES_FREQUENCY_SETTINGS.get(settings);
this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings);
this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos();
this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1));
Loggers.setLevel(SEARCH_TASK_DETAILS_LOGGER, "info");
}
Expand All @@ -71,12 +73,11 @@ public void accept(Task task) {
}
}

// TODO: Need performance testing results to understand if we can to use synchronized here.
private synchronized void recordSearchTask(final Task searchTask) {
final long memory_in_bytes = searchTask.getResourceStats().get("memory");
if (Instant.now().toEpochMilli() - lastReportedTimeInMillis > topQueriesLogFrequencyInMillis) {
final long memory_in_bytes = searchTask.getTotalResourceUtilization(ResourceStats.MEMORY);
if (System.nanoTime() - lastReportedTimeInNanos >= topQueriesLogFrequencyInNanos) {
logTopResourceConsumingQueries();
lastReportedTimeInMillis = Instant.now().toEpochMilli();
lastReportedTimeInNanos = System.nanoTime();
}
if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) {
// evict the element
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.opensearch.tasks.consumer;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.ResourceStatsType;
import org.opensearch.tasks.ResourceUsageMetric;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

Expand All @@ -34,7 +37,25 @@ public void testTaskDetailsLogHasJsonFields() {
assertThat(p.getValueFor("action"), equalTo("n/a"));
assertThat(p.getValueFor("description"), equalTo("test"));
assertThat(p.getValueFor("parentTaskId"), equalTo(null));
assertThat(p.getValueFor("resource_stats"), equalTo("{memory=100, cpu=100}"));
// when no resource information present
assertThat(p.getValueFor("resource_stats"), equalTo("{}"));
assertThat(p.getValueFor("metadata"), equalTo("test_metadata"));

task.startThreadResourceTracking(
0,
ResourceStatsType.WORKER_STATS,
new ResourceUsageMetric(ResourceStats.MEMORY, 0L),
new ResourceUsageMetric(ResourceStats.CPU, 0L)
);
task.updateThreadResourceStats(
0,
ResourceStatsType.WORKER_STATS,
new ResourceUsageMetric(ResourceStats.MEMORY, 100),
new ResourceUsageMetric(ResourceStats.CPU, 100)
);
assertThat(
p.getValueFor("resource_stats"),
equalTo("{0=[{cpu_time_in_nanos=100, memory_in_bytes=100}, stats_type=worker_stats, is_active=true]}")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.MockAppender;
import org.opensearch.common.settings.Settings;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.ResourceStatsType;
import org.opensearch.tasks.ResourceUsageMetric;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

Expand Down Expand Up @@ -46,33 +49,25 @@ public static void cleanup() {
}

public void testLoggerWithTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, 10).build();
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);

generateTasks(5);
// sleeping enough time for the logger to complete publishing entries to log file
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
// ignore
}
generateTasks(1);
LogEvent logEvent = appender.getLastEventAndReset();
assertNotNull(logEvent);
assertEquals(logEvent.getLevel(), Level.INFO);
assertTrue(logEvent.getMessage().getFormattedMessage().contains("memory=400, cpu=400"));
assertTrue(logEvent.getMessage().getFormattedMessage().contains("cpu_time_in_nanos=300, memory_in_bytes=300"));
}

public void testLoggerWithoutTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, 10).build();
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);

assertNull(appender.getLastEventAndReset());
}

public void testLoggerWithHighFrequency() {
// setting the frequency to a really large value and confirming that nothing gets written to log file.
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, 100_000).build();
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
generateTasks(5);
generateTasks(2);
Expand All @@ -92,6 +87,18 @@ public void generateTasks(int numberOfTasks) {
Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"),
() -> "n/a"
);
task.startThreadResourceTracking(
i,
ResourceStatsType.WORKER_STATS,
new ResourceUsageMetric(ResourceStats.MEMORY, 0L),
new ResourceUsageMetric(ResourceStats.CPU, 0L)
);
task.updateThreadResourceStats(
i,
ResourceStatsType.WORKER_STATS,
new ResourceUsageMetric(ResourceStats.MEMORY, i * 100L),
new ResourceUsageMetric(ResourceStats.CPU, i * 100L)
);
topNSearchTasksLogger.accept(task);
}
}
Expand Down

0 comments on commit e9ed69f

Please sign in to comment.