Skip to content

Commit

Permalink
Merge branch 'master' into fix_52012
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Mar 25, 2024
2 parents e1f9083 + 40456f3 commit f9ea645
Show file tree
Hide file tree
Showing 253 changed files with 3,109 additions and 1,382 deletions.
36 changes: 18 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6157,13 +6157,13 @@ def go_deps():
name = "com_github_prometheus_common",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/common",
sha256 = "62f8ef01a9303e8767a035de11b10ef05ec100275109ab17734af21dbc22fa09",
strip_prefix = "github.com/prometheus/common@v0.48.0",
sha256 = "f5fdc8e60b2e3e4fa56d9af84d4a2d49039a08f0d2b63c732399d735360b1ba2",
strip_prefix = "github.com/prometheus/common@v0.51.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7145,26 +7145,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "56bca932f5be6574490fcaaa4d8a19f13a5e05f576aeea07ac16c86fe1feac09",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240320090330-ab337d6c51d2",
sha256 = "86c2da8180c318c8258d4759fcae926a5613f5ff929dfb3461ce307c15cc44a5",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240322070737-05aaba6cc6f7",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240320090330-ab337d6c51d2.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240320090330-ab337d6c51d2.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240320090330-ab337d6c51d2.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240320090330-ab337d6c51d2.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "d2f21afb90ffa49839d6392fa1660865fac285b10580c23b8feba5bbf6bf7191",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240320081713-c00c42e77b31",
sha256 = "9ebd4a2ecfd4d7c03f59deafc9d901e3c2ea308870e4dbaf91db1295b24e7ac1",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240322051414-fb9e2d561b6e",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240320081713-c00c42e77b31.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240320081713-c00c42e77b31.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240320081713-c00c42e77b31.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240320081713-c00c42e77b31.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
],
)
go_repository(
Expand Down
1 change: 1 addition & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ approvers:
- Leavrth
- leoppro
- lichunzhu
- lidezhu
- Little-Wallace
- liuzix
- lonng
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
":prepare_snap",
"//br/pkg/utils",
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers (optional hooks). Initialize them before starting the preparer. */
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
Expand Down Expand Up @@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
Expand Down
30 changes: 29 additions & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -474,7 +475,6 @@ func TestSplitEnv(t *testing.T) {
}

func TestConnectionDelay(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
Expand Down Expand Up @@ -510,3 +510,31 @@ func TestConnectionDelay(t *testing.T) {
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}

// TestHooks checks that the AfterConnectionsEstablished observer is invoked
// while DriveLoopAndWaitPrepare is running, and that the prepare / finalize
// cycle still succeeds afterwards.
func TestHooks(t *testing.T) {
	req := require.New(t)
	cluster := fakeCluster(t, 3, dummyRegions(100)...)
	env := newTestEnv(cluster)

	// The onWaitApply callback installed on each created store parks on this
	// channel until it is closed further down.
	releaseWaitApply := make(chan struct{})
	env.onCreateStore = func(store *mockStore) {
		store.onWaitApply = func(*metapb.Region) error {
			<-releaseWaitApply
			return nil
		}
	}

	preparer := New(env)
	var established atomic.Bool
	preparer.AfterConnectionsEstablished = func() {
		established.Store(true)
	}

	// Buffered so the goroutine can hand over its result and exit even when
	// nobody is receiving yet.
	done := make(chan error, 1)
	go func() {
		done <- preparer.DriveLoopAndWaitPrepare(context.Background())
	}()

	// Wait for the hook to fire first, then unblock the wait-apply callbacks
	// so the driver loop can run to completion.
	req.Eventually(established.Load, time.Second, 100*time.Millisecond)
	close(releaseWaitApply)
	req.NoError(<-done)
	env.AssertSafeForBackup(t)
	req.NoError(preparer.Finalize(context.Background()))
	env.AssertIsNormalMode(t)
}
16 changes: 4 additions & 12 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -846,27 +847,18 @@ func removeCheckpointData(ctx context.Context, s storage.ExternalStorage, subDir
zap.Int64("remove-size", removeSize),
)

maxFailedFilesNum := 16
failedFilesCount := struct {
lock sync.Mutex
count int
}{
count: 0,
}
maxFailedFilesNum := int64(16)
var failedFilesCount atomic.Int64
pool := utils.NewWorkerPool(4, "checkpoint remove worker")
eg, gCtx := errgroup.WithContext(ctx)
for _, filename := range removedFileNames {
name := filename
pool.ApplyOnErrorGroup(eg, func() error {
if err := s.DeleteFile(gCtx, name); err != nil {
log.Warn("failed to remove the file", zap.String("filename", name), zap.Error(err))
failedFilesCount.lock.Lock()
failedFilesCount.count += 1
if failedFilesCount.count >= maxFailedFilesNum {
failedFilesCount.lock.Unlock()
if failedFilesCount.Add(1) >= maxFailedFilesNum {
return errors.Annotate(err, "failed to delete too many files")
}
failedFilesCount.lock.Unlock()
}
return nil
})
Expand Down
3 changes: 0 additions & 3 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,8 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
datas,
s.store,
int64(5*size.MB),
64*1024,
mergeOutput,
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
Expand Down
12 changes: 5 additions & 7 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,18 @@ func (r *byteReader) switchToConcurrentReader() error {
// containing those bytes. The content of returned slice may be changed after
// next call.
func (r *byteReader) readNBytes(n int) ([]byte, error) {
readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
}
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)

readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
auxBuf := make([]byte, n)
for _, b := range bs {
copy(auxBuf[len(auxBuf)-n:], b)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ func NewMergeKVIter(
outerConcurrency = 1
}
concurrentReaderConcurrency := max(256/outerConcurrency, 8)
// TODO: the merge-sort step passes outerConcurrency=0, so this buffer size
// might be too large when checkHotspot is true (add-index).
largeBufSize := ConcurrentReaderBufferSizePerConc * concurrentReaderConcurrency
memPool := membuf.NewPool(
membuf.WithBlockNum(1), // currently only one reader will become hotspot
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/external/kv_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ import (
"encoding/binary"
"io"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

// defaultReadBufferSize is the default read buffer size of kvReader. The buffer
// is split into 3 parts: 2 for prefetching from storage and 1 for reads by the
// user.
var defaultReadBufferSize = 64 * units.KiB

type kvReader struct {
byteReader *byteReader
}
Expand Down
64 changes: 36 additions & 28 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,52 @@ package external
import (
"context"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var (
	// MaxMergingFilesPerThread caps the number of files merged by a single
	// thread. 16 threads are ok to merge 4k files in parallel, which gives
	// 4000 / 16 = 250 files per thread.
	MaxMergingFilesPerThread = 250
	// MinUploadPartSize is the smallest size of each part when uploading files
	// to external storage, which is 5MiB for both S3 and GCS.
	MinUploadPartSize = int64(5 * units.MiB)
)

// MergeOverlappingFiles reads from given files whose key range may overlap
// and writes to new sorted, nonoverlapping files.
func MergeOverlappingFiles(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
partSize int64,
readBufferSize int,
newFilePrefix string,
blockSize int,
memSizeLimit uint64,
writeBatchCount uint64,
onClose OnCloseFunc,
concurrency int,
checkHotspot bool,
) error {
dataFilesSlice := splitDataFiles(paths, concurrency)
// during the encode&sort step, the writer-limit is aligned to the block size,
// so we need to align this too. the max additional written size per file is
// max-block-size. for max-block-size = 32MiB, adding
// (max-block-size * MaxMergingFilesPerThread)/10000 ~ 1MiB to part-size is enough.
partSize = max(MinUploadPartSize, partSize+units.MiB)

logutil.Logger(ctx).Info("start to merge overlapping files",
zap.Int("file-count", len(paths)),
zap.Int("file-groups", len(dataFilesSlice)),
zap.Int("concurrency", concurrency))
zap.Int("concurrency", concurrency),
zap.Int64("part-size", partSize))
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
partSize = max(int64(5*size.MB), partSize+int64(1*size.MB))
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
Expand All @@ -45,12 +57,9 @@ func MergeOverlappingFiles(
files,
store,
partSize,
readBufferSize,
newFilePrefix,
uuid.New().String(),
memSizeLimit,
blockSize,
writeBatchCount,
onClose,
checkHotspot,
)
Expand All @@ -59,10 +68,11 @@ func MergeOverlappingFiles(
return eg.Wait()
}

// split input data files into max 'concurrency' shares evenly, if there are not
// enough files, merge at least 2 files in one batch.
// splitDataFiles splits the input data files evenly into multiple shares, with
// at most MaxMergingFilesPerThread files in each share. If there are not enough
// files, at least 2 files are merged in one batch.
func splitDataFiles(paths []string, concurrency int) [][]string {
shares := concurrency
shares := max((len(paths)+MaxMergingFilesPerThread-1)/MaxMergingFilesPerThread, concurrency)
if len(paths) < 2*concurrency {
shares = max(1, len(paths)/2)
}
Expand Down Expand Up @@ -91,30 +101,29 @@ func splitDataFiles(paths []string, concurrency int) [][]string {
// accurately, here we only consider the memory used by our code, the estimate max
// memory usage of this function is:
//
// memSizeLimit
// + 20 * partSize
// + 20 * 5MiB(stat file, we might not use all part, as stat file is quite small)
// + readBufferSize * len(paths)
// defaultOneWriterMemSizeLimit
// + MaxMergingFilesPerThread * (X + defaultReadBufferSize)
// + maxUploadWorkersPerThread * (data-part-size + 5MiB(stat-part-size))
// + memory taken by concurrent reading if check-hotspot is enabled
//
// memSizeLimit = 256 MiB now.
// partSize = index-kv-data-file-size / (10000 / MergeSortOverlapThreshold) for import into.
// readBufferSize = 64 KiB now.
// len(paths) >= kv-files-in-subtask(suppose MergeSortOverlapThreshold) / concurrency
// where X is memory used for each read connection, it's http2 for GCP, X might be
// 4 or more MiB, http1 for S3, it's smaller.
//
// with current default values, on machine with 2G per core, the estimate max memory
// usage for import into is:
//
// 128 + 250 * (4 + 64/1024) + 8 * (25.6 + 5) ~ 1.36 GiB
// where 25.6 is max part-size when there is only data kv = 1024*250/10000 = 25.6MiB
//
// TODO: seems it might OOM if partSize = 256 / (10000/4000) = 100 MiB, when write
// external storage is slow.
// for add-index, it uses more memory as check-hotspot is enabled.
func mergeOverlappingFilesInternal(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
partSize int64,
readBufferSize int,
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
Expand All @@ -127,7 +136,7 @@ func mergeOverlappingFilesInternal(
}()

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0)
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, defaultReadBufferSize, checkHotspot, 0)
if err != nil {
return err
}
Expand All @@ -139,9 +148,8 @@ func mergeOverlappingFilesInternal(
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetMemorySizeLimit(defaultOneWriterMemSizeLimit).
SetBlockSize(blockSize).
SetWriterBatchCount(writeBatchCount).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
Expand Down
Loading

0 comments on commit f9ea645

Please sign in to comment.