Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer: support TLS for region syncer #1728

Merged
merged 4 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ package syncer

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/url"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -56,7 +61,39 @@ func (s *RegionSyncer) establish(addr string) (ClientStream, error) {
return nil, err
}

cc, err := grpc.Dial(u.Host, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)))
opt := grpc.WithInsecure()
if len(s.securityConfig.CAPath) != 0 {
rleungx marked this conversation as resolved.
Show resolved Hide resolved
certificates := []tls.Certificate{}
if len(s.securityConfig.CertPath) != 0 && len(s.securityConfig.KeyPath) != 0 {
// Load the client certificates from disk
certificate, e := tls.LoadX509KeyPair(s.securityConfig.CertPath, s.securityConfig.KeyPath)
if e != nil {
return nil, errors.Errorf("could not load client key pair: %s", e)
}
certificates = append(certificates, certificate)
}

// Create a certificate pool from the certificate authority
certPool := x509.NewCertPool()
ca, e := ioutil.ReadFile(s.securityConfig.CAPath)
if e != nil {
return nil, errors.Errorf("could not read ca certificate: %s", e)
}

// Append the certificates from the CA
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append ca certs")
}

creds := credentials.NewTLS(&tls.Config{
Certificates: certificates,
RootCAs: certPool,
})

opt = grpc.WithTransportCredentials(creds)
}

cc, err := grpc.Dial(u.Host, opt, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)))
if err != nil {
return nil, err
}
Expand Down
30 changes: 17 additions & 13 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/server/config"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -59,19 +60,21 @@ type Server interface {
GetStorage() *core.Storage
Name() string
GetMetaRegions() []*metapb.Region
GetSecurityConfig() *config.SecurityConfig
}

// RegionSyncer is used to sync the region information without raft.
type RegionSyncer struct {
sync.RWMutex
streams map[string]ServerStream
ctx context.Context
cancel context.CancelFunc
server Server
closed chan struct{}
wg sync.WaitGroup
history *historyBuffer
limit *ratelimit.Bucket
streams map[string]ServerStream
ctx context.Context
cancel context.CancelFunc
server Server
closed chan struct{}
wg sync.WaitGroup
history *historyBuffer
limit *ratelimit.Bucket
securityConfig *config.SecurityConfig
}

// NewRegionSyncer returns a region syncer.
Expand All @@ -81,11 +84,12 @@ type RegionSyncer struct {
// no longer etcd but go-leveldb.
func NewRegionSyncer(s Server) *RegionSyncer {
return &RegionSyncer{
streams: make(map[string]ServerStream),
server: s,
closed: make(chan struct{}),
history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionStorage()),
limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity),
streams: make(map[string]ServerStream),
server: s,
closed: make(chan struct{}),
history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionStorage()),
limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity),
securityConfig: s.GetSecurityConfig(),
}
}

Expand Down