Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv:remove tidb/config from store/tikv #22606

Merged
merged 8 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 50 additions & 70 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"os"
"os/user"
"path/filepath"
Expand Down Expand Up @@ -66,8 +65,6 @@ const (
DefMaxOfTableColumnCountLimit = 4096
// DefTxnScope is the default value for TxnScope
DefTxnScope = "global"
// DefStoresRefreshInterval is the default value of StoresRefreshInterval
DefStoresRefreshInterval = 60
)

// Valid config maps
Expand Down Expand Up @@ -104,10 +101,10 @@ type Config struct {
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches TxnLocalLatches `toml:"-" json:"-"`
TempStorageQuota int64 `toml:"tmp-storage-quota" json:"tmp-storage-quota"` // Bytes
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches tikvcfg.TxnLocalLatches `toml:"-" json:"-"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand All @@ -119,7 +116,7 @@ type Config struct {
PreparedPlanCache PreparedPlanCache `toml:"prepared-plan-cache" json:"prepared-plan-cache"`
OpenTracing OpenTracing `toml:"opentracing" json:"opentracing"`
ProxyProtocol ProxyProtocol `toml:"proxy-protocol" json:"proxy-protocol"`
PDClient PDClient `toml:"pd-client" json:"pd-client"`
PDClient tikvcfg.PDClient `toml:"pd-client" json:"pd-client"`
TiKVClient tikvcfg.TiKVClient `toml:"tikv-client" json:"tikv-client"`
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Expand Down Expand Up @@ -193,6 +190,22 @@ func (c *Config) UpdateTempStoragePath() {
}
}

func (c *Config) getTiKVConfig() *tikvcfg.Config {
return &tikvcfg.Config{
CommitterConcurrency: c.Performance.CommitterConcurrency,
MaxTxnTTL: c.Performance.MaxTxnTTL,
ServerMemoryQuota: defTiKVCfg.ServerMemoryQuota,
TiKVClient: c.TiKVClient,
Security: c.Security.ClusterSecurity(),
PDClient: c.PDClient,
PessimisticTxn: tikvcfg.PessimisticTxn{MaxRetryCount: c.PessimisticTxn.MaxRetryCount},
TxnLocalLatches: c.TxnLocalLatches,
StoresRefreshInterval: c.StoresRefreshInterval,
OpenTracingEnable: c.OpenTracing.Enable,
Path: c.Path,
}
}

func encodeDefTempStorageDir(tempDir string, host, statusHost string, port, statusPort uint) string {
dirName := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%v:%v/%v:%v", host, port, statusHost, statusPort)))
var osUID string
Expand Down Expand Up @@ -413,12 +426,6 @@ type PlanCache struct {
Shards uint `toml:"shards" json:"shards"`
}

// TxnLocalLatches is the TxnLocalLatches section of the config.
type TxnLocalLatches struct {
Enabled bool `toml:"-" json:"-"`
Capacity uint `toml:"-" json:"-"`
}

// PreparedPlanCache is the PreparedPlanCache section of the config.
type PreparedPlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Expand Down Expand Up @@ -463,12 +470,6 @@ type ProxyProtocol struct {
HeaderTimeout uint `toml:"header-timeout" json:"header-timeout"`
}

// PDClient is the config for PD client.
type PDClient struct {
// PDServerTimeout is the max time which PD client will wait for the PD server in seconds.
PDServerTimeout uint `toml:"pd-server-timeout" json:"pd-server-timeout"`
}

// Binlog is the config for binlog.
type Binlog struct {
Enable bool `toml:"enable" json:"enable"`
Expand All @@ -482,18 +483,25 @@ type Binlog struct {
Strategy string `toml:"strategy" json:"strategy"`
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
}

// DefaultPessimisticTxn returns the default configuration for PessimisticTxn
func DefaultPessimisticTxn() PessimisticTxn {
return PessimisticTxn{
MaxRetryCount: 256,
}
}

// Plugin is the config for plugin
type Plugin struct {
Dir string `toml:"dir" json:"dir"`
Load string `toml:"load" json:"load"`
}

// StmtSummary is the config for statement summary.
type StmtSummary struct {
// Enable statement summary or not.
Expand Down Expand Up @@ -525,6 +533,7 @@ type Experimental struct {
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
}

var defTiKVCfg = tikvcfg.DefaultConfig()
var defaultConf = Config{
Host: DefHost,
AdvertiseAddress: "",
Expand Down Expand Up @@ -555,13 +564,10 @@ var defaultConf = Config{
RepairMode: false,
RepairTableList: []string{},
MaxServerConnections: 0,
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 0,
},
LowerCaseTableNames: 2,
GracefulWaitBeforeShutdown: 0,
ServerVersion: "",
TxnLocalLatches: defTiKVCfg.TxnLocalLatches,
LowerCaseTableNames: 2,
GracefulWaitBeforeShutdown: 0,
ServerVersion: "",
Log: Log{
Level: "info",
Format: "text",
Expand Down Expand Up @@ -601,8 +607,8 @@ var defaultConf = Config{
TxnEntrySizeLimit: DefTxnEntrySizeLimit,
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
CommitterConcurrency: 16,
MaxTxnTTL: 60 * 60 * 1000, // 1hour
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
MemProfileInterval: "1m",
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
Expand All @@ -625,17 +631,13 @@ var defaultConf = Config{
},
Reporter: OpenTracingReporter{},
},
PDClient: PDClient{
PDServerTimeout: 3,
},
TiKVClient: tikvcfg.DefaultTiKVClient(),
PDClient: defTiKVCfg.PDClient,
TiKVClient: defTiKVCfg.TiKVClient,
Binlog: Binlog{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
MaxRetryCount: 256,
},
PessimisticTxn: DefaultPessimisticTxn(),
StmtSummary: StmtSummary{
Enable: true,
EnableInternalQuery: false,
Expand All @@ -661,7 +663,7 @@ var defaultConf = Config{
DeprecateIntegerDisplayWidth: false,
TxnScope: DefTxnScope,
EnableEnumLengthLimit: true,
StoresRefreshInterval: DefStoresRefreshInterval,
StoresRefreshInterval: defTiKVCfg.StoresRefreshInterval,
}

var (
Expand All @@ -684,6 +686,8 @@ func GetGlobalConfig() *Config {
// StoreGlobalConfig stores a new config to the globalConf. It mostly uses in the test to avoid some data races.
func StoreGlobalConfig(config *Config) {
globalConf.Store(config)
cfg := *config.getTiKVConfig()
tikvcfg.StoreGlobalConfig(&cfg)
}

var deprecatedConfig = map[string]struct{}{
Expand Down Expand Up @@ -844,8 +848,9 @@ func (c *Config) Valid() error {
return fmt.Errorf("lower-case-table-names should be 0 or 1 or 2")
}

if c.TxnLocalLatches.Enabled && c.TxnLocalLatches.Capacity == 0 {
return fmt.Errorf("txn-local-latches.capacity can not be 0")
// txn-local-latches
if err := c.TxnLocalLatches.Valid(); err != nil {
return err
}

// For tikvclient.
Expand Down Expand Up @@ -979,28 +984,3 @@ const (
OOMActionCancel = "cancel"
OOMActionLog = "log"
)

// ParsePath parses this path.
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected [tikv] but found [%s]", u.Scheme)
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
13 changes: 0 additions & 13 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,19 +501,6 @@ func (s *testConfigSuite) TestTableColumnCountLimit(c *C) {
checkValid(DefMaxOfTableColumnCountLimit+1, false)
}

func (s *testConfigSuite) TestParsePath(c *C) {
etcdAddrs, disableGC, err := ParsePath("tikv://node1:2379,node2:2379")
c.Assert(err, IsNil)
c.Assert(etcdAddrs, DeepEquals, []string{"node1:2379", "node2:2379"})
c.Assert(disableGC, IsFalse)

_, _, err = ParsePath("tikv://node1:2379")
c.Assert(err, IsNil)
_, disableGC, err = ParsePath("tikv://node1:2379?disableGC=true")
c.Assert(err, IsNil)
c.Assert(disableGC, IsTrue)
}

func (s *testConfigSuite) TestEncodeDefTempStorageDir(c *C) {
tests := []struct {
host string
Expand Down
7 changes: 4 additions & 3 deletions store/mockstore/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/config"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/config"
)

func TestT(t *testing.T) {
Expand All @@ -31,7 +32,7 @@ func (s testSuite) SetUpSuite(c *C) {}
var _ = Suite(testSuite{})

func (s testSuite) TestConfig(c *C) {
config.UpdateGlobal(func(conf *config.Config) {
tidbcfg.UpdateGlobal(func(conf *tidbcfg.Config) {
conf.TxnLocalLatches = config.TxnLocalLatches{
Enabled: true,
Capacity: 10240,
Expand All @@ -48,7 +49,7 @@ func (s testSuite) TestConfig(c *C) {
c.Assert(store.(LatchEnableChecker).IsLatchEnabled(), IsTrue)
store.Close()

config.UpdateGlobal(func(conf *config.Config) {
tidbcfg.UpdateGlobal(func(conf *tidbcfg.Config) {
conf.TxnLocalLatches = config.TxnLocalLatches{
Enabled: false,
Capacity: 10240,
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -809,8 +809,8 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
// If the rate limit is too high, tikv will report service is busy.
// If the rate limit is too low, we can't full utilize the tikv's throughput.
// TODO: Find a self-adaptive way to control the rate limit here.
if rateLim > config.GetGlobalConfig().Performance.CommitterConcurrency {
rateLim = config.GetGlobalConfig().Performance.CommitterConcurrency
if rateLim > config.GetGlobalConfig().CommitterConcurrency {
rateLim = config.GetGlobalConfig().CommitterConcurrency
}
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
err := batchExecutor.process(batches)
Expand Down Expand Up @@ -888,13 +888,13 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
if uptime > config.GetGlobalConfig().Performance.MaxTxnTTL {
if uptime > config.GetGlobalConfig().MaxTxnTTL {
// Checks maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.Logger(bo.ctx).Info("ttlManager live up to its lifetime",
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("uptime", uptime),
zap.Uint64("maxTxnTTL", config.GetGlobalConfig().Performance.MaxTxnTTL))
zap.Uint64("maxTxnTTL", config.GetGlobalConfig().MaxTxnTTL))
metrics.TiKVTTLLifeTimeReachCounter.Inc()
// the pessimistic locks may expire if the ttl manager has timed out, set `LockExpired` flag
// so that this transaction could only commit or rollback with no more statement executions
Expand Down
25 changes: 20 additions & 5 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -1298,9 +1298,24 @@ func (s *testCommitterSuite) TestAsyncCommit(c *C) {
})
}

func updateGlobalConfig(f func(conf *config.Config)) {
g := config.GetGlobalConfig()
newConf := *g
f(&newConf)
config.StoreGlobalConfig(&newConf)
}

// restoreFunc gets a function that restore the config to the current value.
func restoreGlobalConfFunc() (restore func()) {
g := config.GetGlobalConfig()
return func() {
config.StoreGlobalConfig(g)
}
}

func (s *testCommitterSuite) TestAsyncCommitCheck(c *C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
defer restoreGlobalConfFunc()()
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 16
conf.TiKVClient.AsyncCommit.TotalKeySizeLimit = 64
})
Expand All @@ -1318,12 +1333,12 @@ func (s *testCommitterSuite) TestAsyncCommitCheck(c *C) {
c.Assert(err, IsNil)
c.Assert(committer.checkAsyncCommit(), IsTrue)

config.UpdateGlobal(func(conf *config.Config) {
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 15
})
c.Assert(committer.checkAsyncCommit(), IsFalse)

config.UpdateGlobal(func(conf *config.Config) {
updateGlobalConfig(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.KeysLimit = 20
conf.TiKVClient.AsyncCommit.TotalKeySizeLimit = 63
})
Expand Down
Loading