Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: fix region cache do not filter the down peers (#17337) #17342

Merged
merged 4 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/danjacques/gofslock v0.0.0-20191023191349-0a45f885bc37
github.com/dgraph-io/ristretto v0.0.1
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/go-sql-driver/mysql v1.4.1
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.4
github.com/golang/snappy v0.0.1
github.com/google/btree v1.0.0
github.com/google/pprof v0.0.0-20190930153522-6ce02741cba3
github.com/google/pprof v0.0.0-20200407044318-7d83b28da2e9
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.3
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
Expand All @@ -22,30 +22,29 @@ require (
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pingcap/br v0.0.0-20200426093517-dd11ae28b885
github.com/pingcap/br v0.0.0-20200521085655-53201addd4ad
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/parser v0.0.0-20200521064712-8dc0fb6ce6f4
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200421113014-507d2bb3a15e+incompatible
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.4.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/shirou/gopsutil v2.19.10+incompatible
github.com/sirupsen/logrus v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/uber-go/atomic v1.3.2
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.2.0
Expand All @@ -55,7 +54,7 @@ require (
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200325203130-f53864d0dba1
google.golang.org/grpc v1.25.1
google.golang.org/grpc v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
Expand Down
189 changes: 134 additions & 55 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,25 +591,25 @@ type RegionFrameRange struct {
func (t *tikvHandlerTool) getRegionsMeta(regionIDs []uint64) ([]RegionMeta, error) {
regions := make([]RegionMeta, len(regionIDs))
for i, regionID := range regionIDs {
meta, leader, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID)
region, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID)
if err != nil {
return nil, errors.Trace(err)
}

failpoint.Inject("errGetRegionByIDEmpty", func(val failpoint.Value) {
if val.(bool) {
meta = nil
region.Meta = nil
}
})

if meta == nil {
if region.Meta == nil {
return nil, errors.Errorf("region not found for regionID %q", regionID)
}
regions[i] = RegionMeta{
ID: regionID,
Leader: leader,
Peers: meta.Peers,
RegionEpoch: meta.RegionEpoch,
Leader: region.Leader,
Peers: region.Meta.Peers,
RegionEpoch: region.Meta.RegionEpoch,
}

}
Expand Down
12 changes: 6 additions & 6 deletions store/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
return m.pdc.GetTS(m.ctx)
}

func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
region, peer := c.cluster.GetRegionByKey(key)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
region, peer := c.cluster.GetPrevRegionByKey(key)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
region, peer := c.cluster.GetRegionByID(regionID)
return region, peer, nil
return &pd.Region{Meta: region, Leader: peer}, nil
}

func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand Down
32 changes: 16 additions & 16 deletions store/tikv/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ type codecPDClient struct {

// GetRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
encodedKey := codec.EncodeBytes([]byte(nil), key)
region, peer, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
region, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
encodedKey := codec.EncodeBytes([]byte(nil), key)
region, peer, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
region, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, err)
}

// GetRegionByID encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
region, peer, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, peer, err)
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
region, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, err)
}

func (c *codecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand All @@ -68,18 +68,18 @@ func (c *codecPDClient) ScanRegions(ctx context.Context, startKey []byte, endKey
return regions, peers, nil
}

func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) {
func processRegionResult(region *pd.Region, err error) (*pd.Region, error) {
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
if region == nil {
return nil, nil, nil
if region == nil || region.Meta == nil {
return nil, nil
}
err = decodeRegionMetaKeyInPlace(region)
err = decodeRegionMetaKeyInPlace(region.Meta)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, errors.Trace(err)
}
return region, peer, nil
return region, nil
}

func decodeRegionMetaKeyInPlace(r *metapb.Region) error {
Expand Down
60 changes: 42 additions & 18 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,29 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
return newestRegion
}

func filterUnavailablePeers(region *pd.Region) {
if len(region.DownPeers) == 0 {
return
}
new := region.Meta.Peers[:0]
for _, p := range region.Meta.Peers {
available := true
for _, downPeer := range region.DownPeers {
if p.Id == downPeer.Id && p.StoreId == downPeer.StoreId {
available = false
break
}
}
if available {
new = append(new, p)
}
}
for i := len(new); i < len(region.Meta.Peers); i++ {
region.Meta.Peers[i] = nil
}
region.Meta.Peers = new
}

// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
Expand All @@ -863,13 +886,12 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
return nil, errors.Trace(err)
}
}
var meta *metapb.Region
var leader *metapb.Peer
var reg *pd.Region
var err error
if searchPrev {
meta, leader, err = c.pdClient.GetPrevRegion(bo.ctx, key)
reg, err = c.pdClient.GetPrevRegion(bo.ctx, key)
} else {
meta, leader, err = c.pdClient.GetRegion(bo.ctx, key)
reg, err = c.pdClient.GetRegion(bo.ctx, key)
}
if err != nil {
tikvRegionCacheCounterWithGetRegionError.Inc()
Expand All @@ -880,21 +902,22 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err)
continue
}
if meta == nil {
if reg == nil || reg.Meta == nil {
backoffErr = errors.Errorf("region not found for key %q", key)
continue
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
if isEndKey && !searchPrev && bytes.Equal(meta.StartKey, key) && len(meta.StartKey) != 0 {
if isEndKey && !searchPrev && bytes.Equal(reg.Meta.StartKey, key) && len(reg.Meta.StartKey) != 0 {
searchPrev = true
continue
}
region := &Region{meta: meta}
region := &Region{meta: reg.Meta}
region.init(c)
if leader != nil {
c.switchToPeer(region, leader.StoreId)
if reg.Leader != nil {
c.switchToPeer(region, reg.Leader.StoreId)
}
return region, nil
}
Expand All @@ -910,7 +933,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
return nil, errors.Trace(err)
}
}
meta, leader, err := c.pdClient.GetRegionByID(bo.ctx, regionID)
reg, err := c.pdClient.GetRegionByID(bo.ctx, regionID)
if err != nil {
tikvRegionCacheCounterWithGetRegionByIDError.Inc()
} else {
Expand All @@ -920,16 +943,17 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err)
continue
}
if meta == nil {
if reg == nil || reg.Meta == nil {
return nil, errors.Errorf("region not found for regionID %d", regionID)
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
filterUnavailablePeers(reg)
if len(reg.Meta.Peers) == 0 {
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: meta}
region := &Region{meta: reg.Meta}
region.init(c)
if leader != nil {
c.switchToPeer(region, leader.GetStoreId())
if reg.Leader != nil {
c.switchToPeer(region, reg.Leader.GetStoreId())
}
return region, nil
}
Expand Down
12 changes: 6 additions & 6 deletions store/tikv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,32 +139,32 @@ func (c *mockPDClient) GetTSAsync(ctx context.Context) pd.TSFuture {
return nil
}

func (c *mockPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetRegion(ctx context.Context, key []byte) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetRegion(ctx, key)
}

func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetPrevRegion(ctx context.Context, key []byte) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetPrevRegion(ctx, key)
}

func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *mockPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*pd.Region, error) {
c.RLock()
defer c.RUnlock()

if c.stop {
return nil, nil, errors.Trace(errStopped)
return nil, errors.Trace(errStopped)
}
return c.client.GetRegionByID(ctx, regionID)
}
Expand Down