Skip to content

Commit

Permalink
Merge branch 'release-6.1' into release-6.1-25dda978ff9f
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Sep 25, 2022
2 parents a645170 + 30bd180 commit 532a73a
Show file tree
Hide file tree
Showing 45 changed files with 735 additions and 488 deletions.
31 changes: 22 additions & 9 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
tmysql "github.com/pingcap/tidb/errno"
drivererr "github.com/pingcap/tidb/store/driver/error"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -67,6 +67,26 @@ func IsRetryableError(err error) bool {
return true
}

var retryableErrorIDs = map[errors.ErrorID]struct{}{
ErrKVEpochNotMatch.ID(): {},
ErrKVNotLeader.ID(): {},
ErrKVRegionNotFound.ID(): {},
// common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy
// it's because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code,
// so we have to transform it into a defined code
ErrKVServerIsBusy.ID(): {},
ErrKVReadIndexNotReady.ID(): {},
ErrKVIngestFailed.ID(): {},
ErrKVRaftProposalDropped.ID(): {},
// during checksum coprocessor will transform error into driver error in handleCopResponse using ToTiDBErr
// met ErrRegionUnavailable on free-tier import during checksum, others hasn't met yet
drivererr.ErrRegionUnavailable.ID(): {},
drivererr.ErrTiKVStaleCommand.ID(): {},
drivererr.ErrTiKVServerTimeout.ID(): {},
drivererr.ErrTiKVServerBusy.ID(): {},
drivererr.ErrUnknown.ID(): {},
}

func isSingleRetryableError(err error) bool {
err = errors.Cause(err)

Expand Down Expand Up @@ -101,14 +121,7 @@ func isSingleRetryableError(err error) bool {
return false
}
case *errors.Error:
switch {
case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader),
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy),
berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed),
berrors.Is(nerr, ErrKVRaftProposalDropped):
// common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy
// it's because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code,
// so we have to transform it into a defined code
if _, ok := retryableErrorIDs[nerr.ID()]; ok {
return true
}
return false
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/errno"
drivererr "github.com/pingcap/tidb/store/driver/error"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -54,6 +55,13 @@ func TestIsRetryableError(t *testing.T) {
require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test")))

// tidb error
require.True(t, IsRetryableError(drivererr.ErrRegionUnavailable))
require.True(t, IsRetryableError(drivererr.ErrTiKVStaleCommand))
require.True(t, IsRetryableError(drivererr.ErrTiKVServerTimeout))
require.True(t, IsRetryableError(drivererr.ErrTiKVServerBusy))
require.True(t, IsRetryableError(drivererr.ErrUnknown))

// net: connection refused
_, err := net.Dial("tcp", "localhost:65533")
require.Error(t, err)
Expand Down
117 changes: 43 additions & 74 deletions br/pkg/lightning/common/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,97 +17,62 @@ package common
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"net/http/httptest"
"os"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/util"
"github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type TLS struct {
caPath string
certPath string
keyPath string
inner *tls.Config
client *http.Client
url string
caPath string
certPath string
keyPath string
caBytes []byte
certBytes []byte
keyBytes []byte
inner *tls.Config
client *http.Client
url string
}

// ToTLSConfig constructs a `*tls.Config` from the CA, certification and key
// paths.
//
// If the CA path is empty, returns nil.
func ToTLSConfig(caPath, certPath, keyPath string) (*tls.Config, error) {
if len(caPath) == 0 {
return nil, nil
}

// Create a certificate pool from CA
certPool := x509.NewCertPool()
ca, err := os.ReadFile(caPath)
// NewTLS constructs a new HTTP client with TLS configured with the CA,
// certificate and key paths.
func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error) {
inner, err := util.NewTLSConfig(
util.WithCAPath(caPath),
util.WithCertAndKeyPath(certPath, keyPath),
util.WithCAContent(caBytes),
util.WithCertAndKeyContent(certBytes, keyBytes),
)
if err != nil {
return nil, errors.Annotate(err, "could not read ca certificate")
}

// Append the certificates from the CA
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append ca certs")
}

tlsConfig := &tls.Config{
RootCAs: certPool,
NextProtos: []string{"h2", "http/1.1"}, // specify `h2` to let Go use HTTP/2.
MinVersion: tls.VersionTLS12,
}

if len(certPath) != 0 && len(keyPath) != 0 {
loadCert := func() (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return nil, errors.Annotate(err, "could not load client key pair")
}
return &cert, nil
}
tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return loadCert()
}
tlsConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
return loadCert()
}
return nil, errors.Trace(err)
}
return tlsConfig, nil
}

// NewTLS constructs a new HTTP client with TLS configured with the CA,
// certificate and key paths.
//
// If the CA path is empty, returns an instance where TLS is disabled.
func NewTLS(caPath, certPath, keyPath, host string) (*TLS, error) {
if len(caPath) == 0 {
if inner == nil {
return &TLS{
inner: nil,
client: &http.Client{},
url: "http://" + host,
}, nil
}
inner, err := ToTLSConfig(caPath, certPath, keyPath)
if err != nil {
return nil, errors.Trace(err)
}

return &TLS{
caPath: caPath,
certPath: certPath,
keyPath: keyPath,
inner: inner,
client: httputil.NewClient(inner),
url: "https://" + host,
caPath: caPath,
certPath: certPath,
keyPath: keyPath,
caBytes: caBytes,
certBytes: certBytes,
keyBytes: keyBytes,
inner: inner,
client: httputil.NewClient(inner),
url: "https://" + host,
}, nil
}

Expand All @@ -129,11 +94,9 @@ func (tc *TLS) WithHost(host string) *TLS {
} else {
url = "http://" + host
}
return &TLS{
inner: tc.inner,
client: tc.client,
url: url,
}
shallowClone := *tc
shallowClone.url = url
return &shallowClone
}

// ToGRPCDialOption constructs a gRPC dial option.
Expand All @@ -156,14 +119,20 @@ func (tc *TLS) GetJSON(ctx context.Context, path string, v interface{}) error {
return GetJSON(ctx, tc.client, tc.url+path, v)
}

// ToPDSecurityOption converts the TLS configuration to a PD security option.
func (tc *TLS) ToPDSecurityOption() pd.SecurityOption {
return pd.SecurityOption{
CAPath: tc.caPath,
CertPath: tc.certPath,
KeyPath: tc.keyPath,
CAPath: tc.caPath,
CertPath: tc.certPath,
KeyPath: tc.keyPath,
SSLCABytes: tc.caBytes,
SSLCertBytes: tc.certBytes,
SSLKEYBytes: tc.keyBytes,
}
}

// ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config.
// TODO: TiKV does not support pass in content.
func (tc *TLS) ToTiKVSecurityConfig() config.Security {
return config.Security{
ClusterSSLCA: tc.caPath,
Expand Down
28 changes: 10 additions & 18 deletions br/pkg/lightning/common/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGetJSONInsecure(t *testing.T) {
u, err := url.Parse(mockServer.URL)
require.NoError(t, err)

tls, err := common.NewTLS("", "", "", u.Host)
tls, err := common.NewTLS("", "", "", u.Host, nil, nil, nil)
require.NoError(t, err)

var result struct{ Path string }
Expand Down Expand Up @@ -73,15 +73,8 @@ func TestGetJSONSecure(t *testing.T) {
func TestInvalidTLS(t *testing.T) {
tempDir := t.TempDir()
caPath := filepath.Join(tempDir, "ca.pem")
_, err := common.NewTLS(caPath, "", "", "localhost")
require.Regexp(t, "could not read ca certificate:.*", err.Error())

err = os.WriteFile(caPath, []byte("invalid ca content"), 0o644)
require.NoError(t, err)
_, err = common.NewTLS(caPath, "", "", "localhost")
require.Regexp(t, "failed to append ca certs", err.Error())

err = os.WriteFile(caPath, []byte(`-----BEGIN CERTIFICATE-----
caContent := []byte(`-----BEGIN CERTIFICATE-----
MIIBITCBxwIUf04/Hucshr7AynmgF8JeuFUEf9EwCgYIKoZIzj0EAwIwEzERMA8G
A1UEAwwIYnJfdGVzdHMwHhcNMjIwNDEzMDcyNDQxWhcNMjIwNDE1MDcyNDQxWjAT
MREwDwYDVQQDDAhicl90ZXN0czBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABL+X
Expand All @@ -90,20 +83,19 @@ wczUg0AbaFFaCI+FAk3K9vbB9JeIORgGKS+F1TKip5tvm96g7S5lq8SgY38SXVc3
ze4ZnCkwJdP2VdpI3WZsoI7zAiEAjP8X1c0iFwYxdAbQAveX+9msVrzyUpZOohi4
RtgQTNI=
-----END CERTIFICATE-----
`), 0o644)
`)
err := os.WriteFile(caPath, caContent, 0o644)
require.NoError(t, err)

certPath := filepath.Join(tempDir, "test.pem")
keyPath := filepath.Join(tempDir, "test.key")
tls, err := common.NewTLS(caPath, certPath, keyPath, "localhost")
_, err = tls.TLSConfig().GetCertificate(nil)
require.Regexp(t, "could not load client key pair: open.*", err.Error())

err = os.WriteFile(certPath, []byte("invalid cert content"), 0o644)
certContent := []byte("invalid cert content")
err = os.WriteFile(certPath, certContent, 0o644)
require.NoError(t, err)
err = os.WriteFile(keyPath, []byte("invalid key content"), 0o600)
keyContent := []byte("invalid key content")
err = os.WriteFile(keyPath, keyContent, 0o600)
require.NoError(t, err)
tls, err = common.NewTLS(caPath, certPath, keyPath, "localhost")
_, err = tls.TLSConfig().GetCertificate(nil)
require.Regexp(t, "could not load client key pair: tls.*", err.Error())
_, err = common.NewTLS(caPath, "", "", "localhost", caContent, certContent, keyContent)
require.ErrorContains(t, err, "tls: failed to find any PEM data in certificate input")
}
31 changes: 27 additions & 4 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"github.com/BurntSushi/toml"
"github.com/docker/go-units"
gomysql "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util"
filter "github.com/pingcap/tidb/util/table-filter"
router "github.com/pingcap/tidb/util/table-router"
"go.uber.org/atomic"
Expand Down Expand Up @@ -155,7 +157,15 @@ func (cfg *Config) String() string {

func (cfg *Config) ToTLS() (*common.TLS, error) {
hostPort := net.JoinHostPort(cfg.TiDB.Host, strconv.Itoa(cfg.TiDB.StatusPort))
return common.NewTLS(cfg.Security.CAPath, cfg.Security.CertPath, cfg.Security.KeyPath, hostPort)
return common.NewTLS(
cfg.Security.CAPath,
cfg.Security.CertPath,
cfg.Security.KeyPath,
hostPort,
cfg.Security.CABytes,
cfg.Security.CertBytes,
cfg.Security.KeyBytes,
)
}

type Lightning struct {
Expand Down Expand Up @@ -558,6 +568,11 @@ type Security struct {
// TLSConfigName is used to set tls config for lightning in DM, so we don't expose this field to user
// DM may running many lightning instances at same time, so we need to set different tls config name for each lightning
TLSConfigName string `toml:"-" json:"-"`

// When DM/engine uses lightning as a library, it can directly pass in the content
CABytes []byte `toml:"-" json:"-"`
CertBytes []byte `toml:"-" json:"-"`
KeyBytes []byte `toml:"-" json:"-"`
}

// RegisterMySQL registers the TLS config with name "cluster" or security.TLSConfigName
Expand All @@ -566,7 +581,13 @@ func (sec *Security) RegisterMySQL() error {
if sec == nil {
return nil
}
tlsConfig, err := common.ToTLSConfig(sec.CAPath, sec.CertPath, sec.KeyPath)

tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(sec.CAPath),
util.WithCertAndKeyPath(sec.CertPath, sec.KeyPath),
util.WithCAContent(sec.CABytes),
util.WithCertAndKeyContent(sec.CertBytes, sec.KeyBytes),
)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1142,9 +1163,11 @@ func (cfg *Config) CheckAndAdjustSecurity() error {

switch cfg.TiDB.TLS {
case "":
if len(cfg.TiDB.Security.CAPath) > 0 {
if len(cfg.TiDB.Security.CAPath) > 0 || len(cfg.TiDB.Security.CABytes) > 0 ||
len(cfg.TiDB.Security.CertPath) > 0 || len(cfg.TiDB.Security.CertBytes) > 0 ||
len(cfg.TiDB.Security.KeyPath) > 0 || len(cfg.TiDB.Security.KeyBytes) > 0 {
if cfg.TiDB.Security.TLSConfigName == "" {
cfg.TiDB.Security.TLSConfigName = "cluster" // adjust this the default value
cfg.TiDB.Security.TLSConfigName = uuid.New().String() // adjust this the default value
}
cfg.TiDB.TLS = cfg.TiDB.Security.TLSConfigName
} else {
Expand Down
Loading

0 comments on commit 532a73a

Please sign in to comment.