Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task consumer Integration #2293

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions distribution/docker/src/docker/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index
logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.additivity = false

appender.task_detailslog_rolling.type = Console
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog

logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.additivity = false
37 changes: 37 additions & 0 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,40 @@ logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old
logger.index_indexing_slowlog.additivity = false

######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.filePermissions = rw-r-----
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata

appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling.strategy.max = 4
#################################################
######## Task details log - old style pattern ####
appender.task_detailslog_rolling_old.type = RollingFile
appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old
appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log
appender.task_detailslog_rolling_old.filePermissions = rw-r-----
appender.task_detailslog_rolling_old.layout.type = PatternLayout
appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n

appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.policies.type = Policies
appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling_old.policies.size.size = 1GB
appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling_old.strategy.max = 4
#################################################
logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old
logger.task_detailslog_rolling.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException {
Properties properties = new Properties();
properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES)));
assertThat(properties, is(notNullValue()));
assertThat(properties.entrySet(), hasSize(137));
assertThat(properties.entrySet(), hasSize(165));
assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout"));
assertThat(
properties.get("appender.deprecation_rolling.fileName"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,38 @@ logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old
logger.index_indexing_slowlog.additivity = false

######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.layout.type = ESJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.esmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata

appender.task_detailslog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling.strategy.max = 4
#################################################
######## Task details log - old style pattern ####
appender.task_detailslog_rolling_old.type = RollingFile
appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old
appender.task_detailslog_rolling_old.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.log
appender.task_detailslog_rolling_old.layout.type = PatternLayout
appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n

appender.task_detailslog_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.log.gz
appender.task_detailslog_rolling_old.policies.type = Policies
appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling_old.policies.size.size = 1GB
appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling_old.strategy.max = 4
#################################################
logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old
logger.task_detailslog_rolling.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@

package org.opensearch.action.search;

import org.opensearch.common.MemoizedSupplier;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Supplier;

/**
* Task storing information about a currently running search shard request.
Expand All @@ -46,9 +48,28 @@
* @opensearch.internal
*/
public class SearchShardTask extends CancellableTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
}

public SearchShardTask(
long id,
String type,
String action,
String description,
TaskId parentTaskId,
Map<String, String> headers,
Supplier<String> metadataSupplier
) {
super(id, type, action, description, parentTaskId, headers);
this.metadataSupplier = new MemoizedSupplier<>(metadataSupplier);
}

public String getTaskMetadata() {
return metadataSupplier.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
Expand Down Expand Up @@ -577,7 +578,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.index.Index;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
Expand All @@ -71,6 +72,7 @@
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Collections;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -85,6 +87,8 @@
* @opensearch.internal
*/
public class ShardSearchRequest extends TransportRequest implements IndicesRequest {
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

private final String clusterAlias;
private final ShardId shardId;
private final int numberOfShards;
Expand Down Expand Up @@ -501,7 +505,7 @@ public String getClusterAlias() {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier);
}

@Override
Expand All @@ -510,6 +514,16 @@ public String getDescription() {
return "shardId[" + shardId() + "]";
}

public String getMetadataSupplier() {
StringBuilder sb = new StringBuilder();
if (source != null) {
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
}
return sb.toString();
}

public Rewriteable<Rewriteable> getRewriteable() {
return new RequestRewritable(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public IndicesOptions indicesOptions() {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier);
}

public String getDescription() {
Expand All @@ -137,4 +137,7 @@ public String getDescription() {
return sb.toString();
}

public String getMetadataSupplier() {
return shardSearchRequest().getMetadataSupplier();
}
}
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand All @@ -75,6 +78,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -92,6 +96,15 @@ public class TaskManager implements ClusterStateApplier {

private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);

public static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "task_resource_consumers.enabled";

public static final Setting<Boolean> TASK_RESOURCE_CONSUMERS_ENABLED = Setting.boolSetting(
TASK_RESOURCE_CONSUMERS_ATTRIBUTES,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Rest headers that are copied to the task
*/
Expand All @@ -116,10 +129,26 @@ public class TaskManager implements ClusterStateApplier {
private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap();
private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>();

private volatile boolean taskResourceConsumersEnabled;
private final Set<Consumer<Task>> taskResourceConsumer;

public static TaskManager createTaskManagerWithClusterSettings(
andrross marked this conversation as resolved.
Show resolved Hide resolved
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
Set<String> taskHeaders
) {
final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders);
clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled);
return taskManager;
}

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings));
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand All @@ -135,6 +164,10 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService taskResou
this.taskResourceTrackingService.set(taskResourceTrackingService);
}

public void setTaskResourceConsumersEnabled(boolean taskResourceConsumersEnabled) {
this.taskResourceConsumersEnabled = taskResourceConsumersEnabled;
}

/**
* Registers a task without parent task
*/
Expand Down Expand Up @@ -240,6 +273,16 @@ public Task unregister(Task task) {
// Decrement the task's self-thread as part of unregistration.
task.decrementResourceTrackingThreads();

if (taskResourceConsumersEnabled) {
for (Consumer<Task> taskConsumer : taskResourceConsumer) {
try {
taskConsumer.accept(task);
} catch (Exception e) {
logger.error("error encountered when updating the consumer", e);
}
}
}

if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task.getId());
if (holder != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks.consumer;

import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.logging.OpenSearchLogMessage;

import java.util.HashMap;
import java.util.Map;

/**
* Search shard task information that will be extracted from Task and converted into
* format that will be logged
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
*
* @opensearch.internal
*/
public final class SearchShardTaskDetailsLogMessage extends OpenSearchLogMessage {
SearchShardTaskDetailsLogMessage(SearchShardTask task) {
super(prepareMap(task), message(task));
}

private static Map<String, Object> prepareMap(SearchShardTask task) {
Map<String, Object> messageFields = new HashMap<>();
messageFields.put("taskId", task.getId());
messageFields.put("type", task.getType());
messageFields.put("action", task.getAction());
messageFields.put("description", task.getDescription());
messageFields.put("start_time_millis", task.getStartTime());
messageFields.put("parentTaskId", task.getParentTaskId());
messageFields.put("resource_stats", task.getResourceStats());
messageFields.put("metadata", task.getTaskMetadata());
return messageFields;
}

// Message will be used in plaintext logs
private static String message(SearchShardTask task) {
StringBuilder sb = new StringBuilder();
sb.append("taskId:[")
.append(task.getId())
.append("], ")
.append("type:[")
.append(task.getType())
.append("], ")
.append("action:[")
.append(task.getAction())
.append("], ")
.append("description:[")
.append(task.getDescription())
.append("], ")
.append("start_time_millis:[")
.append(task.getStartTime())
.append("], ")
.append("resource_stats:[")
.append(task.getResourceStats())
.append("], ")
.append("metadata:[")
.append(task.getTaskMetadata())
.append("]");
return sb.toString();
}
}
Loading