Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource stats to task framework #1555

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> resourceStats = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -150,6 +151,14 @@ public Map<String, Object> getStatus() {
return status;
}

void setResourceStats(Map<String, Object> resourceStats) {
this.resourceStats.putAll(resourceStats);
}

public Map<String, Object> getResourceStats() {
return resourceStats;
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -170,6 +179,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -188,7 +198,8 @@ && isCancelled() == taskInfo.isCancelled()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
}

@Override
Expand All @@ -204,7 +215,8 @@ public int hashCode() {
isCancelled(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getResourceStats()
);
}

Expand Down Expand Up @@ -236,6 +248,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", resource_stats="
+ resourceStats
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.tasks.TaskStats.CPU;
import static org.opensearch.tasks.TaskStats.MEMORY;
import static org.opensearch.test.AbstractXContentTestCase.xContentTester;

public class GetTaskResponseTests extends OpenSearchTestCase {
Expand All @@ -57,7 +60,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats"))
.test();
}

Expand Down Expand Up @@ -87,14 +90,24 @@ static TaskInfo randomTaskInfo() {
String action = randomAlphaOfLength(5);
Task.Status status = randomBoolean() ? randomRawTaskStatus() : null;
String description = randomBoolean() ? randomAlphaOfLength(5) : null;
long startTime = randomLong();
long runningTimeNanos = randomLong();
long startTime = randomNonNegativeLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
Map<String, Map<String, Long>> resourceStats = randomBoolean() ? Collections.emptyMap() : new HashMap<String, Map<String, Long>>() {
{
put(randomAlphaOfLength(5), new HashMap<String, Long>() {
{
put(MEMORY.toString(), randomNonNegativeLong());
put(CPU.toString(), randomNonNegativeLong());
}
});
}
};
return new TaskInfo(
taskId,
type,
Expand All @@ -106,7 +119,8 @@ static TaskInfo randomTaskInfo() {
cancellable,
cancelled,
parentTaskId,
headers
headers,
resourceStats
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskStats;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -96,7 +97,17 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
cancellable,
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
new HashMap<String, Map<String, Long>>() {
{
put(randomAlphaOfLength(5), new HashMap<String, Long>() {
{
put(TaskStats.MEMORY.toString(), randomNonNegativeLong());
put(TaskStats.CPU.toString(), randomNonNegativeLong());
}
});
}
}
)
);
}
Expand Down Expand Up @@ -137,6 +148,7 @@ protected void assertInstances(
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
assertEquals(status.code, taskInfo.getStatus().get("code"));
assertEquals(status.status, taskInfo.getStatus().get("status"));
assertEquals(ti.getResourceStats(), taskInfo.getResourceStats());

}

Expand Down
10 changes: 10 additions & 0 deletions distribution/docker/src/docker/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index
logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.additivity = false

appender.task_detailslog_rolling.type = Console
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog

logger.task_detailslog.name = task.detailslog
logger.task_detailslog.level = trace
logger.task_detailslog.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog.additivity = false
22 changes: 22 additions & 0 deletions distribution/src/config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,25 @@ logger.index_indexing_slowlog.level = trace
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old
logger.index_indexing_slowlog.additivity = false
######## Task details log JSON ####################
appender.task_detailslog_rolling.type = RollingFile
appender.task_detailslog_rolling.name = task_detailslog_rolling
appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_task_detailslog.json
appender.task_detailslog_rolling.filePermissions = rw-r-----
appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout
appender.task_detailslog_rolling.layout.type_name = task_detailslog
appender.task_detailslog_rolling.layout.opensearchmessagefields=message,taskId,type,action,description,start_time_millis,parentTaskId,resource_stats

appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\
.cluster_name}_task_detailslog-%i.json.gz
appender.task_detailslog_rolling.policies.type = Policies
appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.task_detailslog_rolling.policies.size.size = 1GB
appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy
appender.task_detailslog_rolling.strategy.max = 4
#################################################
logger.task_detailslog_rolling.name = task.detailslog
logger.task_detailslog_rolling.level = trace
logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling
logger.task_detailslog_rolling.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private static void rethrottleParentTask(
}, listener::onFailure));
} else {
logger.debug("children of task [{}] are already finished, nothing to rethrottle", task.getId());
listener.onResponse(task.taskInfo(localNodeId, true));
listener.onResponse(task.taskInfo(localNodeId, true, false));
}
}

Expand All @@ -136,7 +136,7 @@ private static void rethrottleChildTask(
) {
logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond);
task.getWorkerState().rethrottle(newRequestsPerSecond);
listener.onResponse(task.taskInfo(localNodeId, true));
listener.onResponse(task.taskInfo(localNodeId, true, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void testRethrottleSuccessfulResponse() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down Expand Up @@ -167,6 +168,7 @@ public void testRethrottleWithSomeSucceeded() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void testCancelOrphanedTasks() throws Exception {
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask task : transportService.getTaskManager().getCancellableTasks().values()) {
if (task.getAction().equals(TransportTestAction.ACTION.name())) {
final TaskInfo taskInfo = task.taskInfo(transportService.getLocalNode().getId(), false);
final TaskInfo taskInfo = task.taskInfo(transportService.getLocalNode().getId(), false, false);
assertTrue(taskInfo.toString(), task.isCancelled());
assertNotNull(taskInfo.toString(), task.getReasonCancelled());
assertThat(taskInfo.toString(), task.getReasonCancelled(), equalTo("channel was closed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ public void testNodeNotFoundButTaskFound() throws Exception {
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
Collections.emptyMap()
),
new RuntimeException("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected void taskOperation(CancelTasksRequest request, CancellableTask cancell
cancellableTask,
request.getReason(),
request.waitForCompletion(),
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false))
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false, false))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
@Override
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
waitedForCompletion(
thisTask,
request,
runningTask.taskInfo(clusterService.localNode().getId(), true, true),
listener
);
}

@Override
Expand All @@ -171,7 +176,7 @@ public void onFailure(Exception e) {
}
});
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true, true);
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,4 @@ public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected ListTasksResponse newResponse(

@Override
protected void taskOperation(ListTasksRequest request, Task task, ActionListener<TaskInfo> listener) {
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed()));
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed(), request.getDetailed()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List<TaskInfo> slic
sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status));
}
Status status = leaderState.getStatus(sliceStatuses);
return taskInfo(localNodeId, getDescription(), status);
return taskInfo(localNodeId, getDescription(), status, Collections.emptyMap());
}

private BulkByScrollTask.Status emptyStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ protected Table getTableWithHeader(final RestRequest request) {
// Task detailed info
if (detailed) {
table.addCell("description", "default:true;alias:desc;desc:task action");
table.addCell("resource_stats", "default:false;desc:resource consumption info of the task");
}
table.endHeaders();
return table;
Expand Down Expand Up @@ -173,6 +174,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo

if (detailed) {
table.addCell(taskInfo.getDescription());
table.addCell(taskInfo.getResourceStats());
}
table.endRow();
}
Expand Down
Loading