Skip to content

Commit

Permalink
server: fix cannot send error in heartbeat stream (tikv#1521)
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and rleungx committed Apr 28, 2019
1 parent dcbb706 commit 67549be
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
7 changes: 2 additions & 5 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,13 @@ func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) {
}
go func(isReciver bool) {
if isReciver {
resp, err := stream.Recv()
_, err := stream.Recv()
c.Assert(err, IsNil)
c.Assert(resp.Header.GetError(), IsNil)
fmt.Println("get resp:", resp)
wg.Done()
}
for {
resp, err := stream.Recv()
_, err := stream.Recv()
c.Assert(err, IsNil)
c.Assert(resp.Header.GetError(), IsNil)
}
}(i == 0)
}
Expand Down
13 changes: 6 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,21 +371,20 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
}

region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("reqeust", request))
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader(), storeAddress, storeLabel)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader(), storeAddress, storeLabel)
}

regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
Expand Down
22 changes: 20 additions & 2 deletions server/heartbeat_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,19 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
defer stream1.close()
defer stream2.close()
checkActiveStream := func() int {
// 1 means stream1 got a valid response
// 2 means stream2 got a valid response
// 3 means got an invalid response
select {
case <-stream1.respCh:
case resp := <-stream1.respCh:
if resp.GetHeader().GetError() != nil {
return 3
}
return 1
case <-stream2.respCh:
case resp := <-stream2.respCh:
if resp.GetHeader().GetError() != nil {
return 3
}
return 2
case <-time.After(time.Second):
return 0
Expand All @@ -74,12 +83,21 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
Leader: s.region.Peers[0],
Region: s.region,
}
invalidRegion := &metapb.Region{Id: 0}
invalidReq := &pdpb.RegionHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Leader: s.region.Peers[0],
Region: invalidRegion,
}
// Active stream is stream1.
c.Assert(stream1.stream.Send(req), IsNil)
c.Assert(checkActiveStream(), Equals, 1)
// Rebind to stream2.
c.Assert(stream2.stream.Send(req), IsNil)
c.Assert(checkActiveStream(), Equals, 2)
// SendErr to stream2.
c.Assert(stream2.stream.Send(invalidReq), IsNil)
c.Assert(checkActiveStream(), Equals, 3)
// Rebind to stream1 if no more heartbeats sent through stream2.
testutil.WaitUntil(c, func(c *C) bool {
c.Assert(stream1.stream.Send(req), IsNil)
Expand Down
4 changes: 3 additions & 1 deletion server/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/logutil"
Expand Down Expand Up @@ -164,7 +165,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string, storeLabel string) {
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, targetPeer *metapb.Peer, storeAddress, storeLabel string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Expand All @@ -175,6 +176,7 @@ func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeA
Message: errMsg,
},
},
TargetPeer: targetPeer,
}

select {
Expand Down

0 comments on commit 67549be

Please sign in to comment.