Skip to content

Commit

Permalink
localbackend: fix resource leak when err on new local backend (#53664) (
Browse files Browse the repository at this point in the history
#54618)

close #53659
  • Loading branch information
ti-chi-bot authored Jul 16, 2024
1 parent 88ea385 commit ac8777a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 27 deletions.
73 changes: 52 additions & 21 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,40 @@ func NewBackend(
}
}()
config.adjust()
pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
var (
pdCtl *pdutil.PdController
spkv *tikvclient.EtcdSafePointKV
pdCliForTiKV *tikvclient.CodecPDClient
rpcCli tikvclient.Client
tikvCli *tikvclient.KVStore
importClientFactory *importClientFactoryImpl
multiIngestSupported bool
)
defer func() {
if err == nil {
return
}
if importClientFactory != nil {
importClientFactory.Close()
}
if tikvCli != nil {
// tikvCli uses pdCliForTiKV (which wraps pdCli), spkv and rpcCli, so
// closing tikvCli will close all of them.
_ = tikvCli.Close()
} else {
if rpcCli != nil {
_ = rpcCli.Close()
}
if spkv != nil {
_ = spkv.Close()
}
// pdCliForTiKV wraps pdCli, so we only need to close pdCtl
if pdCtl != nil {
pdCtl.Close()
}
}
}()
pdCtl, err = pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
Expand Down Expand Up @@ -580,12 +613,11 @@ func NewBackend(
}

// The following copies tikv.NewTxnClient without creating yet another pdClient.
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
spkv, err = tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}

var pdCliForTiKV *tikvclient.CodecPDClient
if config.KeyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
Expand All @@ -596,12 +628,16 @@ func NewBackend(
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
rpcCli = tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err = tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCtl, importClientFactory)
if err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if config.DupeDetectEnabled {
keyAdapter = common.DupDetectKeyAdapter{}
Expand Down Expand Up @@ -629,6 +665,7 @@ func NewBackend(

BackendConfig: config,

supportMultiIngest: multiIngestSupported,
duplicateDB: duplicateDB,
keyAdapter: keyAdapter,
importClientFactory: importClientFactory,
Expand All @@ -639,9 +676,6 @@ func NewBackend(
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
}
if err = local.checkMultiIngestSupport(ctx); err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

return local, nil
}
Expand All @@ -659,10 +693,10 @@ func (local *Backend) TotalMemoryConsume() int64 {
return memConsume + local.bufferPool.TotalSize()
}

func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
func checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController, importClientFactory ImportClientFactory) (bool, error) {
stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}

hasTiFlash := false
Expand All @@ -684,10 +718,10 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
}
}
client, err1 := local.getImportClient(ctx, s.Id)
client, err1 := importClientFactory.Create(ctx, s.Id)
if err1 != nil {
err = err1
log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
Expand All @@ -700,8 +734,7 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
return false, nil
}
}
log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
Expand All @@ -711,17 +744,15 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
// so in this condition, downgrade the logic instead of return an error.
if hasTiFlash {
return errors.Trace(err)
return false, errors.Trace(err)
}
log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
local.supportMultiIngest = false
return nil
return false, nil
}
}

local.supportMultiIngest = true
log.FromContext(ctx).Info("multi ingest support")
return nil
return true, nil
}

// rlock read locks a local file and returns the Engine instance if it exists.
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,13 +1070,12 @@ func TestMultiIngest(t *testing.T) {
return importCli
},
},
logger: log.L(),
}
err := local.checkMultiIngestSupport(context.Background())
supportMultiIngest, err := checkMultiIngestSupport(context.Background(), local.pdCtl, local.importClientFactory)
if err != nil {
require.Contains(t, err.Error(), testCase.retErr)
} else {
require.Equal(t, testCase.supportMutliIngest, local.supportMultiIngest)
require.Equal(t, testCase.supportMutliIngest, supportMultiIngest)
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/hex"
goerrors "errors"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -121,7 +122,7 @@ func PaginateScanRegion(
var batch []*RegionInfo
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
return err
}
Expand Down Expand Up @@ -235,7 +236,9 @@ func NewWaitRegionOnlineBackoffer() utils.Backoffer {

// NextBackoff returns a duration to wait before retrying again
func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
if berrors.ErrPDBatchScanRegion.Equal(err) {
// TODO(lance6716): why do we only back off when the error is ErrPDBatchScanRegion?
var perr *errors.Error
if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
// it needs more time to wait splitting the regions that contains data in PITR.
// 2s * 150
delayTime := b.Stat.ExponentialBackoff()
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package restore_test
import (
"context"
"encoding/binary"
goerrors "errors"
"fmt"
"math/rand"
"testing"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -353,7 +355,9 @@ func TestPaginateScanRegion(t *testing.T) {
tc.InjectTimes = 5
_, err = split.PaginateScanRegion(ctx, tc, []byte{}, []byte{}, 3)
require.Error(t, err)
require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
var perr *errors.Error
goerrors.As(err, &perr)
require.EqualValues(t, berrors.ErrPDBatchScanRegion.ID(), perr.ID())

// make the regionMap losing some region, this will cause scan region check fails
// region ID is key+1, so region 4 is deleted
Expand Down

0 comments on commit ac8777a

Please sign in to comment.