This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

*: cherry-picking some PRs for v4.0.8 #562

Merged
merged 5 commits into from
Oct 23, 2020
4 changes: 3 additions & 1 deletion Makefile
@@ -123,7 +123,9 @@ lint: tools
tidy: prepare
@echo "go mod tidy"
GO111MODULE=on go mod tidy
git diff --quiet go.mod go.sum
# tidy isn't a read-only task for go.mod, so always run FINISH_MOD
# to keep go.mod from being left in a stale state
git diff --quiet go.mod go.sum || ("$(FINISH_MOD)" && exit 1)
$(FINISH_MOD)

failpoint-enable: tools
14 changes: 6 additions & 8 deletions main.go
@@ -30,14 +30,12 @@ func main() {
sig := <-sc
fmt.Printf("\nGot signal [%v] to exit.\n", sig)
log.Warn("received signal to exit", zap.Stringer("signal", sig))
switch sig {
case syscall.SIGTERM:
cancel()
os.Exit(0)
default:
cancel()
os.Exit(1)
}
cancel()
fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
<-sc
// Even if the user exits with SIGTERM, there is no checkpoint to resume from,
// hence return a failure exit code.
os.Exit(1)
}()

rootCmd := &cobra.Command{
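The rewritten handler treats every signal the same way: the first one cancels the root context and asks the command to wind down, and a second one forces the process to exit. A minimal, self-contained sketch of that pattern (doWork is a placeholder for the real command execution and is not part of this PR):

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sc := make(chan os.Signal, 1)
    signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        sig := <-sc
        fmt.Printf("\nGot signal [%v] to exit.\n", sig)
        cancel() // first signal: stop work gracefully via context cancellation
        fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
        <-sc // a second signal skips the graceful path
        os.Exit(1)
    }()

    doWork(ctx)
}

// doWork stands in for the command driven by rootCmd; it simply waits for cancellation.
func doWork(ctx context.Context) {
    <-ctx.Done()
}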
12 changes: 3 additions & 9 deletions pkg/backup/client.go
@@ -18,7 +18,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/table-filter"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
@@ -406,8 +406,6 @@ func (bc *Client) BackupRanges(
updateCh glue.Progress,
) ([]*kvproto.File, error) {
errCh := make(chan error)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// we collect all files in a single goroutine to avoid thread safety issues.
filesCh := make(chan []*kvproto.File, concurrency)
@@ -483,8 +481,6 @@ func (bc *Client) BackupRange(
zap.Stringer("EndKey", utils.WrapKey(endKey)),
zap.Uint64("RateLimit", req.RateLimit),
zap.Uint32("Concurrency", req.Concurrency))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
@@ -497,10 +493,10 @@
req.EndKey = endKey
req.StorageBackend = bc.backend

push := newPushDown(ctx, bc.mgr, len(allStores))
push := newPushDown(bc.mgr, len(allStores))

var results rtree.RangeTree
results, err = push.pushBackup(req, allStores, updateCh)
results, err = push.pushBackup(ctx, req, allStores, updateCh)
if err != nil {
return nil, err
}
@@ -801,8 +797,6 @@ func SendBackup(
respFn func(*kvproto.BackupResponse) error,
resetFn func() (kvproto.BackupClient, error),
) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var errReset error
backupLoop:
for retry := 0; retry < backupRetryTimes; retry++ {
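The hunks above also drop several local ctx, cancel := context.WithCancel(ctx) / defer cancel() pairs. A derived context like that adds little when nothing cancels it before the function returns, and if it leaks into goroutines that outlive the function, the deferred cancel stops them prematurely. A small illustrative sketch of that second hazard, using hypothetical names rather than code from this PR:

package main

import (
    "context"
    "fmt"
    "time"
)

func startWorker(ctx context.Context) {
    // The derived context dies as soon as startWorker returns, which also
    // stops the goroutine below even though the caller's context is still alive.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    go func() {
        <-ctx.Done()
        fmt.Println("worker stopped early:", ctx.Err())
    }()
}

func main() {
    startWorker(context.Background())
    time.Sleep(100 * time.Millisecond) // prints "worker stopped early: context canceled"
}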
12 changes: 5 additions & 7 deletions pkg/backup/push.go
@@ -19,17 +19,14 @@ import (

// pushDown wraps a backup task.
type pushDown struct {
ctx context.Context
mgr ClientMgr
respCh chan *backup.BackupResponse
errCh chan error
}

// newPushDown creates a push down backup.
func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {
log.Info("new backup client")
func newPushDown(mgr ClientMgr, cap int) *pushDown {
return &pushDown{
ctx: ctx,
mgr: mgr,
respCh: make(chan *backup.BackupResponse, cap),
errCh: make(chan error, cap),
@@ -38,6 +35,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {

// pushBackup makes a full backup of a TiKV cluster.
func (push *pushDown) pushBackup(
ctx context.Context,
req backup.BackupRequest,
stores []*metapb.Store,
updateCh glue.Progress,
@@ -51,7 +49,7 @@ func (push *pushDown) pushBackup(
log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
continue
}
client, err := push.mgr.GetBackupClient(push.ctx, storeID)
client, err := push.mgr.GetBackupClient(ctx, storeID)
if err != nil {
log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
return res, errors.Trace(err)
@@ -60,15 +58,15 @@
go func() {
defer wg.Done()
err := SendBackup(
push.ctx, storeID, client, req,
ctx, storeID, client, req,
func(resp *backup.BackupResponse) error {
// Forward all responses (including error).
push.respCh <- resp
return nil
},
func() (backup.BackupClient, error) {
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(push.ctx, storeID)
return push.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
push.errCh <- err
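With the ctx field removed from pushDown, the caller's context is now threaded through pushBackup, GetBackupClient, SendBackup and ResetBackupClient instead of being captured at construction time. That matches the standard context package guidance: do not store a Context inside a struct type; pass it explicitly to each function that needs it. A tiny sketch of the resulting shape, with illustrative names that are not from br:

package main

import (
    "context"
    "fmt"
)

// worker deliberately has no ctx field; each call receives the caller's context.
type worker struct {
    name string
}

func (w *worker) run(ctx context.Context, tasks []string) error {
    for _, t := range tasks {
        select {
        case <-ctx.Done():
            return ctx.Err() // the caller's cancellation propagates naturally
        default:
        }
        fmt.Println(w.name, "processing", t)
    }
    return nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    w := &worker{name: "push"}
    _ = w.run(ctx, []string{"range-1", "range-2"})
}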
19 changes: 17 additions & 2 deletions pkg/gluetikv/glue.go
@@ -4,11 +4,13 @@ package gluetikv

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
pd "github.com/tikv/pd/client"

"github.com/pingcap/br/pkg/glue"
@@ -48,7 +50,7 @@ func (Glue) OwnsStorage() bool {

// StartProgress implements glue.Glue.
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0}
}

// Record implements glue.Glue.
@@ -57,15 +59,28 @@ func (Glue) Record(name string, val uint64) {
}

type progress struct {
ch chan<- struct{}
ch chan<- struct{}
closed int32
}

// Inc implements glue.Progress.
func (p progress) Inc() {
if atomic.LoadInt32(&p.closed) != 0 {
log.Warn("proposing a closed progress")
return
}
// There is a small race if this goroutine is preempted right here:
// Close could run between the closed check above and the send below.
// In practice a reschedule at this point is rare, so the send is
// probably safe, though not strictly safe. An extra lock would be
// costly, so we stay optimistic.
// (Maybe a spin lock here would be better?)
p.ch <- struct{}{}
}

// Close implements glue.Progress.
func (p progress) Close() {
// Set closed first, so we never observe a state where the channel
// is closed but p.closed is still false.
atomic.StoreInt32(&p.closed, 1)
close(p.ch)
}
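The comments above settle for an optimistic atomic flag, so Inc can still race with Close in the narrow window between the closed check and the send. If that window ever mattered, a mutex-guarded variant would remove it entirely, at the price of exactly the lock the comment wants to avoid. The sketch below is only an illustration of that trade-off, not the approach taken in this PR:

package main

import "sync"

// lockedProgress is a hypothetical alternative to the progress type above.
type lockedProgress struct {
    mu     sync.Mutex
    ch     chan struct{}
    closed bool
}

func (p *lockedProgress) Inc() {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.closed {
        return // drop updates after Close instead of panicking on a closed channel
    }
    p.ch <- struct{}{}
}

func (p *lockedProgress) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()
    if !p.closed {
        p.closed = true
        close(p.ch)
    }
}

func main() {
    p := &lockedProgress{ch: make(chan struct{}, 8)}
    go func() {
        for range p.ch {
            // consume progress updates
        }
    }()
    p.Inc()
    p.Close()
    p.Inc() // safely ignored after Close
}

The diff keeps the lock-free version and accepts the small window, as the comment explains.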
36 changes: 26 additions & 10 deletions pkg/pdutil/pd.go
@@ -61,6 +61,7 @@ var (
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}
// TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972
pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
@@ -73,11 +74,12 @@ var (

// DefaultPDCfg is taken from https://github.com/tikv/pd/blob/master/conf/config.toml.
DefaultPDCfg = map[string]interface{}{
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
"enable-location-replacement": "true",
}
)

@@ -175,10 +177,12 @@ func NewPdController(
}

return &PdController{
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
schedulerPauseCh: make(chan struct{}),
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
// We need a buffered channel here; otherwise, when the context is canceled,
// graceful shutdown gets stuck at resuming the schedulers.
schedulerPauseCh: make(chan struct{}, 1),
}, nil
}

@@ -408,6 +412,7 @@ func (p *PdController) UpdatePDScheduleConfig(
if e == nil {
return nil
}
log.Warn("failed to update PD config, will try next", zap.Error(e), zap.String("pd", addr))
}
return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
}
@@ -416,6 +421,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
return errors.Annotate(err, "fail to add PD schedulers")
}
log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := clusterCfg.scheduleCfg[cfgKey]
@@ -441,6 +447,12 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return errors.Annotate(err, "fail to update PD schedule config")
}
if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok {
log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement))
if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil {
return err
}
}
return nil
}

@@ -482,6 +494,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
}

undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
log.Debug("saved PD config", zap.Any("config", scheduleCfg))

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
@@ -512,7 +525,10 @@
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
return undo, p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return undo, err
}
return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"})
}

// Close closes the connection to pd.
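Two behavioural points from the pdutil changes above: schedulerPauseCh is now buffered so that shutdown cannot hang once the pause loop has already exited, and enable-location-replacement is switched off while schedulers are removed and later restored from the saved schedule config. The first point is easy to reproduce in isolation; the sketch below uses placeholder names only (none of these identifiers come from br):

package main

import (
    "context"
    "fmt"
    "time"
)

// pauseLoop stands in for the goroutine that keeps schedulers paused and
// resumes them when told to stop or when the context is canceled.
func pauseLoop(ctx context.Context, stop <-chan struct{}) {
    for {
        select {
        case <-ctx.Done():
            return // after this, nothing ever receives from stop again
        case <-stop:
            return
        case <-time.After(100 * time.Millisecond):
            // the real code would re-pause schedulers here
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    stop := make(chan struct{}, 1) // buffered, like the new schedulerPauseCh
    go pauseLoop(ctx, stop)

    cancel()           // simulate the context being canceled during shutdown
    stop <- struct{}{} // with an unbuffered channel this send could hang forever
    fmt.Println("shutdown finished without sticking at scheduler resume")
}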