Skip to content

Commit

Permalink
Merge branch 'pingcap:master' into fix_runtime_merge_unconsumed
Browse files Browse the repository at this point in the history
  • Loading branch information
jiyfhust authored Mar 26, 2024
2 parents 8e0d4cc + 094f4df commit 2082420
Show file tree
Hide file tree
Showing 382 changed files with 5,942 additions and 2,753 deletions.
36 changes: 18 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6157,13 +6157,13 @@ def go_deps():
name = "com_github_prometheus_common",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/common",
sha256 = "62f8ef01a9303e8767a035de11b10ef05ec100275109ab17734af21dbc22fa09",
strip_prefix = "github.com/prometheus/common@v0.48.0",
sha256 = "f5fdc8e60b2e3e4fa56d9af84d4a2d49039a08f0d2b63c732399d735360b1ba2",
strip_prefix = "github.com/prometheus/common@v0.51.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.48.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/common/com_github_prometheus_common-v0.51.0.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7145,26 +7145,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "070de80c4f3f4a56559f52462e5a8518e38f78bbbf4f5011530c6d051b3c21b6",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240319080847-c9767e55585b",
sha256 = "86c2da8180c318c8258d4759fcae926a5613f5ff929dfb3461ce307c15cc44a5",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240322070737-05aaba6cc6f7",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240319080847-c9767e55585b.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240319080847-c9767e55585b.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240319080847-c9767e55585b.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240319080847-c9767e55585b.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240322070737-05aaba6cc6f7.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "cca1c3b7e41dc7f511ebbd99c948d9636b407bec1e271881c224934a83061e6d",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240319071242-d3b94c97c12b",
sha256 = "9ebd4a2ecfd4d7c03f59deafc9d901e3c2ea308870e4dbaf91db1295b24e7ac1",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240322051414-fb9e2d561b6e",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240319071242-d3b94c97c12b.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240319071242-d3b94c97c12b.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240319071242-d3b94c97c12b.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240319071242-d3b94c97c12b.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240322051414-fb9e2d561b6e.zip",
],
)
go_repository(
Expand Down
1 change: 1 addition & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ approvers:
- Leavrth
- leoppro
- lichunzhu
- lidezhu
- Little-Wallace
- liuzix
- lonng
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
":prepare_snap",
"//br/pkg/utils",
Expand Down
31 changes: 30 additions & 1 deletion br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package preparesnap
import (
"context"
"slices"
"sync"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

// AdaptForGRPCInTest wraps p in a gRPCGoAdapter, which serializes concurrent
// `Send` calls and concurrent `Recv` calls on p. As the name indicates, it is
// meant to be used by tests that need the adapter around a mock client.
func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
	adapter := new(gRPCGoAdapter)
	adapter.inner = p
	return adapter
}

// gRPCGoAdapter wraps a PrepareClient and serializes calls to it: `Send` calls
// are mutually exclusive with each other, and so are `Recv` calls.
// grpc-go doesn't guarantee that calling `Send` concurrently from several
// goroutines (or `Recv` concurrently from several goroutines) is safe, but
// calling `Send` and `Recv` at the same time is safe, hence the two
// independent mutexes.
// The type itself is unexported; tests obtain one via AdaptForGRPCInTest.
type gRPCGoAdapter struct {
// inner is the wrapped client that actually performs the calls.
inner PrepareClient
// sendMu is held for the duration of each `Send` call.
sendMu sync.Mutex
// recvMu is held for the duration of each `Recv` call.
recvMu sync.Mutex
}

// Send forwards req to the wrapped client while holding sendMu, so at most one
// `Send` is in flight on the inner client at a time. It does not take recvMu,
// so it never blocks on a concurrent `Recv`.
func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

// Recv reads the next response from the wrapped client while holding recvMu,
// so at most one `Recv` is in flight on the inner client at a time. It does
// not take sendMu, so it never blocks on a concurrent `Send`.
func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
Expand All @@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers. Initialize them before starting.*/
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
Expand Down Expand Up @@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
Expand Down
32 changes: 30 additions & 2 deletions br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -177,7 +178,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
m.onCreateStore(m.stores[storeID])
}
return m.stores[storeID], nil
return AdaptForGRPCInTest(m.stores[storeID]), nil
}

func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down Expand Up @@ -474,7 +475,6 @@ func TestSplitEnv(t *testing.T) {
}

func TestConnectionDelay(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
Expand Down Expand Up @@ -510,3 +510,31 @@ func TestConnectionDelay(t *testing.T) {
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}

// TestHooks checks that the AfterConnectionsEstablished observer is invoked by
// DriveLoopAndWaitPrepare, and that the preparer still finishes and finalizes
// without error afterwards.
func TestHooks(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
// Every wait-apply call on a mock store blocks until this channel is closed.
pauseWaitApply := make(chan struct{})
ms := newTestEnv(pdc)
ms.onCreateStore = func(ms *mockStore) {
ms.onWaitApply = func(r *metapb.Region) error {
<-pauseWaitApply
return nil
}
}
adv := New(ms)
// Set from the hook and polled from the test goroutine, hence atomic.
connectionsEstablished := new(atomic.Bool)
adv.AfterConnectionsEstablished = func() {
connectionsEstablished.Store(true)
}
// Buffered so the goroutine below can deliver its result without a receiver.
errCh := make(chan error, 1)
go func() {
errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
}()
// The hook has to be observed while wait-apply is still paused; only then is
// the pause released so the drive loop can make progress.
req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
close(pauseWaitApply)
req.NoError(<-errCh)
ms.AssertSafeForBackup(t)
req.NoError(adv.Finalize(context.Background()))
ms.AssertIsNormalMode(t)
}
16 changes: 4 additions & 12 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -846,27 +847,18 @@ func removeCheckpointData(ctx context.Context, s storage.ExternalStorage, subDir
zap.Int64("remove-size", removeSize),
)

maxFailedFilesNum := 16
failedFilesCount := struct {
lock sync.Mutex
count int
}{
count: 0,
}
maxFailedFilesNum := int64(16)
var failedFilesCount atomic.Int64
pool := utils.NewWorkerPool(4, "checkpoint remove worker")
eg, gCtx := errgroup.WithContext(ctx)
for _, filename := range removedFileNames {
name := filename
pool.ApplyOnErrorGroup(eg, func() error {
if err := s.DeleteFile(gCtx, name); err != nil {
log.Warn("failed to remove the file", zap.String("filename", name), zap.Error(err))
failedFilesCount.lock.Lock()
failedFilesCount.count += 1
if failedFilesCount.count >= maxFailedFilesNum {
failedFilesCount.lock.Unlock()
if failedFilesCount.Add(1) >= maxFailedFilesNum {
return errors.Annotate(err, "failed to delete too many files")
}
failedFilesCount.lock.Unlock()
}
return nil
})
Expand Down
3 changes: 0 additions & 3 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,8 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
datas,
s.store,
int64(5*size.MB),
64*1024,
mergeOutput,
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
Expand Down
12 changes: 5 additions & 7 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,18 @@ func (r *byteReader) switchToConcurrentReader() error {
// containing those bytes. The content of returned slice may be changed after
// next call.
func (r *byteReader) readNBytes(n int) ([]byte, error) {
readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
}
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)

readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
auxBuf := make([]byte, n)
for _, b := range bs {
copy(auxBuf[len(auxBuf)-n:], b)
Expand Down
40 changes: 37 additions & 3 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
sortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("sort")

readStart := time.Now()
readDtStartKey := e.keyAdapter.Encode(nil, startKey, common.MinRowID)
readDtEndKey := e.keyAdapter.Encode(nil, endKey, common.MinRowID)
err := readAllData(
ctx,
e.storage,
e.dataFiles,
e.statsFiles,
startKey,
endKey,
readDtStartKey,
readDtEndKey,
e.smallBlockBufPool,
e.largeBlockBufPool,
&e.memKVsAndBuffers,
Expand Down Expand Up @@ -402,7 +404,32 @@ func (e *Engine) ID() string {

// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
return e.startKey, e.endKey, nil
}

// when duplicate detection feature is enabled, the end key comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.

start, err := e.keyAdapter.Decode(nil, e.startKey)
if err != nil {
return nil, nil, err
}
end, err := e.keyAdapter.Decode(nil, e.endKey)
if err == nil {
return start, end, nil
}
// handle the case that end key is from Key.Next()
if e.endKey[len(e.endKey)-1] != 0 {
return nil, nil, err
}
endEncoded := e.endKey[:len(e.endKey)-1]
end, err = e.keyAdapter.Decode(nil, endEncoded)
if err != nil {
return nil, nil, err
}
return start, kv.Key(end).Next(), nil
}

// SplitRanges split the ranges by split keys provided by external engine.
Expand All @@ -412,6 +439,13 @@ func (e *Engine) SplitRanges(
_ log.Logger,
) ([]common.Range, error) {
splitKeys := e.splitKeys
for i, k := range e.splitKeys {
var err error
splitKeys[i], err = e.keyAdapter.Decode(nil, k)
if err != nil {
return nil, err
}
}
ranges := make([]common.Range, 0, len(splitKeys)+1)
ranges = append(ranges, common.Range{Start: startKey})
for i := 0; i < len(splitKeys); i++ {
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ func NewMergeKVIter(
outerConcurrency = 1
}
concurrentReaderConcurrency := max(256/outerConcurrency, 8)
// TODO: merge-sort step passes outerConcurrency=0, so this bufSize might be
// too large when checkHotspot = true(add-index).
largeBufSize := ConcurrentReaderBufferSizePerConc * concurrentReaderConcurrency
memPool := membuf.NewPool(
membuf.WithBlockNum(1), // currently only one reader will become hotspot
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/external/kv_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ import (
"encoding/binary"
"io"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

var (
// defaultReadBufferSize is the default read buffer size of kvReader. The
// buffer is split into 3 parts: 2 for prefetching from storage and 1 for
// reads by the user.
defaultReadBufferSize = 64 * units.KiB
)

type kvReader struct {
byteReader *byteReader
}
Expand Down
Loading

0 comments on commit 2082420

Please sign in to comment.