Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource stats to task framework #1555

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> statsInfo = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -141,6 +142,14 @@ public Map<String, Object> getStatus() {
return status;
}

public Map<String, Object> getStatsInfo() {
return statsInfo;
}

void setStatsInfo(Map<String, Object> statsInfo) {
this.statsInfo.putAll(statsInfo);
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -160,6 +169,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setStatsInfo, (p, c) -> p.map(), new ParseField("stats_info"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -177,7 +187,8 @@ && isCancellable() == taskInfo.isCancellable()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getStatsInfo(), taskInfo.getStatsInfo());
}

@Override
Expand All @@ -192,7 +203,8 @@ public int hashCode() {
isCancellable(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getStatsInfo()
);
}

Expand Down Expand Up @@ -222,6 +234,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", stats_info="
+ statsInfo
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskStatsType;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -57,7 +59,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.endsWith("stats_info"))
.test();
}

Expand Down Expand Up @@ -94,7 +96,25 @@ static TaskInfo randomTaskInfo() {
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
Map<String, Long> statsInfo = randomBoolean() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<String, Long>() {
{
put(TaskStatsType.MEMORY.toString(), randomLong());
put(TaskStatsType.CPU.toString(), randomLong());
}
});
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
parentTaskId,
headers,
statsInfo
);
}

private static TaskId randomTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskStatsType;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -93,7 +94,13 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
randomIntBetween(5, 10),
false,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
Collections.unmodifiableMap(new HashMap<String, Long>() {
{
put(TaskStatsType.MEMORY.toString(), randomLong());
put(TaskStatsType.CPU.toString(), randomLong());
}
})
)
);
}
Expand Down Expand Up @@ -133,6 +140,7 @@ protected void assertInstances(
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
assertEquals(status.code, taskInfo.getStatus().get("code"));
assertEquals(status.status, taskInfo.getStatus().get("status"));
assertEquals(new HashMap<>(ti.getStatsInfo()), new HashMap<>(taskInfo.getStatsInfo()));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testRethrottleSuccessfulResponse() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down Expand Up @@ -165,6 +166,7 @@ public void testRethrottleWithSomeSucceeded() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap(),
Collections.emptyMap()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
Collections.emptyMap()
),
new RuntimeException("test")
),
new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("timestamp", "alias:ts,hms,hhmmss;desc:start time in HH:MM:SS");
table.addCell("running_time_ns", "default:false;alias:time;desc:running time ns");
table.addCell("running_time", "default:true;alias:time;desc:running time");
table.addCell("stats_info", "default:false;desc:stats info of the task");

// Node info
table.addCell("node_id", "default:false;alias:ni;desc:unique node id");
Expand Down Expand Up @@ -162,6 +163,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo
table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime())));
table.addCell(taskInfo.getRunningTimeNanos());
table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos()));
table.addCell(taskInfo.getStatsInfo());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use a request parameter (similar to detailed flag) to control if task API will return the status_info in results or not. This will help to avoid breaking the backward compatibility where old client are accessing the cluster

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dblock : When a new field is added in any REST API response object, will that break backward compatibility from client perspective ? Or adding a new field is fine and client doesn't perform any strict schema validation on response object ? Based on that info we can decide if a new parameter is needed to control the response payload with new field or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think new field are ok, @nknize can you confirm please?


// Node information. Note that the node may be null because it has left the cluster between when we got this response and now.
table.addCell(fullId ? nodeId : Strings.substring(nodeId, 0, 4));
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/tasks/StatCollectorTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.HashMap;
import java.util.Map;

public class StatCollectorTask extends CancellableTask {
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StatsCollectorTask, since it collect all stats?

private Map<String, Long> allStats;

public StatCollectorTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
allStats = new HashMap<>();
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
}

public Map<String, Long> getStats() {
return allStats;
}

public void updateStat(TaskStatsType statsType, long value) {
allStats.put(statsType.toString(), value);
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
Expand Down Expand Up @@ -131,7 +132,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
parentTask,
headers
headers,
this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using empty map as default for statsInfo in TaskInfo, that means for all the API like CancelTask as well it will end up returning an empty object, which is not needed. Instead, we will need to have some mechanism similar to description field which supports nullability and is only returned if detailed flag is set in request. For cancel type API that is always treated as false and the field in not returned in the response. You can refer the caller of this method

);
}

Expand Down
66 changes: 62 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final Map<String, String> headers;

private final Map<String, Long> statsInfo;

public TaskInfo(
TaskId taskId,
String type,
Expand All @@ -94,7 +96,8 @@ public TaskInfo(
long runningTimeNanos,
boolean cancellable,
TaskId parentTaskId,
Map<String, String> headers
Map<String, String> headers,
Map<String, Long> statsInfo
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should instead use some stats object as value rather than Long to help with adding other fields in that object in future for each stats type

) {
this.taskId = taskId;
this.type = type;
Expand All @@ -106,6 +109,7 @@ public TaskInfo(
this.cancellable = cancellable;
this.parentTaskId = parentTaskId;
this.headers = headers;
this.statsInfo = statsInfo;
}

/**
Expand All @@ -126,6 +130,11 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
headers = Collections.emptyMap();
}
if (in.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) {
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
statsInfo = in.readMap(StreamInput::readString, StreamInput::readLong);
} else {
statsInfo = Collections.emptyMap();
}
}

@Override
Expand All @@ -142,6 +151,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_6_2_0)) {
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
if (out.getVersion().onOrAfter(LegacyESVersion.V_2_0_0)) {
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
out.writeMap(statsInfo, StreamOutput::writeString, StreamOutput::writeLong);
}
}

public TaskId getTaskId() {
Expand Down Expand Up @@ -207,6 +219,13 @@ public Map<String, String> getHeaders() {
return headers;
}

/**
* Returns the tasks stats information
*/
public Map<String, Long> getStatsInfo() {
return statsInfo;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", taskId.getNodeId());
Expand All @@ -233,6 +252,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
if (statsInfo != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what case statsInfo will be null here ? I see it is always being setting as Collections.emptyMap() in callers if not supported by task

builder.startObject("stats_info");
for (Map.Entry<String, Long> attribute : statsInfo.entrySet()) {
builder.field(attribute.getKey(), attribute.getValue());
}
builder.endObject();
}
return builder;
}

Expand All @@ -257,9 +283,27 @@ public static TaskInfo fromXContent(XContentParser parser) {
// This might happen if we are reading an old version of task info
headers = Collections.emptyMap();
}
@SuppressWarnings("unchecked")
Map<String, Long> statsInfo = (Map<String, Long>) a[i++];
if (statsInfo == null) {
// This might happen if we are reading an old version of task info or if stats information is not present
statsInfo = Collections.emptyMap();
}
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
id,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
parentTaskId,
headers,
statsInfo
);
});
static {
// Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format
Expand All @@ -275,6 +319,7 @@ public static TaskInfo fromXContent(XContentParser parser) {
PARSER.declareBoolean(constructorArg(), new ParseField("cancellable"));
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.map(), new ParseField("stats_info"));
}

@Override
Expand All @@ -298,11 +343,24 @@ public boolean equals(Object obj) {
&& Objects.equals(parentTaskId, other.parentTaskId)
&& Objects.equals(cancellable, other.cancellable)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers);
&& Objects.equals(headers, other.headers)
&& Objects.equals(statsInfo, other.statsInfo);
}

@Override
public int hashCode() {
return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers);
return Objects.hash(
taskId,
type,
action,
description,
startTime,
runningTimeNanos,
parentTaskId,
cancellable,
status,
headers,
statsInfo
);
}
}
Loading