-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add resource stats to task framework #1555
Add resource stats to task framework #1555
Conversation
Can one of the admins verify this patch? |
✅ Gradle Wrapper Validation success c16586137b6ffe154ad6677f95b380a4d8974019 |
✅ Gradle Precommit success c16586137b6ffe154ad6677f95b380a4d8974019 |
❌ Gradle Check failure c16586137b6ffe154ad6677f95b380a4d8974019 |
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
c165861
to
4744ffe
Compare
✅ Gradle Wrapper Validation success 4744ffe |
✅ Gradle Precommit success 4744ffe |
LGTM, how do you plan to use it? |
@dblock - The resource stats can be used to get top N resource consuming tasks. We can use the |
/** | ||
* This listener is notified whenever an task is completed and has stats present | ||
*/ | ||
public interface TaskStatConsumer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be TaskStatsConsumer
would be better name ?
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class StatCollectorTask extends CancellableTask { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StatsCollectorTask
, since it collect all stats?
@@ -110,10 +111,14 @@ | |||
private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); | |||
private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>(); | |||
|
|||
/** Consumers that are notified of the stats */ | |||
private List<TaskStatConsumer> statConsumers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should use the thread-safe data structure here, TaskManager
is a singleton and addTaskStatConsumer
could be called any time. Alternatively, we could make it final
and add the constructor argument (instead of addTaskStatConsumer
) to exclude any races.
Thanks @dblock , I have a few comments primarily regarding naming, it looks inconsistent (to me), there is a mix of |
Thanks! @sruti1312 care to address naming? |
server/src/main/java/org/opensearch/tasks/consumer/TaskStatConsumer.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/consumer/TaskStatsLogger.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/StatCollectorTask.java
Outdated
Show resolved
Hide resolved
@@ -162,6 +163,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo | |||
table.addCell(FORMATTER.format(Instant.ofEpochMilli(taskInfo.getStartTime()))); | |||
table.addCell(taskInfo.getRunningTimeNanos()); | |||
table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos())); | |||
table.addCell(taskInfo.getStatsInfo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use a request parameter (similar to detailed
flag) to control if task API will return the status_info
in results or not. This will help to avoid breaking the backward compatibility where old client are accessing the cluster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dblock : When a new field is added in any REST API response object, will that break backward compatibility from client perspective ? Or adding a new field is fine and client doesn't perform any strict schema validation on response object ? Based on that info we can decide if a new parameter is needed to control the response payload with new field or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think new field are ok, @nknize can you confirm please?
@@ -233,6 +252,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |||
builder.field(attribute.getKey(), attribute.getValue()); | |||
} | |||
builder.endObject(); | |||
if (statsInfo != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what case statsInfo
will be null here ? I see it is always being setting as Collections.emptyMap()
in callers if not supported by task
}, | ||
"stats_info": { | ||
"type" : "object", | ||
"enabled" : false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how this new field will be handled for existing task result indices ?
@@ -131,7 +132,8 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status | |||
System.nanoTime() - startTimeNanos, | |||
this instanceof CancellableTask, | |||
parentTask, | |||
headers | |||
headers, | |||
this instanceof StatCollectorTask ? ((StatCollectorTask) this).getStats() : Collections.emptyMap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are using empty map as default for statsInfo
in TaskInfo
, that means for all the API like CancelTask as well it will end up returning an empty object, which is not needed. Instead, we will need to have some mechanism similar to description field which supports nullability and is only returned if detailed flag is set in request. For cancel type API that is always treated as false and the field in not returned in the response. You can refer the caller of this method
@@ -94,7 +96,8 @@ public TaskInfo( | |||
long runningTimeNanos, | |||
boolean cancellable, | |||
TaskId parentTaskId, | |||
Map<String, String> headers | |||
Map<String, String> headers, | |||
Map<String, Long> statsInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should instead use some stats
object as value rather than Long
to help with adding other fields in that object in future for each stats type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for another review @sohami, will leave this open for now.
@sruti1312 Want to get this over the finishing line? Rebase, etc.? Adding @andrross to help out on the CRs. |
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
MEMORY("memory"), | ||
CPU("cpu"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do these stats mean? Like is the memory value a percentage or bytes? Maybe these answers are obvious to everyone but me :)
start gradle check |
Leaving this to @andrross to final/review/merge. |
client/rest-high-level/src/main/java/org/opensearch/client/tasks/TaskInfo.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/opensearch/tasks/TaskStatsContext.java
Outdated
Show resolved
Hide resolved
@@ -63,6 +71,8 @@ | |||
|
|||
private final Map<String, String> headers; | |||
|
|||
private final Map<String, List<TaskResourceStatsUtil>> resourceStats; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current class names are confusing to me specially ones with Util
and Helper
as suffix in it. Does are usually used for static utility classes. How about below names:
TaskResourceStatsUtil --> TaskCompleteResourceInfo
TaskResourceStatsHelper: ---> TaskResourceInfo
TaskResourceMetric: --> TaskStatsInfo
This will make the different classes like below which shows the relation between them.
TaskCompleteResourceInfo:
Map<TaskStatsType, TaskResourceInfo>
TaskResourceInfo:
Map<TaskStats, TaskStatsInfo>
TaskStatsInfo:
private final TaskStats stats;
private final boolean absolute;
private long startValue;
private long endValue;
@@ -57,6 +59,7 @@ | |||
private TaskId parentTaskId; | |||
private final Map<String, Object> status = new HashMap<>(); | |||
private final Map<String, String> headers = new HashMap<>(); | |||
private final List<Object> resourceStats = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should have it as Map<String, Object>
where initially the entry will be for total resources consumed at task level. That way the serialization can also be of below format:
resource_stats: {
"total": {
"cpu_time": 183930,
"memory_in_bytes": 1010293
}
}
appender.search_fatlog_rolling.type = Console | ||
appender.search_fatlog_rolling.name = search_fatlog_rolling | ||
appender.search_fatlog_rolling.layout.type = OpenSearchJsonLayout | ||
appender.search_fatlog_rolling.layout.type_name = search_fatlog |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's name it something else other than search_fatlog
like tasks_info.log
or tasks_details.log
. Considering it can be used for other actions except search let's not add search in the log name.
/** Consumers that are notified of the stats */ | ||
private final List<Consumer<TaskStatsContext>> statsConsumers = new ArrayList<Consumer<TaskStatsContext>>() { | ||
{ | ||
add(new TaskSearchStatsLogger()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this consumer enabled by default ?
208bf3f
to
46e533c
Compare
✅ Gradle Check success 46e533ca57b8be2da523897514b9dcf45b95972b |
e314321
to
1432f54
Compare
✅ Gradle Check success e314321a11d478d177a3e7ae061ff1294e009385 |
❌ Gradle Check failure 1432f54c93aef302454f3f2e3a64e3c926edfa46 |
Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
1432f54
to
eb3e993
Compare
} | ||
|
||
public long getTotalResourceUtilization(TaskStats taskStats) { | ||
AtomicLong totalResourceConsumption = new AtomicLong(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using an AtomicLong just to be able to capture side effects from inside lambda functions seems like an anti-pattern to me. It implies there's some sort of concurrency here and also seems a bit like a hack to get around the compiler. I think you can rewrite this pretty simply to use regular for loops and a primitive long, like:
long totalResourceConsumption = 0L;
for (List<TaskCompleteResourceInfo> statsUtilList : resourceStats.values()) {
for (TaskCompleteResourceInfo statsUtil : statsUtilList) {
for (Map.Entry<TaskStatsType, TaskResourceInfo> entry : statsUtil.getResourceInfo().entrySet()) {
if (!entry.getKey().isOnlyForAnalysis()) {
totalResourceConsumption += entry.getValue().getStatsInfo().get(taskStats).getTotalValue();
}
}
}
}
return totalResourceConsumption;
I think you can also write this using the Java stream API, but I find it a bit hard to read and might prefer the more verbose style above:
return resourceStats.values().stream().mapToLong(
statsUtilList -> statsUtilList.stream().mapToLong(
statsUtil -> statsUtil.getResourceInfo().entrySet().stream()
.filter(e -> !e.getKey().isOnlyForAnalysis())
.mapToLong(e -> e.getValue().getStatsInfo().get(taskStats).getTotalValue())
.sum())
.sum())
.sum();
@@ -95,32 +106,36 @@ public Task( | |||
this.startTime = startTime; | |||
this.startTimeNanos = startTimeNanos; | |||
this.headers = headers; | |||
this.resourceStats = resourceStats; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor above creates a ConcurrentHashMap, but this constructor allows a caller to inject any kind of map it wants. If thread safety is important then usage of a thread safe map should be required (either copy the provided map into a ConcurrentHashMap or make the type ConcurrentMap). See my comment below about thread safety though.
TaskStatsType statsType, | ||
TaskStatsInfo... taskResourceMetrics | ||
) { | ||
List<TaskCompleteResourceInfo> taskStatsUtilList = resourceStats.getOrDefault(threadId, new ArrayList<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resourceStats
is a ConcurrentHashMap and therefore is thread safe, but this method isn't. Two threads could race in this method and one would end up overwriting the effects of the other. Is thread safety actually required?
Signed-off-by: Sruti Parthiban partsrut@amazon.com
Description
Add resource stats to task framework. The task expose interfaces to allow updating the resource stats. On task completion, the resource usage stats are emitted to registered consumers (or sinks).
Sample task response:
Issues Resolved
This is used for Search Memory tracking.
#1009
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.