disttask: add operator abstraction #46279
Hi @tangenta. Thanks for your PR. PRs from untrusted users cannot be marked as trusted with I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Codecov Report
Additional details and impacted files
@@               Coverage Diff                @@
##             master     #46279       +/-    ##
================================================
- Coverage   73.3815%   72.7198%   -0.6618%
================================================
  Files          1285       1309        +24
  Lines        394832     399639      +4807
================================================
+ Hits         289734     290617       +883
- Misses        86650      90608      +3958
+ Partials      18448      18414        -34
Flags with carried forward coverage won't be shown.
LGTM
	return "simpleSink"
}

type simpleOperator[T, R any] struct {
For add index, will we use IngestOperator which wraps AsyncOperator instead of simpleOperator?
Yes, you can override the Display() method.
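As a rough illustration of that answer, the sketch below shows how a wrapper type can replace Display() through Go struct embedding. The type bodies, the name field, and the Describe helper are assumptions made for this example only; the thread does not quote the PR's real AsyncOperator or IngestOperator definitions.

```go
package main

import "fmt"

// AsyncOperator is a simplified, hypothetical stand-in for the PR's type.
type AsyncOperator struct {
	name string
}

// Display returns a human-readable description of the operator.
func (o *AsyncOperator) Display() string {
	return "AsyncOperator(" + o.name + ")"
}

// Describe calls Display on the AsyncOperator receiver itself.
func (o *AsyncOperator) Describe() string {
	return "running " + o.Display()
}

// IngestOperator is a hypothetical wrapper that embeds AsyncOperator.
type IngestOperator struct {
	*AsyncOperator
}

// Display shadows the embedded AsyncOperator.Display for IngestOperator values.
func (o *IngestOperator) Display() string {
	return "IngestOperator(" + o.name + ")"
}

func main() {
	op := &IngestOperator{AsyncOperator: &AsyncOperator{name: "add-index"}}
	fmt.Println(op.Display())               // uses the wrapper's method
	fmt.Println(op.AsyncOperator.Display()) // the embedded method is still reachable
	fmt.Println(op.Describe())              // promoted method, sees only AsyncOperator
}
```

One caveat with this approach: embedding is not virtual dispatch, so a method defined on the embedded type (Describe here) keeps calling the embedded Display, not the wrapper's.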
@ywqzzy: adding LGTM is restricted to approvers and reviewers in OWNERS files.
3/10, will review later
disttask/operator/operator.go
Outdated
}

// NewAsyncOperator create an AsyncOperator.
func NewAsyncOperator[T, R any](name string, workerNum int, transform func(T) R) *AsyncOperator[T, R] {
Previously I thought we needed to implement each operator separately; now it seems we have a common operator and only need to implement a different transform.
It's also not that flexible: for each input T we generate one output R, but in some cases we generate only one output from all of the input data. (Right now, we're using a special sink as a collector.)
> It's also not that flexible: for each input T we generate one output R, but in some cases we generate only one output from all of the input data. (Right now, we're using a special sink as a collector.)
Do you mean that one operator should have multiple types of input?
Let me change it to two constructors:

// NewAsyncOperatorWithTransform create an AsyncOperator with a transform function.
func NewAsyncOperatorWithTransform[T, R any](name string, workerNum int, transform func(T) R) *AsyncOperator[T, R] {
	pool := workerpool.NewWorkerPool(name, util.DistTask, workerNum, newAsyncWorkerCtor(transform))
	return NewAsyncOperator(pool)
}

// NewAsyncOperator create an AsyncOperator.
func NewAsyncOperator[T, R any](pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R] {
	return &AsyncOperator[T, R]{
		pool: pool,
	}
}

We can pass a custom worker pool for different business needs.
If this still cannot meet the requirement, we can define our own async operator by implementing the Operator + WithSource/WithSink interfaces.
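To make the "define our own operator" option more concrete, here is a minimal sketch of a custom operator that groups inputs into batches instead of mapping them one to one. The thread does not quote the PR's interface definitions, so the Operator, WithSource, and WithSink declarations below (method names, plain Go channels as the data channel) are assumptions for illustration, as are batchOperator and runBatches.

```go
package main

import "fmt"

// Assumed interface shapes; the PR's real definitions may differ.
type Operator interface {
	Open() error
	Close() error
}

type WithSource[T any] interface{ SetSource(ch <-chan T) }

type WithSink[R any] interface{ SetSink(ch chan<- R) }

// batchOperator is a hypothetical operator that emits slices of at most
// size inputs, so the number of outputs differs from the number of inputs.
type batchOperator[T any] struct {
	size   int
	source <-chan T
	sink   chan<- []T
	done   chan struct{}
}

func (b *batchOperator[T]) SetSource(ch <-chan T) { b.source = ch }
func (b *batchOperator[T]) SetSink(ch chan<- []T) { b.sink = ch }

// Open starts a goroutine that batches values until the source is closed.
func (b *batchOperator[T]) Open() error {
	if b.source == nil || b.sink == nil {
		return fmt.Errorf("batchOperator: source and sink must be set")
	}
	b.done = make(chan struct{})
	go func() {
		defer close(b.done)
		batch := make([]T, 0, b.size)
		for v := range b.source {
			batch = append(batch, v)
			if len(batch) == b.size {
				b.sink <- batch
				batch = make([]T, 0, b.size)
			}
		}
		if len(batch) > 0 {
			b.sink <- batch // flush the final partial batch
		}
	}()
	return nil
}

// Close waits for the goroutine started by Open to finish.
func (b *batchOperator[T]) Close() error {
	<-b.done
	return nil
}

// Compile-time checks against the assumed interfaces.
var (
	_ Operator        = (*batchOperator[int])(nil)
	_ WithSource[int] = (*batchOperator[int])(nil)
	_ WithSink[[]int] = (*batchOperator[int])(nil)
)

// runBatches wires the operator to channels and collects every batch.
func runBatches(items []int, size int) [][]int {
	src := make(chan int)
	sink := make(chan []int, len(items)) // buffered so the operator never blocks on the sink
	op := &batchOperator[int]{size: size}
	op.SetSource(src)
	op.SetSink(sink)
	if err := op.Open(); err != nil {
		panic(err)
	}
	for _, v := range items {
		src <- v
	}
	close(src)
	_ = op.Close()
	close(sink)
	var out [][]int
	for b := range sink {
		out = append(out, b)
	}
	return out
}

func main() {
	fmt.Println(runBatches([]int{1, 2, 3, 4, 5}, 2))
}
```

The sketch uses a single goroutine so batch order follows input order; a pool-backed operator would need to decide how to order or tag its batches.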
> Do you mean that one operator should have multiple types of input?
Right now we produce one output row for each input row (supposing it's row-based); we may want an operator that aggregates multiple or all input rows into a single output row.
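The many-to-one shape described here could look roughly like the following. This is only a sketch of the idea, not code from the PR: aggregate and sumRows are hypothetical names, and plain Go channels stand in for whatever data channel the framework uses.

```go
package main

import "fmt"

// aggregate is a hypothetical many-to-one operator: it folds every value
// read from in into one accumulator and emits that accumulator exactly
// once, after in has been closed.
func aggregate[T, R any](in <-chan T, init R, fold func(acc R, v T) R) <-chan R {
	out := make(chan R, 1)
	go func() {
		defer close(out)
		acc := init
		for v := range in {
			acc = fold(acc, v)
		}
		out <- acc
	}()
	return out
}

// sumRows feeds rows through aggregate and returns the single result.
func sumRows(rows []int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, r := range rows {
			in <- r
		}
	}()
	// Type arguments are written out explicitly to keep the call unambiguous.
	return <-aggregate[int, int](in, 0, func(acc, v int) int { return acc + v })
}

func main() {
	fmt.Println(sumRows([]int{1, 2, 3, 4}))
}
```

Compared with a per-row transform func(T) R, the fold function carries state across rows, which is what a collector-style sink has to do today.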
There seems to be no error handling code. Is that for the next PR?
@Benjamin2037 Yes, I will implement the error handling in the next PR.
lgtm
LGTM
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: D3Hunter, GMHDBJD, ywqzzy
What problem does this PR solve?
Issue Number: ref #46258
Problem Summary:
What is changed and how it works?
This PR introduces the async data operator interface, which provides an abstraction for the planner of the dist task framework.
By composing the async operators, we can process the data concurrently.
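As a rough picture of what composing concurrent operators means, the sketch below chains two worker-backed stages with plain Go channels. It does not use the PR's AsyncOperator or worker pool; source, runStage, and runPipeline are hypothetical helpers written only for this illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// source emits every item on a channel and then closes it.
func source[T any](items []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range items {
			out <- v
		}
	}()
	return out
}

// runStage is a hypothetical stand-in for an async operator: workerNum
// goroutines apply transform to each value from in and write to the
// returned channel, which is closed once all workers have finished.
func runStage[T, R any](in <-chan T, workerNum int, transform func(T) R) <-chan R {
	out := make(chan R)
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- transform(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline composes two stages and returns the results sorted, because
// concurrent workers make the arrival order nondeterministic.
func runPipeline(inputs []int) []string {
	doubled := runStage(source(inputs), 3, func(v int) int { return v * 2 })
	labeled := runStage(doubled, 2, func(v int) string { return fmt.Sprintf("row-%02d", v) })

	var results []string
	for s := range labeled {
		results = append(results, s)
	}
	sort.Strings(results)
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3, 4}))
}
```

Each stage's output channel is the next stage's input, so both stages run at the same time and a slow stage applies back-pressure to the one before it.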
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.