Skip to content

Commit

Permalink
kv/client: support reconnect regions when no event received too long (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 29, 2021
1 parent 62962ce commit 8158b03
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 12 deletions.
54 changes: 47 additions & 7 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region any more.
regionScheduleReload = false

// time interval to force kv client to terminate gRPC stream and reconnect
reconnectInterval = 15 * time.Minute
)

// hard code switch
// true: use kv client v2, which has a region worker for each stream
// false: use kv client v1, which runs a goroutine for every single region
var enableKVClientV2 = true
var enableKVClientV2 = false

type singleRegionInfo struct {
verID tikv.RegionVerID
Expand All @@ -94,7 +97,9 @@ var (
var (
// unreachable error, only used in unit test
errUnreachable = errors.New("kv client unreachable error")
logPanic = log.Panic
// internal error, force the gRPC stream to terminate and reconnect
errReconnect = errors.New("internal error, reconnect all regions")
logPanic = log.Panic
)

func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan, ts uint64, rpcCtx *tikv.RPCContext) singleRegionInfo {
Expand Down Expand Up @@ -499,8 +504,9 @@ type eventFeedSession struct {
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge

streams map[string]cdcpb.ChangeData_EventFeedClient
streamsLock sync.RWMutex
streams map[string]cdcpb.ChangeData_EventFeedClient
streamsLock sync.RWMutex
streamsCanceller map[string]context.CancelFunc

workers map[string]*regionWorker
workersLock sync.RWMutex
Expand Down Expand Up @@ -542,6 +548,7 @@ func newEventFeedSession(
errChSizeGauge: clientChannelSize.WithLabelValues(id, "err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues(id, "range"),
streams: make(map[string]cdcpb.ChangeData_EventFeedClient),
streamsCanceller: make(map[string]context.CancelFunc),
workers: make(map[string]*regionWorker),
}
}
Expand Down Expand Up @@ -766,7 +773,9 @@ MainLoop:
zap.Uint64("requestID", requestID),
zap.Uint64("storeID", storeID),
zap.String("addr", rpcCtx.Addr))
stream, err = s.client.newStream(ctx, rpcCtx.Addr, storeID)
streamCtx, streamCancel := context.WithCancel(ctx)
_ = streamCancel // to avoid possible context leak warning from govet
stream, err = s.client.newStream(streamCtx, rpcCtx.Addr, storeID)
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
log.Warn("get grpc stream client failed",
Expand All @@ -786,7 +795,7 @@ MainLoop:
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
continue
}
s.addStream(rpcCtx.Addr, stream)
s.addStream(rpcCtx.Addr, stream, streamCancel)

limiter := s.client.getRegionLimiter(regionID)
g.Go(func() error {
Expand Down Expand Up @@ -888,6 +897,20 @@ func (s *eventFeedSession) partialRegionFeed(
return nil
}

if errors.Cause(err) == errReconnect {
cancel, ok := s.getStreamCancel(state.sri.rpcCtx.Addr)
if ok {
// cancel the stream to trigger stream.Recv with a context cancel error
// Note: canceling the context is the only way to terminate a gRPC stream
cancel()
// Failover in stream.Recv has a 0-100ms delay; onRegionFail
// should be called after the stream has been deleted. Add a delay here
// to avoid rebuilding regions too frequently.
time.Sleep(time.Second)
}
// if stream is already deleted, just ignore errReconnect
}

failpoint.Inject("kvClientErrUnreachable", func() {
if err == errUnreachable {
failpoint.Return(err)
Expand Down Expand Up @@ -1370,6 +1393,10 @@ func (s *eventFeedSession) singleEventFeed(
case <-ctx.Done():
return lastResolvedTs, ctx.Err()
case <-advanceCheckTicker.C:
failpoint.Inject("kvClientForceReconnect", func() {
log.Warn("kv client reconnect triggered by failpoint")
failpoint.Return(lastResolvedTs, errReconnect)
})
if time.Since(startFeedTime) < resolveLockInterval {
continue
}
Expand All @@ -1382,6 +1409,10 @@ func (s *eventFeedSession) singleEventFeed(
log.Warn("region not receiving event from tikv for too long time",
zap.Uint64("regionID", regionID), zap.Stringer("span", span), zap.Duration("duration", sinceLastEvent))
}
if sinceLastEvent > reconnectInterval {
log.Warn("kv client reconnect triggered", zap.Duration("duration", sinceLastEvent))
return lastResolvedTs, errReconnect
}
version, err := s.kvStorage.(*StorageWithCurVersionCache).GetCachedCurrentVersion()
if err != nil {
log.Warn("failed to get current version from PD", zap.Error(err))
Expand Down Expand Up @@ -1524,16 +1555,18 @@ func (s *eventFeedSession) singleEventFeed(
}
}

func (s *eventFeedSession) addStream(storeAddr string, stream cdcpb.ChangeData_EventFeedClient) {
// addStream registers the gRPC event feed stream and its context cancel
// function under the given store address, so that the stream can later be
// looked up and force-terminated (via cancel) by address.
func (s *eventFeedSession) addStream(storeAddr string, stream cdcpb.ChangeData_EventFeedClient, cancel context.CancelFunc) {
	s.streamsLock.Lock()
	s.streams[storeAddr] = stream
	s.streamsCanceller[storeAddr] = cancel
	s.streamsLock.Unlock()
}

// deleteStream removes both the stream and its cancel function registered
// under the given store address. Deleting a missing key is a no-op.
func (s *eventFeedSession) deleteStream(storeAddr string) {
	s.streamsLock.Lock()
	delete(s.streams, storeAddr)
	delete(s.streamsCanceller, storeAddr)
	s.streamsLock.Unlock()
}

func (s *eventFeedSession) getStream(storeAddr string) (stream cdcpb.ChangeData_EventFeedClient, ok bool) {
Expand All @@ -1543,6 +1576,13 @@ func (s *eventFeedSession) getStream(storeAddr string) (stream cdcpb.ChangeData_
return
}

// getStreamCancel returns the context cancel function of the stream bound to
// storeAddr; ok is false when no stream is registered for that address.
func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.CancelFunc, ok bool) {
	s.streamsLock.RLock()
	cancel, ok = s.streamsCanceller[storeAddr]
	s.streamsLock.RUnlock()
	return
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (*model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
Expand Down
191 changes: 190 additions & 1 deletion cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func newMockServiceSpecificAddr(

// waitRequestID waits request ID larger than the given allocated ID
func waitRequestID(c *check.C, allocatedID uint64) {
err := retry.Run(time.Millisecond*20, 10, func() error {
err := retry.Run(time.Millisecond*10, 20, func() error {
if currentRequestID() > allocatedID {
return nil
}
Expand Down Expand Up @@ -1324,8 +1324,19 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

var requestID uint64
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
return
}
atomic.StoreUint64(&requestID, req.RequestId)
}
}
server1, addr1 := newMockService(ctx, c, srv1, wg)

defer func() {
Expand Down Expand Up @@ -1364,6 +1375,14 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {

// wait request id allocated with: new session, new request
waitRequestID(c, baseAllocatedID+1)
err = retry.Run(time.Millisecond*50, 10, func() error {
if atomic.LoadUint64(&requestID) == currentRequestID() {
return nil
}
return errors.Errorf("request is not received, requestID: %d, expected: %d",
atomic.LoadUint64(&requestID), currentRequestID())
})
c.Assert(err, check.IsNil)
initialized1 := mockInitializedEvent(regionID, currentRequestID())
ch1 <- initialized1
err = retry.Run(time.Millisecond*200, 10, func() error {
Expand Down Expand Up @@ -1407,6 +1426,14 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {

// wait request id allocated with: new session, new request*2
waitRequestID(c, baseAllocatedID+2)
err = retry.Run(time.Millisecond*50, 10, func() error {
if atomic.LoadUint64(&requestID) == currentRequestID() {
return nil
}
return errors.Errorf("request is not received, requestID: %d, expected: %d",
atomic.LoadUint64(&requestID), currentRequestID())
})
c.Assert(err, check.IsNil)
initialized2 := mockInitializedEvent(regionID, currentRequestID())
ch1 <- initialized2
err = retry.Run(time.Millisecond*200, 10, func() error {
Expand Down Expand Up @@ -2709,3 +2736,165 @@ func (s *etcdSuite) TestClientErrNoPendingRegion(c *check.C) {
server1.Stop()
wg.Wait()
}

// testKVClientForceReconnect verifies that forcing a gRPC stream to terminate
// (via the kvClientForceReconnect failpoint) makes the kv client reconnect
// and rebuild its regions. Driven by TestKVClientForceReconnect for both
// kv client v1 and v2.
func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	// server1 shuts itself down as soon as its receive loop observes an
	// error, simulating a TiKV whose stream has been canceled by the client.
	server1Stopped := make(chan struct{})
	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
	srv1 := newMockChangeDataService(c, ch1)
	server1, addr1 := newMockService(ctx, c, srv1, wg)
	srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
		defer func() {
			close(ch1)
			server1.Stop()
			server1Stopped <- struct{}{}
		}()
		for {
			_, err := server.Recv()
			if err != nil {
				log.Error("mock server error", zap.Error(err))
				break
			}
		}
	}

	cluster := mocktikv.NewCluster()
	mvccStore := mocktikv.MustNewMVCCStore()
	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
	c.Assert(err, check.IsNil)
	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
	tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
	c.Assert(err, check.IsNil)
	kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
	defer kvStorage.Close() //nolint:errcheck

	regionID3 := uint64(3)
	cluster.AddStore(1, addr1)
	cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)

	// make the kv client return errReconnect on its next advance check,
	// triggering the forced stream termination under test
	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect", "return(true)")
	c.Assert(err, check.IsNil)
	defer func() {
		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect")
	}()
	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
	isPullInit := &mockPullerInit{}
	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
	eventCh := make(chan *model.RegionFeedEvent, 10)
	wg.Add(1)
	go func() {
		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
		cdcClient.Close() //nolint:errcheck
		wg.Done()
	}()

	// wait request id allocated with: new session, new request
	baseAllocatedID := currentRequestID()
	waitRequestID(c, baseAllocatedID+1)
	initialized := mockInitializedEvent(regionID3, currentRequestID())
	ch1 <- initialized

	// the failpoint forces a reconnect, which cancels the stream and causes
	// server1's receive loop to error out and stop the server
	<-server1Stopped

	var requestIds sync.Map
	ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
	srv2 := newMockChangeDataService(c, ch2)
	srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
		for {
			req, err := server.Recv()
			if err != nil {
				log.Error("mock server error", zap.Error(err))
				return
			}
			requestIds.Store(req.RegionId, req.RequestId)
		}
	}
	// Reuse the same listen address as server 1 to simulate TiKV handling the
	// gRPC stream terminate and reconnect.
	server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg)
	defer func() {
		close(ch2)
		server2.Stop()
		wg.Wait()
	}()

	// The second TiKV could start up slowly, which causes the kv client retries
	// to TiKV for more than one time, so we can't determine the correct requestID
	// here, we must use the real request ID received by TiKV server
	err = retry.Run(time.Millisecond*300, 10, func() error {
		_, ok := requestIds.Load(regionID3)
		if ok {
			return nil
		}
		return errors.New("waiting for kv client requests received by server")
	})
	c.Assert(err, check.IsNil)
	requestID, _ := requestIds.Load(regionID3)

	initialized = mockInitializedEvent(regionID3, requestID.(uint64))
	ch2 <- initialized

	resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
		{
			RegionId:  regionID3,
			RequestId: requestID.(uint64),
			Event:     &cdcpb.Event_ResolvedTs{ResolvedTs: 135},
		},
	}}
	ch2 <- resolved

	// Two checkpoint (ts=100) resolved events are expected — presumably one
	// per connection attempt (before and after the forced reconnect) — then
	// the ResolvedTs advance to 135 delivered through server2.
	expected := []*model.RegionFeedEvent{
		{
			Resolved: &model.ResolvedSpan{
				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
				ResolvedTs: 100,
			},
			RegionID: regionID3,
		},
		{
			Resolved: &model.ResolvedSpan{
				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
				ResolvedTs: 100,
			},
			RegionID: regionID3,
		},
		{
			Resolved: &model.ResolvedSpan{
				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
				ResolvedTs: 135,
			},
			RegionID: regionID3,
		},
	}

	for _, expectedEv := range expected {
		select {
		case event := <-eventCh:
			c.Assert(event, check.DeepEquals, expectedEv)
		case <-time.After(time.Second):
			c.Errorf("expected event %v not received", expectedEv)
		}
	}

	cancel()
}

// TestKVClientForceReconnect runs the forced-reconnect scenario against both
// kv client v1 and v2, restoring the original enableKVClientV2 value on exit.
func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) {
	defer testleak.AfterTest(c)()
	defer s.TearDownTest(c)

	// save and restore the global switch so other tests are unaffected
	clientv2 := enableKVClientV2
	defer func() {
		enableKVClientV2 = clientv2
	}()

	// test kv client v1
	enableKVClientV2 = false
	s.testKVClientForceReconnect(c)

	// test kv client v2
	enableKVClientV2 = true
	s.testKVClientForceReconnect(c)
}
2 changes: 1 addition & 1 deletion cdc/kv/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *eventFeedSession) receiveFromStreamV2(

// always create a new region worker, because `receiveFromStreamV2` is ensured
// to be called exactly once from the outer code logic
worker := newRegionWorker(s, limiter)
worker := newRegionWorker(s, limiter, addr)
s.workersLock.Lock()
s.workers[addr] = worker
s.workersLock.Unlock()
Expand Down
Loading

0 comments on commit 8158b03

Please sign in to comment.