diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 745b7d28224..b5a208ea765 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/security" @@ -1138,3 +1139,164 @@ func (s *etcdSuite) TestConnArray(c *check.C) { ca.Close() } + +// TestPendingRegionError tests kv client should return an error when receiving +// a new subscription (the first event of specific region) but the corresponding +// region is not found in pending regions. +func (s *etcdSuite) TestNoPendingRegionError(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + ch1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataService(c, ch1) + server1, addr1 := newMockService(ctx, c, srv1, wg) + defer func() { + close(ch1) + server1.Stop() + wg.Wait() + }() + + cluster := mocktikv.NewCluster() + mvccStore := mocktikv.MustNewMVCCStore() + rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + defer kvStorage.Close() //nolint:errcheck + + cluster.AddStore(1, addr1) + cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) + + baseAllocatedID := currentRequestID() + lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage)) + isPullInit := &mockPullerInit{} + cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{}) + eventCh := make(chan *model.RegionFeedEvent, 10) + var wg2 sync.WaitGroup + wg2.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + c.Assert(cerror.ErrNoPendingRegion.Equal(err), check.IsTrue) + cdcClient.Close() //nolint:errcheck + wg2.Done() + }() + + // wait request id allocated with: new session, new request + waitRequestID(c, baseAllocatedID+1) + noPendingRegionEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: 3, + RequestId: currentRequestID() + 1, // an invalid request id + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 100}, + }, + }} + ch1 <- noPendingRegionEvent + wg2.Wait() + cancel() +} + +// TestDropStaleRequest tests kv client should drop an event if its request id is outdated. +func (s *etcdSuite) TestDropStaleRequest(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + ch1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataService(c, ch1) + server1, addr1 := newMockService(ctx, c, srv1, wg) + + defer func() { + close(ch1) + server1.Stop() + wg.Wait() + }() + + cluster := mocktikv.NewCluster() + mvccStore := mocktikv.MustNewMVCCStore() + rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + defer kvStorage.Close() //nolint:errcheck + + regionID := uint64(3) + cluster.AddStore(1, addr1) + cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) + + baseAllocatedID := currentRequestID() + lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage)) + isPullInit := &mockPullerInit{} + cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{}) + eventCh := make(chan *model.RegionFeedEvent, 10) + wg.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + cdcClient.Close() //nolint:errcheck + wg.Done() + }() + + // wait request id allocated with: new session, new request + waitRequestID(c, baseAllocatedID+1) + + initialized := mockInitializedEvent(regionID, currentRequestID()) + eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: currentRequestID(), + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, + }, + // This event will be dropped + { + RegionId: regionID, + RequestId: currentRequestID() - 1, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 125}, + }, + { + RegionId: regionID, + RequestId: currentRequestID(), + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130}, + }, + }} + expected := []*model.RegionFeedEvent{ + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + ResolvedTs: 100, + }, + RegionID: regionID, + }, + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + ResolvedTs: 120, + }, + RegionID: regionID, + }, + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + ResolvedTs: 130, + }, + RegionID: regionID, + }, + } + + ch1 <- initialized + ch1 <- eventsAfterInit + + for _, expectedEv := range expected { + select { + case event := <-eventCh: + c.Assert(event, check.DeepEquals, expectedEv) + case <-time.After(time.Second): + c.Errorf("expected event %v not received", expectedEv) + } + } + cancel() +}