Skip to content

Commit

Permalink
cherry pick pingcap#1325 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
amyangfei authored and ti-srebot committed Jan 26, 2021
1 parent d485c43 commit 0a69f47
Showing 1 changed file with 158 additions and 0 deletions.
158 changes: 158 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/security"
Expand Down Expand Up @@ -1138,3 +1139,160 @@ func (s *etcdSuite) TestConnArray(c *check.C) {

ca.Close()
}

// TestPendingRegionError tests kv client should return an error when receiving
// a new subscription (the first event of specific region) but the corresponding
// region is not found in pending regions.
func (s *etcdSuite) TestNoPendingRegionError(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)
defer func() {
close(ch1)
server1.Stop()
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
defer kvStorage.Close() //nolint:errcheck

cluster.AddStore(1, addr1)
cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4)

baseAllocatedID := currentRequestID()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
var wg2 sync.WaitGroup
wg2.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(cerror.ErrNoPendingRegion.Equal(err), check.IsTrue)
cdcClient.Close() //nolint:errcheck
wg2.Done()
}()

// wait request id allocated with: new session, new request
waitRequestID(c, baseAllocatedID+1)
noPendingRegionEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID() + 1, // an invalid request id
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 100},
},
}}
ch1 <- noPendingRegionEvent
wg2.Wait()
cancel()
}

// TestDropStaleRequest tests kv client should drop an event if its request id is outdated.
func (s *etcdSuite) TestDropStaleRequest(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

defer func() {
close(ch1)
server1.Stop()
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
defer kvStorage.Close() //nolint:errcheck

regionID := uint64(3)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)

baseAllocatedID := currentRequestID()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait request id allocated with: new session, new request
waitRequestID(c, baseAllocatedID+1)

initialized := mockInitializedEvent(regionID, currentRequestID())
eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: currentRequestID(),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
},
// This event will be dropped
{
RegionId: regionID,
RequestId: currentRequestID() - 1,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 125},
},
{
RegionId: regionID,
RequestId: currentRequestID(),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130},
},
}}
expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 100,
},
RegionID: regionID,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 120,
},
RegionID: regionID,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 130,
},
RegionID: regionID,
},
}

ch1 <- initialized
ch1 <- eventsAfterInit

for _, expectedEv := range expected {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
}
}
cancel()
}

0 comments on commit 0a69f47

Please sign in to comment.