Skip to content

Commit

Permalink
backup: advacned prepare implementation (#48439)
Browse files Browse the repository at this point in the history
close #50359
  • Loading branch information
YuJuncen authored Jan 17, 2024
1 parent 2d77402 commit bbbada0
Show file tree
Hide file tree
Showing 23 changed files with 1,565 additions and 460 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5862,13 +5862,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "69c0e5916c604b358dc854873cb855e053c19da1cab0163fa48797eb119537f9",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20231226064240-4f28b82c7860",
sha256 = "53da7bf27e06dedfeea1b523941e8adb94d7d7ed5f93dbfb7467cb71b5d19bd6",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240109063850-932639606bcf",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20231226064240-4f28b82c7860.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240109063850-932639606bcf.zip",
],
)
go_repository(
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "prepare_snap",
srcs = [
"env.go",
"errors.go",
"prepare.go",
"stream.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//br/pkg/utils",
"//pkg/util/engine",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

go_test(
name = "prepare_snap_test",
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
deps = [
":prepare_snap",
"//br/pkg/utils",
"//pkg/store/mockstore/unistore",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_zap//zapcore",
],
)
172 changes: 172 additions & 0 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preparesnap

import (
"context"
"slices"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
// default max gRPC message size is 10MiB.
// split requests to chunks of 1MiB will reduce the possibility of being rejected
// due to max gRPC message size.
maxRequestSize = units.MiB
)

type Env interface {
ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error)
GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error)

LoadRegionsInKeyRange(ctx context.Context, startKey, endKey []byte) (regions []Region, err error)
}

type PrepareClient interface {
Send(*brpb.PrepareSnapshotBackupRequest) error
Recv() (*brpb.PrepareSnapshotBackupResponse, error)
}

type SplitRequestClient struct {
PrepareClient
MaxRequestSize int
}

func (s SplitRequestClient) Send(req *brpb.PrepareSnapshotBackupRequest) error {
// Try best to keeping the request untouched.
if req.Ty == brpb.PrepareSnapshotBackupRequestType_WaitApply && req.Size() > s.MaxRequestSize {
rs := req.Regions
findSplitIndex := func() int {
if len(rs) == 0 {
return -1
}

// Select at least one request.
// So we won't get sutck if there were a really huge (!) request.
collected := 0
lastI := 1
for i := 2; i < len(rs) && collected+rs[i].Size() < s.MaxRequestSize; i++ {
lastI = i
collected += rs[i].Size()
}
return lastI
}
for splitIdx := findSplitIndex(); splitIdx > 0; splitIdx = findSplitIndex() {
split := &brpb.PrepareSnapshotBackupRequest{
Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply,
Regions: rs[:splitIdx],
}
rs = rs[splitIdx:]
if err := s.PrepareClient.Send(split); err != nil {
return err
}
}
return nil
}
return s.PrepareClient.Send(req)
}

type Region interface {
GetMeta() *metapb.Region
GetLeaderStoreID() uint64
}

type CliEnv struct {
Cache *tikv.RegionCache
Mgr *utils.StoreManager
}

func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, err
}
withoutTiFlash := slices.DeleteFunc(stores, engine.IsTiFlash)
return withoutTiFlash, err
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
bcli := brpb.NewBackupClient(cc)
c, err := bcli.PrepareSnapshotBackup(ctx)
if err != nil {
return errors.Annotatef(err, "failed to create prepare backup stream")
}
cli = c
return nil
})
if err != nil {
return nil, err
}
return cli, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
bo := tikv.NewBackoffer(ctx, regionCacheMaxBackoffMs)
if len(endKey) == 0 {
// This is encoded [0xff; 8].
// Workaround for https://github.com/tikv/client-go/issues/1051.
endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
}
rs, err := c.Cache.LoadRegionsInKeyRange(bo, startKey, endKey)
if err != nil {
return nil, err
}
rrs := make([]Region, 0, len(rs))
for _, r := range rs {
rrs = append(rrs, r)
}
return rrs, nil
}

type RetryAndSplitRequestEnv struct {
Env
GetBackoffer func() utils.Backoffer
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
// Retry for about 2 minutes.
rs := utils.InitialRetryState(12, 10*time.Second, 10*time.Second)
bo := utils.Backoffer(&rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
}
cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) {
cli, err := r.Env.ConnectToStore(ctx, storeID)
if err != nil {
log.Warn("Failed to connect to store, will retry.", zap.Uint64("store", storeID), logutil.ShortError(err))
return nil, err
}
return cli, nil
})
if err != nil {
return nil, err
}
return SplitRequestClient{PrepareClient: cli, MaxRequestSize: maxRequestSize}, nil
}
39 changes: 39 additions & 0 deletions br/pkg/backup/prepare_snap/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preparesnap

import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
)

func convertErr(err *errorpb.Error) error {
if err == nil {
return nil
}
return errors.New(err.Message)
}

func leaseExpired() error {
return errors.New("the lease has expired")
}

func unsupported() error {
return errors.New("unsupported operation")
}

func retryLimitExceeded() error {
return errors.New("the limit of retrying exceeded")
}
Loading

0 comments on commit bbbada0

Please sign in to comment.