Support file sort in puller #477
/run-integration-tests
Codecov Report
@@              Coverage Diff               @@
##             master       #477      +/-   ##
==============================================
- Coverage   29.0322%   28.3365%   -0.6957%
==============================================
  Files            58         59         +1
  Lines          5673       6144       +471
==============================================
+ Hits           1647       1741        +94
- Misses         3913       4287       +374
- Partials        113        116         +3
cdc/model/mounter.go
Outdated
Ts       uint64           `json:"t"`
RawKV    *RawKVEntry      `json:"-"`
Row      *RowChangedEvent `json:"r"`
Finished chan struct{}    `json:"-"`
Why make Finished public? And why not store RowChangedEvent to disk directly?
By the way, I don't think JSON is a suitable serialization format here. Its advantage is readability, but we don't need that here.
Good catch, Finished was made public by mistake. I now use gob encoding to store PolymorphicEvent to file.
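For readers following the thread, here is a minimal sketch of what the gob approach looks like. The `event` type is a hypothetical stand-in for model.PolymorphicEvent, not the real struct; the point it illustrates is that encoding/gob only serializes exported fields, so an unexported channel field is left out of the file without needing a `json:"-"` style tag.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// event is a hypothetical stand-in for model.PolymorphicEvent.
type event struct {
	Ts       uint64
	Key      []byte
	finished chan struct{} // unexported, so gob ignores it
}

// roundTrip encodes an event with gob and decodes it back.
func roundTrip(in event) (event, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return event{}, err
	}
	var out event
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return event{}, err
	}
	return out, nil
}

func main() {
	in := event{Ts: 42, Key: []byte("k"), finished: make(chan struct{})}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	// The channel is not serialized, so it comes back nil.
	fmt.Println(out.Ts, string(out.Key), out.finished == nil)
}
```

After decoding, the channel has to be recreated by whoever reads the event back, since only the exported data survives the trip to disk.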
/run-integration-tests
func (fs *FileSorter) sortAndOutput(ctx context.Context) error {
	bufferLen := 10
	buffer := make([]*model.PolymorphicEvent, 0, bufferLen)
Use bytes.Buffer to avoid frequent memory allocation.
	return 0, errors.Trace(err)
}
dataBuf.Reset()
err = gob.NewEncoder(dataBuf).Encode(entry)
How about creating a new encoder on a buffered file writer directly?
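f, err := os.OpenFile(fullpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
	return 0, errors.Trace(err)
}
defer f.Close()
w := bufio.NewWriter(f)
encoder := gob.NewEncoder(w)
for _, entry := range entries {
	// Encode can fail, so the error must not be dropped.
	if err := encoder.Encode(entry); err != nil {
		return 0, errors.Trace(err)
	}
}
// Flush the buffered data so it actually reaches the file.
if err := w.Flush(); err != nil {
	return 0, errors.Trace(err)
}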
cdc/puller/file_sorter.go
Outdated
// clear and reset unsorted files, set cache sorting flag to prevent repeated sort
fs.cache.fileLock.Lock()
if atomic.LoadInt32(&fs.cache.sorting) == 1 {
There is no need to check whether sorting is in progress under lock protection. We can use CompareAndSwap to avoid the lock.
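To make the suggestion concrete, here is a small sketch of claiming the sorting flag with atomic.CompareAndSwapInt32. The `sorter` type and its method names are hypothetical and only mirror the `sorting` flag from the hunk above; this is not the code in file_sorter.go.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sorter is a hypothetical stand-in for the cache guarded in file_sorter.go.
type sorter struct {
	sorting int32 // 0 = idle, 1 = a sort is in progress
}

// tryStartSort atomically flips the flag from 0 to 1.
// It returns false if another goroutine already holds the flag.
func (s *sorter) tryStartSort() bool {
	return atomic.CompareAndSwapInt32(&s.sorting, 0, 1)
}

// finishSort releases the flag so the next sort can start.
func (s *sorter) finishSort() {
	atomic.StoreInt32(&s.sorting, 0)
}

func main() {
	s := &sorter{}
	var wins int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.tryStartSort() {
				atomic.AddInt32(&wins, 1)
			}
		}()
	}
	wg.Wait()
	// The flag is never released during the race, so exactly one goroutine wins.
	fmt.Println("winners:", wins)
	s.finishSort()
}
```

The compare-and-swap only replaces the check-then-set on the flag. If the list of unsorted files is also mutated at that point, that state may still need its own protection.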
cdc/puller/file_sorter.go
Outdated
	return "", errors.New("unsorted file unexpected truncated")
}
ev := &model.PolymorphicEvent{}
err = gob.NewDecoder(bytes.NewReader(data[idx+8 : idx+8+dataLen])).Decode(ev)
Can we avoid creating a new decoder and reader for every event?
LGTM
/run-all-tests
What problem does this PR solve?
This is the first part of pingcap/ticdc#466. It adds a file sort mechanism, so a changefeed can use either the memory sorter or the file sorter.
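The description does not spell out how sorted files are combined into one ordered output. A common technique for file-based sorting is a k-way merge over already sorted runs, sketched below with container/heap. This is an illustration of the general technique only, not the algorithm this PR implements; the runs are plain timestamp slices and every name is hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is the head of one sorted run waiting in the heap.
type item struct {
	ts  uint64
	run int // which sorted run the value came from
	pos int // index of the value within that run
}

// minHeap orders items by timestamp, smallest first.
type minHeap []item

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].ts < h[j].ts }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// mergeRuns merges individually sorted runs into one sorted slice.
func mergeRuns(runs [][]uint64) []uint64 {
	h := &minHeap{}
	for i, r := range runs {
		if len(r) > 0 {
			heap.Push(h, item{ts: r[0], run: i, pos: 0})
		}
	}
	var out []uint64
	for h.Len() > 0 {
		it := heap.Pop(h).(item)
		out = append(out, it.ts)
		// Refill the heap from the run that just produced the minimum.
		if next := it.pos + 1; next < len(runs[it.run]) {
			heap.Push(h, item{ts: runs[it.run][next], run: it.run, pos: next})
		}
	}
	return out
}

func main() {
	fmt.Println(mergeRuns([][]uint64{{1, 4, 9}, {2, 3}, {}}))
}
```

In a real file sorter each run would be a reader over a sorted file rather than an in-memory slice, so only one event per file is held in memory at a time.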
What is changed and how it works?
EventSorter in puller, so we don't need to change the original replication model.
input cache channel, it reads from the input channel and flushes events to file until they are all mounted.
Check List
Tests