Skip to content

Commit

Permalink
Add stats to task framework
Browse files Browse the repository at this point in the history
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
  • Loading branch information
sruti1312 committed Nov 16, 2021
1 parent fb78d10 commit 4744ffe
Show file tree
Hide file tree
Showing 20 changed files with 448 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> statsInfo = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -141,6 +142,14 @@ public Map<String, Object> getStatus() {
return status;
}

public Map<String, Object> getStatsInfo() {
return statsInfo;
}

void setStatsInfo(Map<String, Object> statsInfo) {
this.statsInfo.putAll(statsInfo);
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -160,6 +169,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setStatsInfo, (p, c) -> p.map(), new ParseField("stats_info"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -177,7 +187,8 @@ && isCancellable() == taskInfo.isCancellable()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getStatsInfo(), taskInfo.getStatsInfo());
}

@Override
Expand All @@ -192,7 +203,8 @@ public int hashCode() {
isCancellable(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getStatsInfo()
);
}

Expand Down Expand Up @@ -222,6 +234,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", stats_info="
+ statsInfo
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskStatsType;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -57,7 +59,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.endsWith("stats_info"))
.test();
}

Expand Down Expand Up @@ -94,7 +96,25 @@ static TaskInfo randomTaskInfo() {
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
Map<String, Long> statsInfo = randomBoolean() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<String, Long>() {
{
put(TaskStatsType.MEMORY.toString(), randomLong());
put(TaskStatsType.CPU.toString(), randomLong());
}
});
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
parentTaskId,
headers,
statsInfo
);
}

private static TaskId randomTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskStatsType;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -93,7 +94,13 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
randomIntBetween(5, 10),
false,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
Collections.unmodifiableMap(new HashMap<String, Long>() {
{
put(TaskStatsType.MEMORY.toString(), randomLong());
put(TaskStatsType.CPU.toString(), randomLong());
}
})
)
);
}
Expand Down Expand Up @@ -133,6 +140,7 @@ protected void assertInstances(
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
assertEquals(status.code, taskInfo.getStatus().get("code"));
assertEquals(status.status, taskInfo.getStatus().get("status"));
assertEquals(new HashMap<>(ti.getStatsInfo()), new HashMap<>(taskInfo.getStatsInfo()));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testRethrottleSuccessfulResponse() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down Expand Up @@ -165,6 +166,7 @@ public void testRethrottleWithSomeSucceeded() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
Collections.emptyMap()
),
new RuntimeException("test")
),
new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("timestamp", "alias:ts,hms,hhmmss;desc:start time in HH:MM:SS");
table.addCell("running_time_ns", "default:false;alias:time;desc:running time ns");
table.addCell("running_time", "default:true;alias:time;desc:running time");
table.addCell("stats_info", "default:false;desc:stats info of the task");

// Node info
table.addCell("node_id", "default:false;alias:ni;desc:unique node id");
Expand Down Expand Up @@ -162,6 +163,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo
table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime())));
table.addCell(taskInfo.getRunningTimeNanos());
table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos()));
table.addCell(taskInfo.getStatsInfo());

// Node information. Note that the node may be null because it has left the cluster between when we got this response and now.
table.addCell(fullId ? nodeId : Strings.substring(nodeId, 0, 4));
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/tasks/StatCollectorTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.HashMap;
import java.util.Map;

public class StatCollectorTask extends CancellableTask {
private Map<String, Long> allStats;

public StatCollectorTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
allStats = new HashMap<>();
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
}

public Map<String, Long> getStats() {
return allStats;
}

public void updateStat(TaskStatsType statsType, long value) {
allStats.put(statsType.toString(), value);
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
Expand Down Expand Up @@ -131,7 +132,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
parentTask,
headers
headers,
this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap()
);
}

Expand Down
66 changes: 62 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final Map<String, String> headers;

private final Map<String, Long> statsInfo;

public TaskInfo(
TaskId taskId,
String type,
Expand All @@ -94,7 +96,8 @@ public TaskInfo(
long runningTimeNanos,
boolean cancellable,
TaskId parentTaskId,
Map<String, String> headers
Map<String, String> headers,
Map<String, Long> statsInfo
) {
this.taskId = taskId;
this.type = type;
Expand All @@ -106,6 +109,7 @@ public TaskInfo(
this.cancellable = cancellable;
this.parentTaskId = parentTaskId;
this.headers = headers;
this.statsInfo = statsInfo;
}

/**
Expand All @@ -126,6 +130,11 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
headers = Collections.emptyMap();
}
if (in.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) {
statsInfo = in.readMap(StreamInput::readString, StreamInput::readLong);
} else {
statsInfo = Collections.emptyMap();
}
}

@Override
Expand All @@ -142,6 +151,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_6_2_0)) {
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
if (out.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) {
out.writeMap(statsInfo, StreamOutput::writeString, StreamOutput::writeLong);
}
}

public TaskId getTaskId() {
Expand Down Expand Up @@ -207,6 +219,13 @@ public Map<String, String> getHeaders() {
return headers;
}

/**
* Returns the tasks stats information
*/
public Map<String, Long> getStatsInfo() {
return statsInfo;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", taskId.getNodeId());
Expand All @@ -233,6 +252,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
if (statsInfo != null) {
builder.startObject("stats_info");
for (Map.Entry<String, Long> attribute : statsInfo.entrySet()) {
builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
}
return builder;
}

Expand All @@ -257,9 +283,27 @@ public static TaskInfo fromXContent(XContentParser parser) {
// This might happen if we are reading an old version of task info
headers = Collections.emptyMap();
}
@SuppressWarnings("unchecked")
Map<String, Long> statsInfo = (Map<String, Long>) a[i++];
if (statsInfo == null) {
// This might happen if we are reading an old version of task info or if stats information is not present
statsInfo = Collections.emptyMap();
}
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
id,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
parentTaskId,
headers,
statsInfo
);
});
static {
// Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format
Expand All @@ -275,6 +319,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
PARSER.declareBoolean(constructorArg(), new ParseField("cancellable"));
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.map(), new ParseField("stats_info"));
}

@Override
Expand All @@ -298,11 +343,24 @@ public boolean equals(Object obj) {
&& Objects.equals(parentTaskId, other.parentTaskId)
&& Objects.equals(cancellable, other.cancellable)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers);
&& Objects.equals(headers, other.headers)
&& Objects.equals(statsInfo, other.statsInfo);
}

@Override
public int hashCode() {
return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers);
return Objects.hash(
taskId,
type,
action,
description,
startTime,
runningTimeNanos,
parentTaskId,
cancellable,
status,
headers,
statsInfo
);
}
}
Loading

0 comments on commit 4744ffe

Please sign in to comment.