Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
BR support TLS (#161)
Browse files Browse the repository at this point in the history
* *: support tls

* move tikv.driver to glue

* fix comments
  • Loading branch information
3pointer authored Feb 27, 2020
1 parent 3c9d42f commit 028963d
Show file tree
Hide file tree
Showing 24 changed files with 560 additions and 103 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/mattn/go-runewidth v0.0.7 // indirect
Expand All @@ -31,6 +30,9 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.opencensus.io v0.22.2 // indirect
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,12 @@ github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
Expand Down Expand Up @@ -388,11 +390,15 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2tiKGSTUnb3Ok/9CEQb9oqu9LHKQQpc=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk=
Expand Down
42 changes: 36 additions & 6 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conn
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
Expand All @@ -25,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/glue"
Expand All @@ -44,6 +46,7 @@ type Mgr struct {
addrs []string
cli *http.Client
}
tlsConf *tls.Config
dom *domain.Domain
storage tikv.Storage
grpcClis struct {
Expand All @@ -58,9 +61,6 @@ func pdRequest(
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) ([]byte, error) {
if addr != "" && !strings.HasPrefix("http", addr) {
addr = "http://" + addr
}
u, err := url.Parse(addr)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -88,12 +88,33 @@ func pdRequest(
}

// NewMgr creates a new Mgr.
func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
func NewMgr(
ctx context.Context,
g glue.Glue,
pdAddrs string,
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
cli := &http.Client{Timeout: 30 * time.Second}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}

processedAddrs := make([]string, 0, len(addrs))
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
// TODO need check cluster version >= 3.1 when br release
if failure == nil {
Expand All @@ -104,7 +125,7 @@ func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Stora
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}

pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
pdClient, err := pd.NewClient(addrs, securityOption)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -140,8 +161,9 @@ func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Stora
pdClient: pdClient,
storage: storage,
dom: dom,
tlsConf: tlsConf,
}
mgr.pdHTTP.addrs = addrs
mgr.pdHTTP.addrs = processedAddrs
mgr.pdHTTP.cli = cli
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
return mgr, nil
Expand Down Expand Up @@ -217,6 +239,9 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if mgr.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
Expand Down Expand Up @@ -269,6 +294,11 @@ func (mgr *Mgr) GetTiKV() tikv.Storage {
return mgr.storage
}

// GetTLSConfig returns the tls config
func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.tlsConf
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *tikv.LockResolver {
return mgr.storage.GetLockResolver()
Expand Down
2 changes: 2 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -13,6 +14,7 @@ import (
type Glue interface {
BootstrapSession(store kv.Storage) (*domain.Domain, error)
CreateSession(store kv.Storage) (Session, error)
Open(path string, option pd.SecurityOption) (kv.Storage, error)
}

// Session is an abstraction of the session.Session interface.
Expand Down
15 changes: 15 additions & 0 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"context"

"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
)
Expand All @@ -35,6 +38,18 @@ func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
return &tidbSession{se: se}, nil
}

// Open implements glue.Glue
func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
if option.CAPath != "" {
conf := config.GetGlobalConfig()
conf.Security.ClusterSSLCA = option.CAPath
conf.Security.ClusterSSLCert = option.CertPath
conf.Security.ClusterSSLKey = option.KeyPath
config.StoreGlobalConfig(conf)
}
return tikv.Driver{}.Open(path)
}

// Execute implements glue.Session
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
Expand Down
19 changes: 16 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"context"
"crypto/tls"
"encoding/json"
"math"
"sort"
Expand All @@ -20,6 +21,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
Expand All @@ -41,6 +43,7 @@ type Client struct {
fileImporter FileImporter
workerPool *utils.WorkerPool
tableWorkerPool *utils.WorkerPool
tlsConf *tls.Config

databases map[string]*utils.Database
ddlJobs []*model.Job
Expand All @@ -57,6 +60,7 @@ func NewRestoreClient(
g glue.Glue,
pdClient pd.Client,
store kv.Storage,
tlsConf *tls.Config,
) (*Client, error) {
ctx, cancel := context.WithCancel(ctx)
db, err := NewDB(g, store)
Expand All @@ -71,6 +75,7 @@ func NewRestoreClient(
pdClient: pdClient,
tableWorkerPool: utils.NewWorkerPool(128, "table"),
db: db,
tlsConf: tlsConf,
}, nil
}

Expand Down Expand Up @@ -112,8 +117,8 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
importClient := NewImportClient(metaClient, rc.tlsConf)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit)
return nil
}
Expand All @@ -128,6 +133,11 @@ func (rc *Client) EnableOnline() {
rc.isOnline = true
}

// GetTLSConfig returns the tls config
func (rc *Client) GetTLSConfig() *tls.Config {
return rc.tlsConf
}

// GetTS gets a new timestamp from PD
func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
p, l, err := rc.pdClient.GetTS(ctx)
Expand All @@ -145,7 +155,7 @@ func (rc *Client) ResetTS(pdAddrs []string) error {
i := 0
return utils.WithRetry(rc.ctx, func() error {
idx := i % len(pdAddrs)
return utils.ResetTS(pdAddrs[idx], restoreTS)
return utils.ResetTS(pdAddrs[idx], restoreTS, rc.tlsConf)
}, newResetTSBackoffer())
}

Expand Down Expand Up @@ -332,6 +342,9 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
bfConf.MaxDelay = time.Second * 3
for _, store := range stores {
opt := grpc.WithInsecure()
if rc.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(rc.tlsConf))
}
gctx, cancel := context.WithTimeout(ctx, time.Second*5)
keepAlive := 10
keepAliveTimeout := 3
Expand Down
12 changes: 10 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"context"
"crypto/tls"
"strings"
"sync"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/pingcap/pd/pkg/codec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -47,13 +49,15 @@ type importClient struct {
mu sync.Mutex
metaClient SplitClient
clients map[uint64]import_sstpb.ImportSSTClient
tlsConf *tls.Config
}

// NewImportClient returns a new ImporterClient
func NewImportClient(metaClient SplitClient) ImporterClient {
func NewImportClient(metaClient SplitClient, tlsConf *tls.Config) ImporterClient {
return &importClient{
metaClient: metaClient,
clients: make(map[uint64]import_sstpb.ImportSSTClient),
tlsConf: tlsConf,
}
}

Expand Down Expand Up @@ -107,7 +111,11 @@ func (ic *importClient) getImportClient(
if err != nil {
return nil, err
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if ic.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(ic.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package restore
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -20,6 +21,7 @@ import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/pd/server/schedule/placement"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// SplitClient is an external client used by RegionSplitter.
Expand Down Expand Up @@ -58,13 +60,15 @@ type SplitClient interface {
type pdClient struct {
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
storeCache map[uint64]*metapb.Store
}

// NewSplitClient returns a client used by RegionSplitter.
func NewSplitClient(client pd.Client) SplitClient {
func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient {
return &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
}
}
Expand Down Expand Up @@ -199,7 +203,11 @@ func (c *pdClient) BatchSplitRegions(
if err != nil {
return nil, err
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func SplitRanges(
elapsed := time.Since(start)
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient()))
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 028963d

Please sign in to comment.