From 4744ffebe78f0766f99defccb1fa38224ecc323e Mon Sep 17 00:00:00 2001 From: Sruti Parthiban Date: Tue, 9 Nov 2021 12:55:04 -0800 Subject: [PATCH] Add stats to task framework Signed-off-by: Sruti Parthiban --- .../org/opensearch/client/tasks/TaskInfo.java | 18 +++- .../core/tasks/GetTaskResponseTests.java | 24 ++++- .../tasks/CancelTasksResponseTests.java | 10 +- .../TransportRethrottleActionTests.java | 2 + .../admin/cluster/node/tasks/TasksIT.java | 14 ++- .../rest/action/cat/RestTasksAction.java | 2 + .../opensearch/tasks/StatCollectorTask.java | 34 +++++++ .../main/java/org/opensearch/tasks/Task.java | 4 +- .../java/org/opensearch/tasks/TaskInfo.java | 66 ++++++++++++- .../org/opensearch/tasks/TaskManager.java | 20 ++++ .../opensearch/tasks/TaskStatsContext.java | 93 +++++++++++++++++++ .../org/opensearch/tasks/TaskStatsType.java | 25 +++++ .../tasks/consumer/TaskStatConsumer.java | 22 +++++ .../tasks/consumer/TaskStatsLogger.java | 25 +++++ .../transport/TransportService.java | 7 ++ .../opensearch/tasks/task-index-mapping.json | 4 + .../admin/cluster/node/tasks/TaskTests.java | 12 ++- .../tasks/CancelTasksResponseTests.java | 4 +- .../tasks/ListTasksResponseTests.java | 17 +++- .../org/opensearch/tasks/TaskInfoTests.java | 75 ++++++++++++--- 20 files changed, 448 insertions(+), 30 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/StatCollectorTask.java create mode 100644 server/src/main/java/org/opensearch/tasks/TaskStatsContext.java create mode 100644 server/src/main/java/org/opensearch/tasks/TaskStatsType.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java b/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java index 062fbe56e4ed9..84c7abe999f94 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java @@ -56,6 +56,7 @@ public class TaskInfo { private TaskId parentTaskId; private final Map status = new HashMap<>(); private final Map headers = new HashMap<>(); + private final Map statsInfo = new HashMap<>(); public TaskInfo(TaskId taskId) { this.taskId = taskId; @@ -141,6 +142,14 @@ public Map getStatus() { return status; } + public Map getStatsInfo() { + return statsInfo; + } + + void setStatsInfo(Map statsInfo) { + this.statsInfo.putAll(statsInfo); + } + private void noOpParse(Object s) {} public static final ObjectParser.NamedObjectParser PARSER; @@ -160,6 +169,7 @@ private void noOpParse(Object s) {} parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable")); parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id")); parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers")); + parser.declareObject(TaskInfo::setStatsInfo, (p, c) -> p.map(), new ParseField("stats_info")); PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null); } @@ -177,7 +187,8 @@ && isCancellable() == taskInfo.isCancellable() && Objects.equals(getDescription(), taskInfo.getDescription()) && Objects.equals(getParentTaskId(), taskInfo.getParentTaskId()) && Objects.equals(status, taskInfo.status) - && Objects.equals(getHeaders(), taskInfo.getHeaders()); + && Objects.equals(getHeaders(), taskInfo.getHeaders()) + && Objects.equals(getStatsInfo(), taskInfo.getStatsInfo()); } @Override @@ -192,7 +203,8 @@ public int hashCode() { isCancellable(), getParentTaskId(), status, - getHeaders() + getHeaders(), + getStatsInfo() ); } @@ -222,6 +234,8 @@ public String toString() { + status + ", headers=" + headers + + ", stats_info=" + + statsInfo + '}'; } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java index a14e1169d09fc..7b427cc804008 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java @@ -41,10 +41,12 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.TaskStatsType; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.opensearch.test.AbstractXContentTestCase.xContentTester; @@ -57,7 +59,7 @@ public void testFromXContent() throws IOException { ) .assertEqualsConsumer(this::assertEqualInstances) .assertToXContentEquivalence(true) - .randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status")) + .randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.endsWith("stats_info")) .test(); } @@ -94,7 +96,25 @@ static TaskInfo randomTaskInfo() { Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + Map statsInfo = randomBoolean() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap() { + { + put(TaskStatsType.MEMORY.toString(), randomLong()); + put(TaskStatsType.CPU.toString(), randomLong()); + } + }); + return new TaskInfo( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + parentTaskId, + headers, + statsInfo + ); } private static TaskId randomTaskId() { diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java index 102ebb5fcd390..6a174208a265f 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/tasks/CancelTasksResponseTests.java @@ -47,6 +47,7 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.TaskStatsType; import java.io.IOException; import java.net.InetAddress; @@ -93,7 +94,13 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns randomIntBetween(5, 10), false, new TaskId("node1", randomLong()), - Collections.singletonMap("x-header-of", "some-value") + Collections.singletonMap("x-header-of", "some-value"), + Collections.unmodifiableMap(new HashMap() { + { + put(TaskStatsType.MEMORY.toString(), randomLong()); + put(TaskStatsType.CPU.toString(), randomLong()); + } + }) ) ); } @@ -133,6 +140,7 @@ protected void assertInstances( FakeTaskStatus status = (FakeTaskStatus) ti.getStatus(); assertEquals(status.code, taskInfo.getStatus().get("code")); assertEquals(status.status, taskInfo.getStatus().get("status")); + assertEquals(new HashMap<>(ti.getStatsInfo()), new HashMap<>(taskInfo.getStatsInfo())); } diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java index 4e6d3401a2f14..2cad75081d90b 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/TransportRethrottleActionTests.java @@ -130,6 +130,7 @@ public void testRethrottleSuccessfulResponse() { 0, true, new TaskId("test", task.getId()), + Collections.emptyMap(), Collections.emptyMap() ) ); @@ -165,6 +166,7 @@ public void testRethrottleWithSomeSucceeded() { 0, true, new TaskId("test", task.getId()), + Collections.emptyMap(), Collections.emptyMap() ) ); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index 02034ae05dc03..9ec8eb212c306 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -901,7 +901,19 @@ public void testNodeNotFoundButTaskFound() throws Exception { TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); resultsService.storeResult( new TaskResult( - new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()), + new TaskInfo( + new TaskId("fake", 1), + "test", + "test", + "", + null, + 0, + 0, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap(), + Collections.emptyMap() + ), new RuntimeException("test") ), new ActionListener() { diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java index b87205593ce87..b485ff2b5b542 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestTasksAction.java @@ -125,6 +125,7 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("timestamp", "alias:ts,hms,hhmmss;desc:start time in HH:MM:SS"); table.addCell("running_time_ns", "default:false;alias:time;desc:running time ns"); table.addCell("running_time", "default:true;alias:time;desc:running time"); + table.addCell("stats_info", "default:false;desc:stats info of the task"); // Node info table.addCell("node_id", "default:false;alias:ni;desc:unique node id"); @@ -162,6 +163,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime()))); table.addCell(taskInfo.getRunningTimeNanos()); table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos())); + table.addCell(taskInfo.getStatsInfo()); // Node information. Note that the node may be null because it has left the cluster between when we got this response and now. table.addCell(fullId ? nodeId : Strings.substring(nodeId, 0, 4)); diff --git a/server/src/main/java/org/opensearch/tasks/StatCollectorTask.java b/server/src/main/java/org/opensearch/tasks/StatCollectorTask.java new file mode 100644 index 0000000000000..2038ac07eb1de --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/StatCollectorTask.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import java.util.HashMap; +import java.util.Map; + +public class StatCollectorTask extends CancellableTask { + private Map allStats; + + public StatCollectorTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + super(id, type, action, description, parentTaskId, headers); + allStats = new HashMap<>(); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + + public Map getStats() { + return allStats; + } + + public void updateStat(TaskStatsType statsType, long value) { + allStats.put(statsType.toString(), value); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index 8646a97da5cfe..02cc37329d9b7 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -39,6 +39,7 @@ import org.opensearch.common.xcontent.ToXContentObject; import java.io.IOException; +import java.util.Collections; import java.util.Map; /** @@ -131,7 +132,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, - headers + headers, + this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap() ); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index 03afa763efd65..1398a471454ed 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -84,6 +84,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final Map headers; + private final Map statsInfo; + public TaskInfo( TaskId taskId, String type, @@ -94,7 +96,8 @@ public TaskInfo( long runningTimeNanos, boolean cancellable, TaskId parentTaskId, - Map headers + Map headers, + Map statsInfo ) { this.taskId = taskId; this.type = type; @@ -106,6 +109,7 @@ public TaskInfo( this.cancellable = cancellable; this.parentTaskId = parentTaskId; this.headers = headers; + this.statsInfo = statsInfo; } /** @@ -126,6 +130,11 @@ public TaskInfo(StreamInput in) throws IOException { } else { headers = Collections.emptyMap(); } + if (in.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) { + statsInfo = in.readMap(StreamInput::readString, StreamInput::readLong); + } else { + statsInfo = Collections.emptyMap(); + } } @Override @@ -142,6 +151,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(LegacyESVersion.V_6_2_0)) { out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } + if (out.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) { + out.writeMap(statsInfo, StreamOutput::writeString, StreamOutput::writeLong); + } } public TaskId getTaskId() { @@ -207,6 +219,13 @@ public Map getHeaders() { return headers; } + /** + * Returns the tasks stats information + */ + public Map getStatsInfo() { + return statsInfo; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("node", taskId.getNodeId()); @@ -233,6 +252,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(attribute.getKey(), attribute.getValue()); } builder.endObject(); + if (statsInfo != null) { + builder.startObject("stats_info"); + for (Map.Entry attribute : statsInfo.entrySet()) { + builder.field(attribute.getKey(), attribute.getValue()); + } + builder.endObject(); + } return builder; } @@ -257,9 +283,27 @@ public static TaskInfo fromXContent(XContentParser parser) { // This might happen if we are reading an old version of task info headers = Collections.emptyMap(); } + @SuppressWarnings("unchecked") + Map statsInfo = (Map) a[i++]; + if (statsInfo == null) { + // This might happen if we are reading an old version of task info or if stats information is not present + statsInfo = Collections.emptyMap(); + } RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); - return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + return new TaskInfo( + id, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + parentTaskId, + headers, + statsInfo + ); }); static { // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format @@ -275,6 +319,7 @@ public static TaskInfo fromXContent(XContentParser parser) { PARSER.declareBoolean(constructorArg(), new ParseField("cancellable")); PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.map(), new ParseField("stats_info")); } @Override @@ -298,11 +343,24 @@ public boolean equals(Object obj) { && Objects.equals(parentTaskId, other.parentTaskId) && Objects.equals(cancellable, other.cancellable) && Objects.equals(status, other.status) - && Objects.equals(headers, other.headers); + && Objects.equals(headers, other.headers) + && Objects.equals(statsInfo, other.statsInfo); } @Override public int hashCode() { - return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers); + return Objects.hash( + taskId, + type, + action, + description, + startTime, + runningTimeNanos, + parentTaskId, + cancellable, + status, + headers, + statsInfo + ); } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index 1f6169768f245..f27c846637ac0 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -57,6 +57,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.tasks.consumer.TaskStatConsumer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -110,10 +111,14 @@ public class TaskManager implements ClusterStateApplier { private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); private final SetOnce cancellationService = new SetOnce<>(); + /** Consumers that are notified of the stats */ + private List statConsumers; + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this.threadPool = threadPool; this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.statConsumers = new ArrayList<>(); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -202,6 +207,12 @@ public void cancel(CancellableTask task, String reason, Runnable listener) { */ public Task unregister(Task task) { logger.trace("unregister task for id: {}", task.getId()); + if (task instanceof StatCollectorTask) { + TaskStatsContext statsContext = TaskStatsContext.createTaskStatsContext((StatCollectorTask) task); + for (TaskStatConsumer consumer : statConsumers) { + consumer.taskStatConsumed(statsContext); + } + } if (task instanceof CancellableTask) { CancellableTaskHolder holder = cancellableTasks.remove(task.getId()); if (holder != null) { @@ -348,6 +359,15 @@ public CancellableTask getCancellableTask(long id) { } } + /** + * Register task stat consumer with task manager + * + *

The consumer is notified whenever an task is complete + */ + public void addTaskStatConsumer(TaskStatConsumer statConsumer) { + statConsumers.add(statConsumer); + } + /** * Returns the number of currently banned tasks. *

diff --git a/server/src/main/java/org/opensearch/tasks/TaskStatsContext.java b/server/src/main/java/org/opensearch/tasks/TaskStatsContext.java new file mode 100644 index 0000000000000..942fda2dcec88 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/TaskStatsContext.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import java.util.Map; + +public class TaskStatsContext { + private final long taskId; + private final String type; + private final String action; + private final String description; + private final long startTimeNanos; + private final String parentTaskId; + private final Map allStats; + + private TaskStatsContext( + long taskId, + String type, + String action, + String description, + long startTimeNanos, + String parentTaskId, + Map allStats + ) { + this.taskId = taskId; + this.type = type; + this.action = action; + this.description = description; + this.startTimeNanos = startTimeNanos; + this.parentTaskId = parentTaskId; + this.allStats = allStats; + } + + public long getTaskId() { + return taskId; + } + + public String getType() { + return type; + } + + public String getAction() { + return action; + } + + public String getDescription() { + return description; + } + + /** + * Returns the task start time + */ + public long getStartTime() { + return startTimeNanos; + } + + /** + * Returns the task running time + */ + public long getRunningTimeNanos() { + return System.nanoTime() - startTimeNanos; + } + + /** + * Returns the parent task id + */ + public String getParentTaskId() { + return parentTaskId; + } + + public Map getAllStats() { + return allStats; + } + + public static TaskStatsContext createTaskStatsContext(StatCollectorTask task) { + return new TaskStatsContext( + task.getId(), + task.getType(), + task.getAction(), + task.getDescription(), + task.getStartTimeNanos(), + task.getParentTaskId().toString(), + task.getStats() + ); + + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskStatsType.java b/server/src/main/java/org/opensearch/tasks/TaskStatsType.java new file mode 100644 index 0000000000000..61378907d7264 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/TaskStatsType.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +public enum TaskStatsType { + MEMORY("Memory"), + CPU("CPU"); + + private final String value; + + TaskStatsType(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java b/server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java new file mode 100644 index 0000000000000..6e4c9d844a46b --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.tasks.TaskStatsContext; + +/** + * This listener is notified whenever an task is completed and has stats present + */ +public interface TaskStatConsumer { + + /** + * Called when task is unregistered and task has stats present. + */ + void taskStatConsumed(TaskStatsContext taskStatsContext); +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java b/server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java new file mode 100644 index 0000000000000..b043ac38b7c39 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.tasks.TaskStatsContext; + +/** + * A simple listener that logs all stats published in the tasks + */ +public class TaskStatsLogger implements TaskStatConsumer { + private static final Logger LOGGER = LogManager.getLogger(TaskStatsLogger.class); + + @Override + public void taskStatConsumed(TaskStatsContext taskStatsContext) { + LOGGER.debug("Task stats: [{}] for task type=[{}]", taskStatsContext.getAllStats(), taskStatsContext.getType()); + } +} diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 3d6471bdfa6f3..28a52a57327c6 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -67,6 +67,7 @@ import org.opensearch.node.ReportingService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.tasks.consumer.TaskStatsLogger; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -213,6 +214,8 @@ public TransportService( setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); taskManager = createTaskManager(settings, threadPool, taskHeaders); + // Register task stats consumers + registerTaskStatConsumers(); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -269,6 +272,10 @@ void setTracerLogExclude(List tracerLogExclude) { this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY); } + private void registerTaskStatConsumers() { + taskManager.addTaskStatConsumer(new TaskStatsLogger()); + } + @Override protected void doStart() { transport.setMessageListener(this); diff --git a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json index 40730fc02886b..44ad142992c80 100644 --- a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json @@ -44,6 +44,10 @@ "headers": { "type" : "object", "enabled" : false + }, + "stats_info": { + "type" : "object", + "enabled" : false } } }, diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java index a2f8f0a5f7a44..fc7c2b00d7966 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java @@ -35,10 +35,12 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.TaskStatsType; import org.opensearch.test.OpenSearchTestCase; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.Map; public class TaskTests extends OpenSearchTestCase { @@ -49,6 +51,12 @@ public void testTaskInfoToString() { long startTime = randomNonNegativeLong(); long runningTime = randomNonNegativeLong(); boolean cancellable = randomBoolean(); + Map stats = new LinkedHashMap() { + { + put(TaskStatsType.MEMORY.toString(), randomNonNegativeLong()); + put(TaskStatsType.CPU.toString(), randomNonNegativeLong()); + } + }; TaskInfo taskInfo = new TaskInfo( new TaskId(nodeId, taskId), "test_type", @@ -59,7 +67,8 @@ public void testTaskInfoToString() { runningTime, cancellable, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + stats ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -71,6 +80,7 @@ public void testTaskInfoToString() { assertEquals(((Number) map.get("running_time_in_nanos")).longValue(), runningTime); assertEquals(map.get("cancellable"), cancellable); assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar")); + assertEquals(map.get("stats_info"), stats); } } diff --git a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java index 64d2979c2c5a0..660e6a7816f39 100644 --- a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java @@ -69,8 +69,8 @@ private static List randomTasks() { @Override protected Predicate getRandomFieldsExcludeFilter() { - // status and headers hold arbitrary content, we can't inject random fields in them - return field -> field.endsWith("status") || field.endsWith("headers"); + // status, headers and stats_info hold arbitrary content, we can't inject random fields in them + return field -> field.endsWith("status") || field.endsWith("headers") || field.endsWith("stats_info"); } @Override diff --git a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java index 450dd522ca891..9e6083c4e9700 100644 --- a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java @@ -45,6 +45,7 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; @@ -71,7 +72,13 @@ public void testNonEmptyToString() { 1, true, new TaskId("node1", 0), - Collections.singletonMap("foo", "bar") + Collections.singletonMap("foo", "bar"), + Collections.unmodifiableMap(new HashMap() { + { + put(TaskStatsType.MEMORY.toString(), 100L); + put(TaskStatsType.CPU.toString(), 100L); + } + }) ); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals( @@ -91,6 +98,10 @@ public void testNonEmptyToString() { + " \"parent_task_id\" : \"node1:0\",\n" + " \"headers\" : {\n" + " \"foo\" : \"bar\"\n" + + " },\n" + + " \"stats_info\" : {\n" + + " \"Memory\" : 100,\n" + + " \"CPU\" : 100\n" + " }\n" + " }\n" + " ]\n" @@ -125,8 +136,8 @@ protected boolean supportsUnknownFields() { @Override protected Predicate getRandomFieldsExcludeFilter() { - // status and headers hold arbitrary content, we can't inject random fields in them - return field -> field.endsWith("status") || field.endsWith("headers"); + // status, headers and stats_info hold arbitrary content, we can't inject random fields in them + return field -> field.endsWith("status") || field.endsWith("headers") || field.endsWith("stats_info"); } @Override diff --git a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java index b9a0d05149bb8..09452b3b5d63b 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java @@ -77,8 +77,8 @@ protected boolean supportsUnknownFields() { @Override protected Predicate getRandomFieldsExcludeFilter() { - // status and headers hold arbitrary content, we can't inject random fields in them - return field -> "status".equals(field) || "headers".equals(field); + // status, headers and stats_info hold arbitrary content, we can't inject random fields in them + return field -> "status".equals(field) || "headers".equals(field) || "stats_info".equals(field); } @Override @@ -96,7 +96,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 1: return new TaskInfo( @@ -109,7 +110,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 2: return new TaskInfo( @@ -122,7 +124,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 3: return new TaskInfo( @@ -135,7 +138,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 4: Task.Status newStatus = randomValueOtherThan(info.getStatus(), TaskInfoTests::randomRawTaskStatus); @@ -149,7 +153,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 5: return new TaskInfo( @@ -162,7 +167,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 6: return new TaskInfo( @@ -175,7 +181,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos() + between(1, 100), info.isCancellable(), info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 7: return new TaskInfo( @@ -188,7 +195,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable() == false, info.getParentTaskId(), - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 8: TaskId parentId = new TaskId(info.getParentTaskId().getNodeId() + randomAlphaOfLength(5), info.getParentTaskId().getId()); @@ -202,7 +210,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), parentId, - info.getHeaders() + info.getHeaders(), + info.getStatsInfo() ); case 9: Map headers = info.getHeaders(); @@ -222,7 +231,29 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - headers + headers, + info.getStatsInfo() + ); + case 10: + Map statsInfo = info.getStatsInfo(); + if (statsInfo == null) { + statsInfo = new HashMap<>(1); + } else { + statsInfo = new HashMap<>(info.getStatsInfo()); + } + statsInfo.put(TaskStatsType.MEMORY.toString(), randomLong()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.getParentTaskId(), + info.getHeaders(), + statsInfo ); default: throw new IllegalStateException(); @@ -242,7 +273,25 @@ static TaskInfo randomTaskInfo() { Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + Map statsInfo = randomBoolean() ? Collections.emptyMap() : new HashMap() { + { + put(TaskStatsType.MEMORY.toString(), randomNonNegativeLong()); + put(TaskStatsType.CPU.toString(), randomNonNegativeLong()); + } + }; + return new TaskInfo( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + parentTaskId, + headers, + statsInfo + ); } private static TaskId randomTaskId() {