-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support task resource tracking in OpenSearch #2639
Changes from all commits
66e1aee
84cc493
a01aac2
36d2de1
be7cb83
f61ef7d
7058bd7
aa35b82
23d639b
930edae
f68e91d
e788dfd
39dfc22
4b26d2d
c76ce40
6915d17
576a477
046c652
f135cf1
dea288b
167086a
ff4a9eb
3df4d63
5dcd53e
9bd32cf
0c301e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ | |
import org.opensearch.action.ActionResponse; | ||
import org.opensearch.common.lease.Releasable; | ||
import org.opensearch.common.lease.Releasables; | ||
import org.opensearch.common.util.concurrent.ThreadContext; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.tasks.TaskCancelledException; | ||
import org.opensearch.tasks.TaskId; | ||
|
@@ -88,31 +89,39 @@ public final Task execute(Request request, ActionListener<Response> listener) { | |
*/ | ||
final Releasable unregisterChildNode = registerChildNode(request.getParentTask()); | ||
final Task task; | ||
|
||
try { | ||
task = taskManager.register("transport", actionName, request); | ||
} catch (TaskCancelledException e) { | ||
unregisterChildNode.close(); | ||
throw e; | ||
} | ||
execute(task, request, new ActionListener<Response>() { | ||
@Override | ||
public void onResponse(Response response) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onResponse(response); | ||
|
||
ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. At least it is a bit cleaner, thanks @tushar-kharbanda72. Why do we do that every time instead of doing it once in the [inline code reference lost in extraction; presumably the other `execute` overload]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The other `execute` gets called for an already registered task as well, so it is possible for the task manager to receive the execute call twice for the same task. |
||
try { | ||
execute(task, request, new ActionListener<Response>() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The second issue that strikes me: why does the task manager have register / unregister + cancellation but not have an [inline code reference lost in extraction]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was wondering something similar, as all these pieces are spread out and execution doesn't seem to have a common pipeline through which it flows. But I wouldn't want to include that change in this PR, as I believe this part still needs more discussion and brainstorming, and extending the scope might delay the current PR. |
||
@Override | ||
public void onResponse(Response response) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onResponse(response); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onFailure(e); | ||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onFailure(e); | ||
} | ||
} | ||
} | ||
}); | ||
}); | ||
} finally { | ||
storedContext.close(); | ||
} | ||
|
||
return task; | ||
} | ||
|
||
|
@@ -129,25 +138,30 @@ public final Task execute(Request request, TaskListener<Response> listener) { | |
unregisterChildNode.close(); | ||
throw e; | ||
} | ||
execute(task, request, new ActionListener<Response>() { | ||
@Override | ||
public void onResponse(Response response) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onResponse(task, response); | ||
ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task); | ||
try { | ||
execute(task, request, new ActionListener<Response>() { | ||
@Override | ||
public void onResponse(Response response) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onResponse(task, response); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onFailure(task, e); | ||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task)); | ||
} finally { | ||
listener.onFailure(task, e); | ||
} | ||
} | ||
} | ||
}); | ||
}); | ||
} finally { | ||
storedContext.close(); | ||
} | ||
return task; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this not work with `wait_for_completion` true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Wait for completion means the tasks have finished, which in turn means that the resource tracking cycle of those tasks is also complete. But if the user doesn't pass this, then for running tasks that are handled by just one thread and are still in progress, users would always see a value of 0 unless we refresh, until the task completes and the final values have been recorded.
So, to show the most recent state, we refresh the stats before returning the response.