Skip to content

Commit

Permalink
Task consumer Integration (#2293)
Browse files Browse the repository at this point in the history
* Integrate task consumers to capture task resource information during unregister.
  Add consumer that logs topN expensive search tasks

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
(cherry picked from commit fbe93d4)
  • Loading branch information
sruti1312 authored and github-actions[bot] committed Aug 5, 2022
1 parent 89c30b2 commit 9d8f564
Show file tree
Hide file tree
Showing 20 changed files with 545 additions and 14 deletions.
10 changes: 10 additions & 0 deletions distribution/docker/src/docker/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index
logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.additivity = false

appender.task_detailslog_rolling.type = Console
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog

logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.additivity = false
37 changes: 37 additions & 0 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,40 @@ logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old
logger.index_indexing_slowlog.additivity = false

######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.filePermissions = rw-r-----
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata

appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling.strategy.max = 4
#################################################
######## Task details log - old style pattern ####
appender.task_detailslog_rolling_old.type = RollingFile
appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old
appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log
appender.task_detailslog_rolling_old.filePermissions = rw-r-----
appender.task_detailslog_rolling_old.layout.type = PatternLayout
appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n

appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.policies.type = Policies
appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling_old.policies.size.size = 1GB
appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling_old.strategy.max = 4
#################################################
logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old
logger.task_detailslog_rolling.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException {
Properties properties = new Properties();
properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES)));
assertThat(properties, is(notNullValue()));
assertThat(properties.entrySet(), hasSize(137));
assertThat(properties.entrySet(), hasSize(165));
assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout"));
assertThat(
properties.get("appender.deprecation_rolling.fileName"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,38 @@ logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old
logger.index_indexing_slowlog.additivity = false

######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.layout.type = ESJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.esmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata

appender.task_detailslog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling.strategy.max = 4
#################################################
######## Task details log - old style pattern ####
appender.task_detailslog_rolling_old.type = RollingFile
appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old
appender.task_detailslog_rolling_old.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.log
appender.task_detailslog_rolling_old.layout.type = PatternLayout
appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n

appender.task_detailslog_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.policies.type = Policies
appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling_old.policies.size.size = 1GB
appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling_old.strategy.max = 4
#################################################
logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old
logger.task_detailslog_rolling.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@

package org.opensearch.action.search;

import org.opensearch.common.MemoizedSupplier;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Supplier;

/**
* Task storing information about a currently running search shard request.
Expand All @@ -46,9 +48,28 @@
* @opensearch.internal
*/
public class SearchShardTask extends CancellableTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
}

public SearchShardTask(
long id,
String type,
String action,
String description,
TaskId parentTaskId,
Map<String, String> headers,
Supplier<String> metadataSupplier
) {
super(id, type, action, description, parentTaskId, headers);
this.metadataSupplier = new MemoizedSupplier<>(metadataSupplier);
}

public String getTaskMetadata() {
return metadataSupplier.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
Expand Down Expand Up @@ -575,7 +576,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.index.Index;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
Expand All @@ -71,6 +72,7 @@
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Collections;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -85,6 +87,8 @@
* @opensearch.internal
*/
public class ShardSearchRequest extends TransportRequest implements IndicesRequest {
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

private final String clusterAlias;
private final ShardId shardId;
private final int numberOfShards;
Expand Down Expand Up @@ -501,7 +505,7 @@ public String getClusterAlias() {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier);
}

@Override
Expand All @@ -510,6 +514,16 @@ public String getDescription() {
return "shardId[" + shardId() + "]";
}

public String getMetadataSupplier() {
StringBuilder sb = new StringBuilder();
if (source != null) {
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
}
return sb.toString();
}

public Rewriteable<Rewriteable> getRewriteable() {
return new RequestRewritable(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public IndicesOptions indicesOptions() {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier);
}

public String getDescription() {
Expand All @@ -137,4 +137,7 @@ public String getDescription() {
return sb.toString();
}

public String getMetadataSupplier() {
return shardSearchRequest().getMetadataSupplier();
}
}
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand All @@ -75,6 +78,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -92,6 +96,15 @@ public class TaskManager implements ClusterStateApplier {

private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);

public static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "task_resource_consumers.enabled";

public static final Setting<Boolean> TASK_RESOURCE_CONSUMERS_ENABLED = Setting.boolSetting(
TASK_RESOURCE_CONSUMERS_ATTRIBUTES,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Rest headers that are copied to the task
*/
Expand All @@ -116,10 +129,26 @@ public class TaskManager implements ClusterStateApplier {
private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap();
private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>();

private volatile boolean taskResourceConsumersEnabled;
private final Set<Consumer<Task>> taskResourceConsumer;

public static TaskManager createTaskManagerWithClusterSettings(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
) {
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders);
clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled);
return taskManager;
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings));
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand All @@ -135,6 +164,10 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService taskResou
this.taskResourceTrackingService.set(taskResourceTrackingService);
}

public void setTaskResourceConsumersEnabled(boolean taskResourceConsumersEnabled) {
this.taskResourceConsumersEnabled = taskResourceConsumersEnabled;
}

/**
* Registers a task without parent task
*/
Expand Down Expand Up @@ -240,6 +273,16 @@ public Task unregister(Task task) {
// Decrement the task's self-thread as part of unregistration.
task.decrementResourceTrackingThreads();

if (taskResourceConsumersEnabled) {
for (Consumer<Task> taskConsumer : taskResourceConsumer) {
try {
taskConsumer.accept(task);
} catch (Exception e) {
logger.error("error encountered when updating the consumer", e);
}
}
}

if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task.getId());
if (holder != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks.consumer;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.logging.OpenSearchLogMessage;

import java.util.HashMap;
import java.util.Map;

/**
* Search shard task information that will be extracted from Task and converted into
* format that will be logged
*
* @opensearch.internal
*/
public final class SearchShardTaskDetailsLogMessage extends OpenSearchLogMessage {
SearchShardTaskDetailsLogMessage(SearchShardTask task) {
super(prepareMap(task), message(task));
}

private static Map<String, Object> prepareMap(SearchShardTask task) {
Map<String, Object> messageFields = new HashMap<>();
messageFields.put("taskId", task.getId());
messageFields.put("type", task.getType());
messageFields.put("action", task.getAction());
messageFields.put("description", task.getDescription());
messageFields.put("start_time_millis", task.getStartTime());
messageFields.put("parentTaskId", task.getParentTaskId());
messageFields.put("resource_stats", task.getResourceStats());
messageFields.put("metadata", task.getTaskMetadata());
return messageFields;
}

// Message will be used in plaintext logs
private static String message(SearchShardTask task) {
StringBuilder sb = new StringBuilder();
sb.append("taskId:[")
.append(task.getId())
.append("], ")
.append("type:[")
.append(task.getType())
.append("], ")
.append("action:[")
.append(task.getAction())
.append("], ")
.append("description:[")
.append(task.getDescription())
.append("], ")
.append("start_time_millis:[")
.append(task.getStartTime())
.append("], ")
.append("resource_stats:[")
.append(task.getResourceStats())
.append("], ")
.append("metadata:[")
.append(task.getTaskMetadata())
.append("]");
return sb.toString();
}
}
Loading

0 comments on commit 9d8f564

Please sign in to comment.