Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: move tikv driver out #22651

Merged
merged 10 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func main() {
flag.PrintDefaults()
err := logutil.InitZapLogger(logutil.NewLogConfig(*logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
terror.MustNil(err)
err = store.Register("tikv", tikv.Driver{})
err = store.Register("tikv", store.TiKVDriver{})
terror.MustNil(err)
ut := newBenchDB()
works := strings.Split(*runJobs, "|")
Expand Down
4 changes: 2 additions & 2 deletions cmd/benchkv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
storepkg "github.com/pingcap/tidb/store"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
Expand Down Expand Up @@ -69,7 +69,7 @@ var (

// Init initializes information.
func Init() {
driver := tikv.Driver{}
driver := storepkg.TiKVDriver{}
var err error
store, err = driver.Open(fmt.Sprintf("tikv://%s?cluster=1", *pdAddr))
terror.MustNil(err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -1061,5 +1060,5 @@ func addEnvPath(newPath string) {

func init() {
rand.Seed(time.Now().UnixNano())
store.Register("tikv", tikv.Driver{})
store.Register("tikv", store.TiKVDriver{})
}
3 changes: 2 additions & 1 deletion session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (s *testSessionSuiteBase) SetUpSuite(c *C) {
if *withTiKV {
initPdAddrs()
s.pdAddr = <-pdAddrChan
var d tikv.Driver
var d store.TiKVDriver
config.UpdateGlobal(func(conf *config.Config) {
conf.TxnLocalLatches.Enabled = false
})
Expand Down
6 changes: 5 additions & 1 deletion store/mockstore/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ func newMockTikvStore(opt *mockOptions) (kv.Storage, error) {
}
opt.clusterInspector(cluster)

return tikv.NewTestTiKVStore(client, pdClient, opt.clientHijacker, opt.pdClientHijacker, opt.txnLocalLatches)
kvstore, err := tikv.NewTestTiKVStore(client, pdClient, opt.clientHijacker, opt.pdClientHijacker, opt.txnLocalLatches)
if err != nil {
return nil, err
}
return &mockStorage{KVStore: kvstore}, nil
}
21 changes: 20 additions & 1 deletion store/mockstore/unistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package mockstore

import (
"crypto/tls"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/unistore"
Expand All @@ -31,5 +33,22 @@ func newUnistore(opts *mockOptions) (kv.Storage, error) {
Client: pdClient,
}

return tikv.NewTestTiKVStore(client, pdClient, opts.clientHijacker, opts.pdClientHijacker, opts.txnLocalLatches)
kvstore, err := tikv.NewTestTiKVStore(client, pdClient, opts.clientHijacker, opts.pdClientHijacker, opts.txnLocalLatches)
if err != nil {
return nil, err
}
return &mockStorage{KVStore: kvstore}, nil
}

// Wraps tikv.KVStore and make it compatible with kv.Storage.
type mockStorage struct {
*tikv.KVStore
}

func (s *mockStorage) EtcdAddrs() ([]string, error) {
return nil, nil
}

func (s *mockStorage) TLSConfig() *tls.Config {
return nil
}
4 changes: 2 additions & 2 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type testAsyncCommitCommon struct {

func (s *testAsyncCommitCommon) setUpTest(c *C) {
if *WithTiKV {
s.store = NewTestStore(c).(*KVStore)
s.store = NewTestStore(c)
return
}

Expand All @@ -51,7 +51,7 @@ func (s *testAsyncCommitCommon) setUpTest(c *C) {
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)

s.store = store.(*KVStore)
s.store = store
}

func (s *testAsyncCommitCommon) putAlphabets(c *C, enableAsyncCommit bool) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type testBackoffSuite struct {
var _ = Suite(&testBackoffSuite{})

func (s *testBackoffSuite) SetUpTest(c *C) {
s.store = NewTestStore(c).(*KVStore)
s.store = NewTestStore(c)
}

func (s *testBackoffSuite) TearDownTest(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *testDeleteRangeSuite) SetUpTest(c *C) {
// )
// c.Assert(err, IsNil)

s.store = store.(*KVStore)
s.store = store
}

func (s *testDeleteRangeSuite) TearDownTest(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Suite(&testIsolationSuite{})

func (s *testIsolationSuite) SetUpSuite(c *C) {
s.OneByOneSuite.SetUpSuite(c)
s.store = NewTestStore(c).(*KVStore)
s.store = NewTestStore(c)
}

func (s *testIsolationSuite) TearDownSuite(c *C) {
Expand Down
151 changes: 8 additions & 143 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package tikv
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -35,25 +32,11 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type storeCache struct {
sync.Mutex
cache map[string]*KVStore
}

var mc storeCache

// Driver implements engine Driver.
type Driver struct {
}

func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
cli, err := clientv3.New(clientv3.Config{
Expand All @@ -70,67 +53,6 @@ func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, erro
return cli, nil
}

// Open opens or creates an TiKV storage with given path.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func (d Driver) Open(path string) (kv.Storage, error) {
mc.Lock()
defer mc.Unlock()
security := config.GetGlobalConfig().Security
pdConfig := config.GetGlobalConfig().PDClient
tikvConfig := config.GetGlobalConfig().TiKVClient
txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches
etcdAddrs, disableGC, err := config.ParsePath(path)
if err != nil {
return nil, errors.Trace(err)
}

pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
}, pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(tikvConfig.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(tikvConfig.GrpcKeepAliveTimeout) * time.Second,
}),
), pd.WithCustomTimeoutOption(time.Duration(pdConfig.PDServerTimeout)*time.Second))
pdCli = execdetails.InterceptedPDClient{Client: pdCli}

if err != nil {
return nil, errors.Trace(err)
}

// FIXME: uuid will be a very long and ugly string, simplify it.
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))
if store, ok := mc.cache[uuid]; ok {
return store, nil
}

tlsConfig, err := security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}

spkv, err := NewEtcdSafePointKV(etcdAddrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}

coprCacheConfig := &config.GetGlobalConfig().TiKVClient.CoprCache
s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, NewRPCClient(security), !disableGC, coprCacheConfig)
if err != nil {
return nil, errors.Trace(err)
}
if txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(txnLocalLatches.Capacity)
}
s.etcdAddrs = etcdAddrs
s.tlsConfig = tlsConfig

mc.cache[uuid] = s
return s, nil
}

// EtcdBackend is used for judging a storage is a real TiKV.
type EtcdBackend interface {
EtcdAddrs() ([]string, error)
Expand All @@ -153,10 +75,9 @@ type KVStore struct {
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
gcWorker GCHandler
etcdAddrs []string
tlsConfig *tls.Config
mock bool
enableGC bool

mock bool
enableGC bool

kv SafePointKV
safePoint uint64
Expand Down Expand Up @@ -243,58 +164,6 @@ func (s *KVStore) IsLatchEnabled() bool {
return s.txnLatches != nil
}

var (
ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No
)

// EtcdAddrs returns etcd server addresses.
func (s *KVStore) EtcdAddrs() ([]string, error) {
if s.etcdAddrs == nil {
return nil, nil
}

if ldflagGetEtcdAddrsFromConfig == "1" {
// For automated test purpose.
// To manipulate connection to etcd by mandatorily setting path to a proxy.
cfg := config.GetGlobalConfig()
return strings.Split(cfg.Path, ","), nil
}

ctx := context.Background()
bo := NewBackoffer(ctx, GetAllMembersBackoff)
etcdAddrs := make([]string, 0)
pdClient := s.pdClient
if pdClient == nil {
return nil, errors.New("Etcd client not found")
}
for {
members, err := pdClient.GetAllMembers(ctx)
if err != nil {
err := bo.Backoff(BoRegionMiss, err)
if err != nil {
return nil, err
}
continue
}
for _, member := range members {
if len(member.ClientUrls) > 0 {
u, err := url.Parse(member.ClientUrls[0])
if err != nil {
logutil.BgLogger().Error("fail to parse client url from pd members", zap.String("client_url", member.ClientUrls[0]), zap.Error(err))
return nil, err
}
etcdAddrs = append(etcdAddrs, u.Host)
}
}
return etcdAddrs, nil
}
}

// TLSConfig returns the tls config to connect to etcd.
func (s *KVStore) TLSConfig() *tls.Config {
return s.tlsConfig
}

// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once.
func (s *KVStore) StartGCWorker() error {
if !s.enableGC || NewGCHandlerFunc == nil {
Expand Down Expand Up @@ -373,10 +242,6 @@ func (s *KVStore) GetSnapshot(ver kv.Version) kv.Snapshot {

// Close store
func (s *KVStore) Close() error {
mc.Lock()
defer mc.Unlock()

delete(mc.cache, s.uuid)
s.oracle.Close()
s.pdClient.Close()
if s.gcWorker != nil {
Expand Down Expand Up @@ -483,6 +348,11 @@ func (s *KVStore) GetOracle() oracle.Oracle {
return s.oracle
}

// GetPDClient returns the PD client.
func (s *KVStore) GetPDClient() pd.Client {
return s.pdClient
}

// Name gets the name of the storage engine
func (s *KVStore) Name() string {
return "TiKV"
Expand Down Expand Up @@ -553,8 +423,3 @@ func (s *KVStore) GetTiKVClient() (client Client) {
func (s *KVStore) GetMemCache() kv.MemManager {
return s.memCache
}

func init() {
mc.cache = make(map[string]*KVStore)
rand.Seed(time.Now().UnixNano())
}
2 changes: 1 addition & 1 deletion store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type testLockSuite struct {
var _ = Suite(&testLockSuite{})

func (s *testLockSuite) SetUpTest(c *C) {
s.store = NewTestStore(c).(*KVStore)
s.store = NewTestStore(c)
}

func (s *testLockSuite) TearDownTest(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/prewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *testPrewriteSuite) SetUpTest(c *C) {
unistore.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)
s.store = store.(*KVStore)
s.store = store
}

func (s *testPrewriteSuite) TestSetMinCommitTSInAsyncCommit(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/range_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *testRangeTaskSuite) SetUpTest(c *C) {
// }),
// )
// c.Assert(err, IsNil)
s.store = store.(*KVStore)
s.store = store

s.testRanges = []kv.KeyRange{
makeRange("", ""),
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Suite(&testSafePointSuite{})

func (s *testSafePointSuite) SetUpSuite(c *C) {
s.OneByOneSuite.SetUpSuite(c)
s.store = NewTestStore(c).(*KVStore)
s.store = NewTestStore(c)
s.prefix = fmt.Sprintf("seek_%d", time.Now().Unix())
}

Expand Down
Loading