-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
disttask: impl slot manager in dispatcher, and check resource before start dispatcher #49195
Conversation
Hi @D3Hunter. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
}() | ||
dm.setRunningTask(task, dispatcher) | ||
metrics.UpdateMetricsForRunTask(task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved from setRunningTask
/cc @ywqzzy |
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #49195 +/- ##
================================================
+ Coverage 71.0441% 71.8911% +0.8469%
================================================
Files 1368 1410 +42
Lines 402948 418921 +15973
================================================
+ Hits 286271 301167 +14896
- Misses 96736 98842 +2106
+ Partials 19941 18912 -1029
Flags with carried forward coverage won't be shown. Click here to find out more.
|
mu struct { | ||
syncutil.RWMutex | ||
taskIDs map[int64]struct{} | ||
dispatchers map[int64]Dispatcher | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using sync.Map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's untyped. let's leave it here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
if taskCnt >= proto.MaxConcurrentTask { | ||
break | ||
} | ||
for _, task := range tasks { | ||
// This task is running, so no need to reprocess it. | ||
if dm.isRunningTask(task.ID) { | ||
continue | ||
} | ||
metrics.DistTaskGauge.WithLabelValues(task.Type.String(), metrics.DispatchingStatus).Inc() | ||
// we check it before start dispatcher, so no need to check it again. | ||
// see startDispatcher. | ||
// this should not happen normally, unless user modify system table | ||
// directly. | ||
if getDispatcherFactory(task.Type) == nil { | ||
logutil.BgLogger().Warn("unknown task type", zap.Int64("task-id", task.ID), | ||
zap.Stringer("task-type", task.Type)) | ||
dm.failTask(task, errors.New("unknown task type")) | ||
continue | ||
} | ||
// the task is not in runningTasks set when: | ||
// owner changed or task is cancelled when status is pending. | ||
if task.State == proto.TaskStateRunning || task.State == proto.TaskStateReverting || task.State == proto.TaskStateCancelling { | ||
metrics.UpdateMetricsForDispatchTask(task) | ||
dm.startDispatcher(task) | ||
cnt++ | ||
continue | ||
} | ||
if dm.checkConcurrencyOverflow(cnt) { | ||
break | ||
} | ||
metrics.UpdateMetricsForDispatchTask(task) | ||
dm.startDispatcher(task) | ||
cnt++ | ||
reservedExecID, ok := dm.slotMgr.canReserve(task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why check it twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's checked twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is strange. In fact I commented L197 in vs code.....
I mean L197 and L158
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check in L158, so no need to run any sql
require.Eventually(t, func() bool { | ||
taskKeys := getRunningTaskKeys() | ||
return err == nil && len(taskKeys) == 4 && | ||
taskKeys[0] == "key/0" && taskKeys[1] == "key/1" && | ||
taskKeys[2] == "key/3" && taskKeys[3] == "key/4" | ||
}, time.Second*10, time.Millisecond*100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment to explain why there are 4 tasks. I think it is because the MaxDispatcherConcurency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, limited by max concurrent task. seems quite trivial to comment this
// this should not happen normally, unless user modify system table | ||
// directly. | ||
if getDispatcherFactory(task.Type) == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which system table affects this behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suppose user change task.Type to some invalid value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
Co-authored-by: tangenta <tangenta@126.com>
Co-authored-by: tangenta <tangenta@126.com>
@D3Hunter: you cannot LGTM your own PR. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: D3Hunter, okJiang, tangenta The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
@D3Hunter: Please do not delete or edit you lgtm type comment! In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
What problem does this PR solve?
Issue Number: close #49100
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.