Skip to content

Commit

Permalink
store/tikv:remove tidb/config from store/tikv (#22606)
Browse files Browse the repository at this point in the history
Signed-off-by: shirly <AndreMouche@126.com>
  • Loading branch information
AndreMouche authored Feb 1, 2021
1 parent 579421f commit 85a9669
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 117 deletions.
120 changes: 50 additions & 70 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"os"
"os/user"
"path/filepath"
Expand Down Expand Up @@ -66,8 +65,6 @@ const (
DefMaxOfTableColumnCountLimit = 4096
// DefTxnScope is the default value for TxnScope
DefTxnScope = "global"
// DefStoresRefreshInterval is the default value of StoresRefreshInterval
DefStoresRefreshInterval = 60
)

// Valid config maps
Expand Down Expand Up @@ -104,10 +101,10 @@ type Config struct {
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches TxnLocalLatches `toml:"-" json:"-"`
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand All @@ -119,7 +116,7 @@ type Config struct {
PreparedPlanCache PreparedPlanCache `toml:"prepared-plan-cache" json:"prepared-plan-cache"`
OpenTracing OpenTracing `toml:"opentracing" json:"opentracing"`
ProxyProtocol ProxyProtocol `toml:"proxy-protocol" json:"proxy-protocol"`
PDClient PDClient `toml:"pd-client" json:"pd-client"`
PDClient tikvcfg.PDClient `toml:"pd-client" json:"pd-client"`
TiKVClient tikvcfg.TiKVClient `toml:"tikv-client" json:"tikv-client"`
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Expand Down Expand Up @@ -193,6 +190,22 @@ func (c *Config) UpdateTempStoragePath() {
}
}

func (c *Config) getTiKVConfig() *tikvcfg.Config {
return &tikvcfg.Config{
CommitterConcurrency: c.Performance.CommitterConcurrency,
MaxTxnTTL: c.Performance.MaxTxnTTL,
ServerMemoryQuota: defTiKVCfg.ServerMemoryQuota,
TiKVClient: c.TiKVClient,
Security: c.Security.ClusterSecurity(),
PDClient: c.PDClient,
PessimisticTxn: tikvcfg.PessimisticTxn{MaxRetryCount: c.PessimisticTxn.MaxRetryCount},
TxnLocalLatches: c.TxnLocalLatches,
StoresRefreshInterval: c.StoresRefreshInterval,
OpenTracingEnable: c.OpenTracing.Enable,
Path: c.Path,
}
}

func encodeDefTempStorageDir(tempDir string, host, statusHost string, port, statusPort uint) string {
dirName := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%v:%v/%v:%v", host, port, statusHost, statusPort)))
var osUID string
Expand Down Expand Up @@ -413,12 +426,6 @@ type PlanCache struct {
Shards uint `toml:"shards" json:"shards"`
}

// TxnLocalLatches is the TxnLocalLatches section of the config.
type TxnLocalLatches struct {
Enabled bool `toml:"-" json:"-"`
Capacity uint `toml:"-" json:"-"`
}

// PreparedPlanCache is the PreparedPlanCache section of the config.
type PreparedPlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Expand Down Expand Up @@ -463,12 +470,6 @@ type ProxyProtocol struct {
HeaderTimeout uint `toml:"header-timeout" json:"header-timeout"`
}

// PDClient is the config for PD client.
type PDClient struct {
// PDServerTimeout is the max time which PD client will wait for the PD server in seconds.
PDServerTimeout uint `toml:"pd-server-timeout" json:"pd-server-timeout"`
}

// Binlog is the config for binlog.
type Binlog struct {
Enable bool `toml:"enable" json:"enable"`
Expand All @@ -482,18 +483,25 @@ type Binlog struct {
Strategy string `toml:"strategy" json:"strategy"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
}

// DefaultPessimisticTxn returns the default configuration for PessimisticTxn
func DefaultPessimisticTxn() PessimisticTxn {
return PessimisticTxn{
MaxRetryCount: 256,
}
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// StmtSummary is the config for statement summary.
type StmtSummary struct {
// Enable statement summary or not.
Expand Down Expand Up @@ -525,6 +533,7 @@ type Experimental struct {
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
}

var defTiKVCfg = tikvcfg.DefaultConfig()
var defaultConf = Config{
Host: DefHost,
AdvertiseAddress: "",
Expand Down Expand Up @@ -555,13 +564,10 @@ var defaultConf = Config{
RepairMode: false,
RepairTableList: []string{},
MaxServerConnections: 0,
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 0,
},
LowerCaseTableNames: 2,
GracefulWaitBeforeShutdown: 0,
ServerVersion: "",
TxnLocalLatches: defTiKVCfg.TxnLocalLatches,
LowerCaseTableNames: 2,
GracefulWaitBeforeShutdown: 0,
ServerVersion: "",
Log: Log{
Level: "info",
Format: "text",
Expand Down Expand Up @@ -601,8 +607,8 @@ var defaultConf = Config{
TxnEntrySizeLimit: DefTxnEntrySizeLimit,
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
CommitterConcurrency: 16,
MaxTxnTTL: 60 * 60 * 1000, // 1hour
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
MemProfileInterval: "1m",
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
Expand All @@ -625,17 +631,13 @@ var defaultConf = Config{
},
Reporter: OpenTracingReporter{},
},
PDClient: PDClient{
PDServerTimeout: 3,
},
TiKVClient: tikvcfg.DefaultTiKVClient(),
PDClient: defTiKVCfg.PDClient,
TiKVClient: defTiKVCfg.TiKVClient,
Binlog: Binlog{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
MaxRetryCount: 256,
},
PessimisticTxn: DefaultPessimisticTxn(),
StmtSummary: StmtSummary{
Enable: true,
EnableInternalQuery: false,
Expand All @@ -661,7 +663,7 @@ var defaultConf = Config{
DeprecateIntegerDisplayWidth: false,
TxnScope: DefTxnScope,
EnableEnumLengthLimit: true,
StoresRefreshInterval: DefStoresRefreshInterval,
StoresRefreshInterval: defTiKVCfg.StoresRefreshInterval,
}

var (
Expand All @@ -684,6 +686,8 @@ func GetGlobalConfig() *Config {
// StoreGlobalConfig stores a new config to the globalConf. It mostly uses in the test to avoid some data races.
func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
cfg := *config.getTiKVConfig()
tikvcfg.StoreGlobalConfig(&cfg)
}

var deprecatedConfig = map[string]struct{}{
Expand Down Expand Up @@ -844,8 +848,9 @@ func (c *Config) Valid() error {
return fmt.Errorf("lower-case-table-names should be 0 or 1 or 2")
}

if c.TxnLocalLatches.Enabled && c.TxnLocalLatches.Capacity == 0 {
return fmt.Errorf("txn-local-latches.capacity can not be 0")
// txn-local-latches
if err := c.TxnLocalLatches.Valid(); err != nil {
return err
}

// For tikvclient.
Expand Down Expand Up @@ -979,28 +984,3 @@ const (
OOMActionCancel = "cancel"
OOMActionLog = "log"
)

// ParsePath parses this path.
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected [tikv] but found [%s]", u.Scheme)
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
13 changes: 0 additions & 13 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,19 +501,6 @@ func (s *testConfigSuite) TestTableColumnCountLimit(c *C) {
checkValid(DefMaxOfTableColumnCountLimit+1, false)
}

func (s *testConfigSuite) TestParsePath(c *C) {
etcdAddrs, disableGC, err := ParsePath("tikv://node1:2379,node2:2379")
c.Assert(err, IsNil)
c.Assert(etcdAddrs, DeepEquals, []string{"node1:2379", "node2:2379"})
c.Assert(disableGC, IsFalse)

_, _, err = ParsePath("tikv://node1:2379")
c.Assert(err, IsNil)
_, disableGC, err = ParsePath("tikv://node1:2379?disableGC=true")
c.Assert(err, IsNil)
c.Assert(disableGC, IsTrue)
}

func (s *testConfigSuite) TestEncodeDefTempStorageDir(c *C) {
tests := []struct {
host string
Expand Down
7 changes: 4 additions & 3 deletions store/mockstore/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/config"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/config"
)

func TestT(t *testing.T) {
Expand All @@ -31,7 +32,7 @@ func (s testSuite) SetUpSuite(c *C) {}
var _ = Suite(testSuite{})

func (s testSuite) TestConfig(c *C) {
config.UpdateGlobal(func(conf *config.Config) {
tidbcfg.UpdateGlobal(func(conf *tidbcfg.Config) {
conf.TxnLocalLatches = config.TxnLocalLatches{
Enabled: true,
Capacity: 10240,
Expand All @@ -48,7 +49,7 @@ func (s testSuite) TestConfig(c *C) {
c.Assert(store.(LatchEnableChecker).IsLatchEnabled(), IsTrue)
store.Close()

config.UpdateGlobal(func(conf *config.Config) {
tidbcfg.UpdateGlobal(func(conf *tidbcfg.Config) {
conf.TxnLocalLatches = config.TxnLocalLatches{
Enabled: false,
Capacity: 10240,
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -809,8 +809,8 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
// If the rate limit is too high, tikv will report service is busy.
// If the rate limit is too low, we can't full utilize the tikv's throughput.
// TODO: Find a self-adaptive way to control the rate limit here.
if rateLim > config.GetGlobalConfig().Performance.CommitterConcurrency {
rateLim = config.GetGlobalConfig().Performance.CommitterConcurrency
if rateLim > config.GetGlobalConfig().CommitterConcurrency {
rateLim = config.GetGlobalConfig().CommitterConcurrency
}
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
err := batchExecutor.process(batches)
Expand Down Expand Up @@ -888,13 +888,13 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
if uptime > config.GetGlobalConfig().Performance.MaxTxnTTL {
if uptime > config.GetGlobalConfig().MaxTxnTTL {
// Checks maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.Logger(bo.ctx).Info("ttlManager live up to its lifetime",
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("uptime", uptime),
zap.Uint64("maxTxnTTL", config.GetGlobalConfig().Performance.MaxTxnTTL))
zap.Uint64("maxTxnTTL", config.GetGlobalConfig().MaxTxnTTL))
metrics.TiKVTTLLifeTimeReachCounter.Inc()
// the pessimistic locks may expire if the ttl manager has timed out, set `LockExpired` flag
// so that this transaction could only commit or rollback with no more statement executions
Expand Down
25 changes: 20 additions & 5 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -1297,9 +1297,24 @@ func (s *testCommitterSuite) TestAsyncCommit(c *C) {
})
}

func updateGlobalConfig(f func(conf *config.Config)) {
g := config.GetGlobalConfig()
newConf := *g
f(&newConf)
config.StoreGlobalConfig(&newConf)
}

// restoreFunc gets a function that restore the config to the current value.
func restoreGlobalConfFunc() (restore func()) {
g := config.GetGlobalConfig()
return func() {
config.StoreGlobalConfig(g)
}
}

func (s *testCommitterSuite) TestAsyncCommitCheck(c *C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
defer restoreGlobalConfFunc()()
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 16
conf.TiKVClient.AsyncCommit.TotalKeySizeLimit = 64
})
Expand All @@ -1317,12 +1332,12 @@ func (s *testCommitterSuite) TestAsyncCommitCheck(c *C) {
c.Assert(err, IsNil)
c.Assert(committer.checkAsyncCommit(), IsTrue)

config.UpdateGlobal(func(conf *config.Config) {
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 15
})
c.Assert(committer.checkAsyncCommit(), IsFalse)

config.UpdateGlobal(func(conf *config.Config) {
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 20
conf.TiKVClient.AsyncCommit.TotalKeySizeLimit = 63
})
Expand Down
Loading

0 comments on commit 85a9669

Please sign in to comment.