Skip to content

Commit

Permalink
Merge pull request #1929 from nolouch/pick-3.1
Browse files Browse the repository at this point in the history
*: Fix the issue that some picks failed to pick to 3.1
  • Loading branch information
nolouch authored Nov 12, 2019
2 parents 0e3a054 + 0aed354 commit d14bd3d
Show file tree
Hide file tree
Showing 37 changed files with 591 additions and 404 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ tags
/.retools/
vendor
default*
*.bak
.vscode/
46 changes: 3 additions & 43 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@ package pd

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/url"
"strings"
"sync"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/grpcutil"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// Client is a PD (Placement Driver) client.
Expand Down Expand Up @@ -272,43 +268,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return conn, nil
}

opt := grpc.WithInsecure()
if len(c.security.CAPath) != 0 {

certificates := []tls.Certificate{}
if len(c.security.CertPath) != 0 && len(c.security.KeyPath) != 0 {
// Load the client certificates from disk
certificate, err := tls.LoadX509KeyPair(c.security.CertPath, c.security.KeyPath)
if err != nil {
return nil, errors.Errorf("could not load client key pair: %s", err)
}
certificates = append(certificates, certificate)
}

// Create a certificate pool from the certificate authority
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(c.security.CAPath)
if err != nil {
return nil, errors.Errorf("could not read ca certificate: %s", err)
}

// Append the certificates from the CA
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append ca certs")
}

creds := credentials.NewTLS(&tls.Config{
Certificates: certificates,
RootCAs: certPool,
})

opt = grpc.WithTransportCredentials(creds)
}
u, err := url.Parse(addr)
if err != nil {
return nil, errors.WithStack(err)
}
cc, err := grpc.Dial(u.Host, opt)
cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
326 changes: 165 additions & 161 deletions docs/api.html

Large diffs are not rendered by default.

69 changes: 69 additions & 0 deletions pkg/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcutil

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/url"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// GetClientConn returns a gRPC client connection.
func GetClientConn(addr string, caPath string, certPath string, keyPath string) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure()
if len(caPath) != 0 {
certificates := []tls.Certificate{}
if len(certPath) != 0 && len(keyPath) != 0 {
// Load the client certificates from disk
certificate, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return nil, errors.Errorf("could not load client key pair: %s", err)
}
certificates = append(certificates, certificate)
}

// Create a certificate pool from the certificate authority
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(caPath)
if err != nil {
return nil, errors.Errorf("could not read ca certificate: %s", err)
}

// Append the certificates from the CA
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append ca certs")
}

creds := credentials.NewTLS(&tls.Config{
Certificates: certificates,
RootCAs: certPool,
})

opt = grpc.WithTransportCredentials(creds)
}
u, err := url.Parse(addr)
if err != nil {
return nil, errors.WithStack(err)
}
cc, err := grpc.Dial(u.Host, opt)
if err != nil {
return nil, errors.WithStack(err)
}
return cc, nil
}
2 changes: 1 addition & 1 deletion server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ types:
uriParameters:
filter:
type: string
enum: [ miss-peer, extra-peer, pending-peer, down-peer, incorrect-ns ]
enum: [ miss-peer, extra-peer, pending-peer, down-peer, incorrect-ns, offline-peer, empty-region ]
get:
description: List regions with unhealthy status.
responses:
Expand Down
19 changes: 18 additions & 1 deletion server/api/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package api

import (
"crypto/tls"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server"
Expand All @@ -33,6 +35,8 @@ const (
errRedirectToNotLeader = "redirect to not leader"
)

var initHTTPClientOnce sync.Once

type redirector struct {
s *server.Server
}
Expand Down Expand Up @@ -67,7 +71,20 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

initHTTPClientOnce.Do(func() {
var tlsConfig *tls.Config
tlsConfig, err = server.ToTLSConfig(h.s.GetSecurityConfig())
dialClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
},
}
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newCustomReverseProxies(urls).ServeHTTP(w, r)
}

Expand Down
22 changes: 22 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,28 @@ func (h *regionsHandler) GetIncorrectNamespaceRegions(w http.ResponseWriter, r *
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) GetOfflinePeer(w http.ResponseWriter, r *http.Request) {
handler := h.svr.GetHandler()
regions, err := handler.GetOfflinePeer()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) GetEmptyRegion(w http.ResponseWriter, r *http.Request) {
handler := h.svr.GetHandler()
regions, err := handler.GetEmptyRegion()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
Expand Down
14 changes: 14 additions & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ func (s *testRegionSuite) TestRegionCheck(c *C) {
err = readJSONWithURL(url, r3)
c.Assert(err, IsNil)
c.Assert(r3, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})

url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "offline-peer")
r4 := &RegionsInfo{}
err = readJSONWithURL(url, r4)
c.Assert(err, IsNil)
c.Assert(r4, DeepEquals, &RegionsInfo{Count: 0, Regions: []*RegionInfo{}})

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(c, s.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "empty-region")
r5 := &RegionsInfo{}
err = readJSONWithURL(url, r5)
c.Assert(err, IsNil)
c.Assert(r5, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
}

func (s *testRegionSuite) TestRegions(c *C) {
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/regions/check/extra-peer", regionsHandler.GetExtraPeerRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/check/pending-peer", regionsHandler.GetPendingPeerRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/check/down-peer", regionsHandler.GetDownPeerRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/check/offline-peer", regionsHandler.GetOfflinePeer).Methods("GET")
router.HandleFunc("/api/v1/regions/check/empty-region", regionsHandler.GetEmptyRegion).Methods("GET")
router.HandleFunc("/api/v1/regions/sibling/{id}", regionsHandler.GetRegionSiblings).Methods("GET")
router.HandleFunc("/api/v1/regions/check/incorrect-ns", regionsHandler.GetIncorrectNamespaceRegions).Methods("GET")

Expand Down
18 changes: 13 additions & 5 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,15 +776,23 @@ type SecurityConfig struct {
KeyPath string `toml:"key-path" json:"key-path"`
}

// ConvertToMap is used to convert SecurityConfig to a map.
func (s *SecurityConfig) ConvertToMap() map[string]string {
return map[string]string{
"caPath": s.CAPath,
"certPath": s.CertPath,
"keyPath": s.KeyPath}
}

// ToTLSConfig generatres tls config.
func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) {
if len(s.CertPath) == 0 && len(s.KeyPath) == 0 {
func ToTLSConfig(config map[string]string) (*tls.Config, error) {
if len(config["certPath"]) == 0 && len(config["keyPath"]) == 0 {
return nil, nil
}
tlsInfo := transport.TLSInfo{
CertFile: s.CertPath,
KeyFile: s.KeyPath,
TrustedCAFile: s.CAPath,
CertFile: config["certPath"],
KeyFile: config["keyPath"],
TrustedCAFile: config["caPath"],
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type testConfigSuite struct{}

func (s *testConfigSuite) TestTLS(c *C) {
cfg := NewConfig()
tls, err := cfg.Security.ToTLSConfig()
tls, err := ToTLSConfig(cfg.Security.ConvertToMap())
c.Assert(err, IsNil)
c.Assert(tls, IsNil)
}
Expand Down
17 changes: 13 additions & 4 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
)
Expand All @@ -26,28 +27,36 @@ type StoreCreateOption func(region *StoreInfo)
// SetStoreAddress sets the address for the store.
func SetStoreAddress(address string) StoreCreateOption {
return func(store *StoreInfo) {
store.meta.Address = address
meta := proto.Clone(store.meta).(*metapb.Store)
meta.Address = address
store.meta = meta
}
}

// SetStoreLabels sets the labels for the store.
func SetStoreLabels(labels []*metapb.StoreLabel) StoreCreateOption {
return func(store *StoreInfo) {
store.meta.Labels = labels
meta := proto.Clone(store.meta).(*metapb.Store)
meta.Labels = labels
store.meta = meta
}
}

// SetStoreVersion sets the version for the store.
func SetStoreVersion(version string) StoreCreateOption {
return func(store *StoreInfo) {
store.meta.Version = version
meta := proto.Clone(store.meta).(*metapb.Store)
meta.Version = version
store.meta = meta
}
}

// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState) StoreCreateOption {
return func(store *StoreInfo) {
store.meta.State = state
meta := proto.Clone(store.meta).(*metapb.Store)
meta.State = state
store.meta = meta
}
}

Expand Down
18 changes: 18 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,24 @@ func (h *Handler) GetIncorrectNamespaceRegions() ([]*core.RegionInfo, error) {
return c.cachedCluster.GetRegionStatsByType(statistics.IncorrectNamespace), nil
}

// GetOfflinePeer gets the region with offline peer.
func (h *Handler) GetOfflinePeer() ([]*core.RegionInfo, error) {
c := h.s.GetRaftCluster()
if c == nil {
return nil, ErrNotBootstrapped
}
return c.cachedCluster.GetRegionStatsByType(statistics.OfflinePeer), nil
}

// GetEmptyRegion gets the region with empty size.
func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {
c := h.s.GetRaftCluster()
if c == nil {
return nil, ErrNotBootstrapped
}
return c.cachedCluster.GetRegionStatsByType(statistics.EmptyRegion), nil
}

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoServer := h.s
Expand Down
2 changes: 1 addition & 1 deletion server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func PrepareJoinCluster(cfg *Config) error {
}

// Below are cases without data directory.
tlsConfig, err := cfg.Security.ToTLSConfig()
tlsConfig, err := ToTLSConfig(cfg.Security.ConvertToMap())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d14bd3d

Please sign in to comment.