-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
optimization: Unified Sorter #972
Conversation
cdc/puller/unified_sorter.go
Outdated
} | ||
// Cache is full. Let GC do its job | ||
} | ||
panic("Unexpected type") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer log.Fatal
.
cdc/puller/unified_sorter.go
Outdated
for i := range heapSorters { | ||
finalI := i | ||
heapSorters[finalI] = newHeapSorter(finalI, s.pool, sorterOutCh) | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer errgroup.Go
to manage related goroutines.
cdc/puller/unified_sorter.go
Outdated
fileBufferSize = 16 * 1024 * 1024 | ||
heapSizeLimit = 4 * 1024 * 1024 // 4MB | ||
numConcurrentHeaps = 16 | ||
memoryLimit = 1024 * 1024 * 1024 // 1GB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you set the value base on system memory capacity, eg. min(upperBound, x% * systemMemory).
cdc/puller/unified_sorter.go
Outdated
memoryUseEstimate: 0, | ||
fileNameCounter: 0, | ||
mu: sync.Mutex{}, | ||
cache: make([]unsafe.Pointer, 256), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you want a fixed size array, eg, [256]unsafe.Pointer.
cdc/puller/unified_sorter.go
Outdated
} | ||
|
||
func newFileSorterBackEnd(fileName string, serde serializerDeserializer) (*fileSorterBackEnd, error) { | ||
f, err := os.Create(fileName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you remove the file?
cdc/puller/unified_sorter.go
Outdated
heapSizeBytesEstimate += event.RawKV.ApproximateSize() | ||
if heapSizeBytesEstimate >= heapSizeLimit || isResolvedEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to calculate the estimated size before adding an entry to the heap to make sure it does not exceed the limit.
/run-all-tests |
/run-all-tests |
/run-all-tests |
/run-all-tests |
/run-integration-tests |
/run-all-tests |
@@ -0,0 +1,150 @@ | |||
// Copyright 2020 PingCAP, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please exclude testing_utils
dir from the coverage counter
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
Co-authored-by: amyangfei <amyangfei@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/merge |
Your auto merge job has been accepted, waiting for:
|
/run-all-tests |
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
cherry pick to release-4.0 in PR #1122 |
What problem does this PR solve?
#945
What is changed and how it works?
Check List
Tests
Code changes
Side effects
Benchmarks
Method
We generate mock KV-entries with 1KB data in each entry. We use 256 goroutines to generate data in parallel. There is no resolved event until all data has been fed to the sorter.
Results
Machine specifications
CPU: Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz
Memory: 16GB DDR4 2133MHz * 8
Disk: Intel Corporation NVMe Datacenter SSD
Sorter configuration
Release note