Skip to content

Commit

Permalink
tikv: fix region cache do not filter the down peers (#17337) (#17342)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored May 25, 2020
1 parent f9385f2 commit 49f2853
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 120 deletions.
25 changes: 12 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/danjacques/gofslock v0.0.0-20191023191349-0a45f885bc37
github.com/dgraph-io/ristretto v0.0.1
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/go-sql-driver/mysql v1.4.1
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.4
github.com/golang/snappy v0.0.1
github.com/google/btree v1.0.0
github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3
github.com/google/pprof v0.0.0-20200407044318-7d83b28da2e9
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.3
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
Expand All @@ -22,30 +22,29 @@ require (
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pingcap/br v0.0.0-20200426093517-dd11ae28b885
github.com/pingcap/br v0.0.0-20200521085655-53201addd4ad
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200521064712-8dc0fb6ce6f4
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.4.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/shirou/gopsutil v2.19.10+incompatible
github.com/sirupsen/logrus v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/uber-go/atomic v1.3.2
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.2.0
Expand All @@ -55,7 +54,7 @@ require (
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200325203130-f53864d0dba1
google.golang.org/grpc v1.25.1
google.golang.org/grpc v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
Expand Down
189 changes: 134 additions & 55 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,25 +591,25 @@ type RegionFrameRange struct {
func (t *tikvHandlerTool) getRegionsMeta(regionIDs []uint64) ([]RegionMeta, error) {
regions := make([]RegionMeta, len(regionIDs))
for i, regionID := range regionIDs {
meta, leader, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID)
region, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID)
if err != nil {
return nil, errors.Trace(err)
}

failpoint.Inject("errGetRegionByIDEmpty", func(val failpoint.Value) {
if val.(bool) {
meta = nil
region.Meta = nil
}
})

if meta == nil {
if region.Meta == nil {
return nil, errors.Errorf("region not found for regionID %q", regionID)
}
regions[i] = RegionMeta{
ID: regionID,
Leader: leader,
Peers: meta.Peers,
RegionEpoch: meta.RegionEpoch,
Leader: region.Leader,
Peers: region.Meta.Peers,
RegionEpoch: region.Meta.RegionEpoch,
}

}
Expand Down
12 changes: 6 additions & 6 deletions store/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
return m.pdc.GetTS(m.ctx)
}

func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
region, peer := c.cluster.GetRegionByKey(key)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
region, peer := c.cluster.GetPrevRegionByKey(key)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
region, peer := c.cluster.GetRegionByID(regionID)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand Down
32 changes: 16 additions & 16 deletions store/tikv/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ type codecPDClient struct {

// GetRegion encodes the key before sending the request to pd-server and decodes
// the returned StartKey and EndKey.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
encodedKey := codec.EncodeBytes([]byte(nil), key)
region, peer, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
region, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
encodedKey := codec.EncodeBytes([]byte(nil), key)
region, peer, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
region, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

// GetRegionByID requests the region with the given ID from pd-server and decodes
// the returned StartKey and EndKey.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
region, peer, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, peer, err)
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
region, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, err)
}

func (c *codecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand All @@ -68,18 +68,18 @@ func (c *codecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey
return regions, peers, nil
}

func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) {
func processRegionResult(region *pd.Region, err error) (*pd.Region, error) {
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
if region == nil {
return nil, nil, nil
if region == nil || region.Meta == nil {
return nil, nil
}
err = decodeRegionMetaKeyInPlace(region)
err = decodeRegionMetaKeyInPlace(region.Meta)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
return region, peer, nil
return region, nil
}

func decodeRegionMetaKeyInPlace(r *metapb.Region) error {
Expand Down
60 changes: 42 additions & 18 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,29 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
return newestRegion
}

// filterUnavailablePeers removes from region.Meta.Peers every peer that is
// also listed in region.DownPeers. A peer is treated as down when both its Id
// and its StoreId match an entry of DownPeers.
//
// The filtering is done in place: the surviving peers are compacted to the
// front of the existing backing array and region.Meta.Peers is re-sliced to
// cover only them. A nil region, a region without Meta, or a region with no
// down peers is left untouched.
func filterUnavailablePeers(region *pd.Region) {
	if region == nil || region.Meta == nil || len(region.DownPeers) == 0 {
		return
	}
	peers := region.Meta.Peers
	// Share the backing array with peers; the write position can never
	// overtake the read position, so no element is clobbered before it is read.
	kept := peers[:0]
nextPeer:
	for _, p := range peers {
		for _, down := range region.DownPeers {
			if p.Id == down.Id && p.StoreId == down.StoreId {
				// Down peer: drop it and move on to the next candidate.
				continue nextPeer
			}
		}
		kept = append(kept, p)
	}
	// Nil out the now-unused tail so the dropped peers are not kept reachable
	// through the shared backing array.
	for i := len(kept); i < len(peers); i++ {
		peers[i] = nil
	}
	region.Meta.Peers = kept
}

// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set isEndKey to true. This is useful
// when processing in reverse order.
Expand All @@ -863,13 +886,12 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
return nil, errors.Trace(err)
}
}
var meta *metapb.Region
var leader *metapb.Peer
var reg *pd.Region
var err error
if searchPrev {
meta, leader, err = c.pdClient.GetPrevRegion(bo.ctx, key)
reg, err = c.pdClient.GetPrevRegion(bo.ctx, key)
} else {
meta, leader, err = c.pdClient.GetRegion(bo.ctx, key)
reg, err = c.pdClient.GetRegion(bo.ctx, key)
}
if err != nil {
tikvRegionCacheCounterWithGetRegionError.Inc()
Expand All @@ -880,21 +902,22 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err)
continue
}
if meta == nil {
if reg == nil || reg.Meta == nil {
backoffErr = errors.Errorf("region not found for key %q", key)
continue
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
if isEndKey && !searchPrev && bytes.Equal(meta.StartKey, key) && len(meta.StartKey) != 0 {
if isEndKey && !searchPrev && bytes.Equal(reg.Meta.StartKey, key) && len(reg.Meta.StartKey) != 0 {
searchPrev = true
continue
}
region := &Region{meta: meta}
region := &Region{meta: reg.Meta}
region.init(c)
if leader != nil {
c.switchToPeer(region, leader.StoreId)
if reg.Leader != nil {
c.switchToPeer(region, reg.Leader.StoreId)
}
return region, nil
}
Expand All @@ -910,7 +933,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
return nil, errors.Trace(err)
}
}
meta, leader, err := c.pdClient.GetRegionByID(bo.ctx, regionID)
reg, err := c.pdClient.GetRegionByID(bo.ctx, regionID)
if err != nil {
tikvRegionCacheCounterWithGetRegionByIDError.Inc()
} else {
Expand All @@ -920,16 +943,17 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err)
continue
}
if meta == nil {
if reg == nil || reg.Meta == nil {
return nil, errors.Errorf("region not found for regionID %d", regionID)
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: meta}
region := &Region{meta: reg.Meta}
region.init(c)
if leader != nil {
c.switchToPeer(region, leader.GetStoreId())
if reg.Leader != nil {
c.switchToPeer(region, reg.Leader.GetStoreId())
}
return region, nil
}
Expand Down
12 changes: 6 additions & 6 deletions store/tikv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,32 +139,32 @@ func (c *mockPDClient) GetTSAsync(ctx context.Context) pd.TSFuture {
return nil
}

func (c *mockPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetRegion(ctx, key)
}

func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetPrevRegion(ctx, key)
}

func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetRegionByID(ctx, regionID)
}
Expand Down

0 comments on commit 49f2853

Please sign in to comment.