From 67549be8b94e2465949de0a88ab07d0abb75abd0 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Sat, 27 Apr 2019 22:53:48 -0500 Subject: [PATCH] server: fix cannot send error in heartbeat stream (#1521) Signed-off-by: nolouch --- server/cluster_test.go | 7 ++----- server/grpc_service.go | 13 ++++++------- server/heartbeat_stream_test.go | 22 ++++++++++++++++++++-- server/heartbeat_streams.go | 4 +++- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/server/cluster_test.go b/server/cluster_test.go index 5fc65bcebbb..1a20705cc7c 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -535,16 +535,13 @@ func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) { } go func(isReciver bool) { if isReciver { - resp, err := stream.Recv() + _, err := stream.Recv() c.Assert(err, IsNil) - c.Assert(resp.Header.GetError(), IsNil) - fmt.Println("get resp:", resp) wg.Done() } for { - resp, err := stream.Recv() + _, err := stream.Recv() c.Assert(err, IsNil) - c.Assert(resp.Header.GetError(), IsNil) } }(i == 0) } diff --git a/server/grpc_service.go b/server/grpc_service.go index 4e0d0f446d7..ba78d697e2f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -371,21 +371,20 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { } region := core.RegionFromHeartbeat(request) - if region.GetID() == 0 { - msg := fmt.Sprintf("invalid request region, %v", request) - hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel) + if region.GetLeader() == nil { + log.Error("invalid request, the leader is nil", zap.Reflect("reqeust", request)) continue } - if region.GetLeader() == nil { - msg := fmt.Sprintf("invalid request leader, %v", request) - hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel) + if region.GetID() == 0 { + msg := fmt.Sprintf("invalid request region, %v", request) + hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader(), storeAddress, storeLabel) continue } err = cluster.HandleRegionHeartbeat(region) if err != nil { msg := err.Error() - hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress, storeLabel) + hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader(), storeAddress, storeLabel) } regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go index 8979ee7ad54..a41b603085a 100644 --- a/server/heartbeat_stream_test.go +++ b/server/heartbeat_stream_test.go @@ -60,10 +60,19 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) { defer stream1.close() defer stream2.close() checkActiveStream := func() int { + // 1 means stream1 got a valid response + // 2 means stream2 got a valid response + // 3 means got an invalid response select { - case <-stream1.respCh: + case resp := <-stream1.respCh: + if resp.GetHeader().GetError() != nil { + return 3 + } return 1 - case <-stream2.respCh: + case resp := <-stream2.respCh: + if resp.GetHeader().GetError() != nil { + return 3 + } return 2 case <-time.After(time.Second): return 0 @@ -74,12 +83,21 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) { Leader: s.region.Peers[0], Region: s.region, } + invalidRegion := &metapb.Region{Id: 0} + invalidReq := &pdpb.RegionHeartbeatRequest{ + Header: newRequestHeader(s.svr.clusterID), + Leader: s.region.Peers[0], + Region: invalidRegion, + } // Active stream is stream1. c.Assert(stream1.stream.Send(req), IsNil) c.Assert(checkActiveStream(), Equals, 1) // Rebind to stream2. c.Assert(stream2.stream.Send(req), IsNil) c.Assert(checkActiveStream(), Equals, 2) + // SendErr to stream2. + c.Assert(stream2.stream.Send(invalidReq), IsNil) + c.Assert(checkActiveStream(), Equals, 3) // Rebind to stream1 if no more heartbeats sent through stream2. testutil.WaitUntil(c, func(c *C) bool { c.Assert(stream1.stream.Send(req), IsNil) diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go index 9a822053d10..3161f6dacec 100644 --- a/server/heartbeat_streams.go +++ b/server/heartbeat_streams.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/logutil" @@ -164,7 +165,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear } } -func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string, storeLabel string) { +func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, targetPeer *metapb.Peer, storeAddress, storeLabel string) { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc() msg := &pdpb.RegionHeartbeatResponse{ @@ -175,6 +176,7 @@ func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeA Message: errMsg, }, }, + TargetPeer: targetPeer, } select {