tests: test TiKV is not compatible with TiCDC to simulate rolling update (#1298) (#1337)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
ti-srebot authored Jan 25, 2021
1 parent 2444810 commit d485c43
Showing 2 changed files with 114 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cdc/kv/client.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/cdcpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -728,7 +729,11 @@ MainLoop:
 					zap.String("error", err.Error()))
 				if cerror.ErrVersionIncompatible.Equal(err) {
 					// It often occurs on rolling update. Sleep 20s to reduce logs.
-					time.Sleep(20 * time.Second)
+					delay := 20 * time.Second
+					failpoint.Inject("kvClientDelayWhenIncompatible", func() {
+						delay = 100 * time.Millisecond
+					})
+					time.Sleep(delay)
 				}
 				bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
 				s.client.regionCache.OnSendFail(bo, rpcCtx, needReloadRegion(sri.failStoreIDs, rpcCtx), err)
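The production path above sleeps a fixed 20 seconds, which would make any test of this branch painfully slow; the failpoint shortens the delay only when a test explicitly enables it. Below is a hedged, self-contained sketch of the same pattern — the failpoint name "demoDelay" and the function retryDelay are invented for illustration and are not part of the commit. Note that failpoint.Inject is an inert marker in a normal build; it only becomes a runtime check after the failpoint-ctl source rewrite (pingcap repos typically wrap this in a make failpoint-enable step), at which point failpoint.Enable/Disable, as used in the test below, toggle it.

// A hedged sketch of the failpoint pattern used in the change above.
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/failpoint"
)

func retryDelay() time.Duration {
	delay := 20 * time.Second // production default, as in client.go
	// With the failpoint rewritten-in and enabled, this callback runs and
	// shrinks the delay so tests do not sleep for the full 20 seconds.
	failpoint.Inject("demoDelay", func() {
		delay = 100 * time.Millisecond
	})
	return delay
}

func main() {
	// In a plain build (no failpoint rewrite) the marker does nothing,
	// so this prints the production default of 20s.
	fmt.Println(retryDelay())
}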
126 changes: 108 additions & 18 deletions cdc/kv/client_test.go
@@ -17,11 +17,13 @@ import (
 	"context"
 	"net"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/cdcpb"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -141,7 +143,7 @@ func newMockServiceSpecificAddr(
 
 type mockPDClient struct {
 	pd.Client
-	version string
+	versionGen func() string
 }
 
 var _ pd.Client = &mockPDClient{}
@@ -151,10 +153,14 @@ func (m *mockPDClient) GetStore(ctx context.Context, storeID uint64) (*metapb.St
 	if err != nil {
 		return nil, err
 	}
-	s.Version = m.version
+	s.Version = m.versionGen()
 	return s, nil
 }
 
+var defaultVersionGen = func() string {
+	return version.MinTiKVVersion.String()
+}
+
 // waitRequestID waits request ID larger than the given allocated ID
 func waitRequestID(c *check.C, allocatedID uint64) {
 	err := retry.Run(time.Millisecond*20, 10, func() error {
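Replacing the fixed version string with a versionGen callback is what lets a single mock PD report a different store version on every GetStore call. A minimal sketch of that test-double pattern — all names here (fakePD, storeVersion) are invented for illustration:

package main

import "fmt"

type fakePD struct {
	versionGen func() string // consulted on every store lookup
}

func (p *fakePD) storeVersion() string { return p.versionGen() }

func main() {
	// Cycle through versions on successive calls, like the test's gen func.
	versions := []string{"v3.0.10", "v4.0.0-rc", "v4.0.0"}
	i := -1
	pd := &fakePD{versionGen: func() string {
		i = (i + 1) % len(versions)
		return versions[i]
	}}
	for n := 0; n < 4; n++ {
		fmt.Println(pd.storeVersion()) // v3.0.10, v4.0.0-rc, v4.0.0, v3.0.10
	}
}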
@@ -186,7 +192,7 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: version.MinTiKVVersion.String()}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
 	defer kvStorage.Close() //nolint:errcheck
@@ -274,7 +280,7 @@ func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: version.MinTiKVVersion.String()}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	defer pdClient.Close() //nolint:errcheck
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
@@ -364,7 +370,7 @@ func (s *etcdSuite) TestHandleError(c *check.C) {
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: version.MinTiKVVersion.String()}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
 	defer kvStorage.Close() //nolint:errcheck
@@ -525,7 +531,7 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) {
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: version.MinTiKVVersion.String()}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
 	defer kvStorage.Close() //nolint:errcheck
@@ -924,7 +930,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: version.MinTiKVVersion.String()}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
 	defer kvStorage.Close() //nolint:errcheck
@@ -1004,28 +1010,112 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
 	cancel()
 }
 
-// TODO enable the test
-func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) {
+// TestIncompatibleTiKV tests the case where a new TiCDC request to TiKV meets an
+// `ErrVersionIncompatible` error (in fact this error is raised before the
+// EventFeed API is actually called); TiCDC then waits 20s and retries. This is a
+// common scenario when rolling-upgrading a cluster whose new version is not
+// compatible with the old one (TiCDC is upgraded before TiKV, since upgrading
+// TiKV usually takes much longer).
+func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
 	defer testleak.AfterTest(c)()
 	defer s.TearDownTest(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	// the minimum valid TiKV version is "4.0.0-rc.1"
+	incompatibilityVers := []string{"v2.1.10", "v3.0.10", "v3.1.0", "v4.0.0-rc"}
+	nextVer := -1
+	call := int32(0)
+	// 20 is not too many here, since the version check itself retries 3 times,
+	// and the region cache may also call the get-store API, which triggers the
+	// version generator too.
+	versionGenCallBoundary := int32(20)
+	gen := func() string {
+		atomic.AddInt32(&call, 1)
+		if atomic.LoadInt32(&call) < versionGenCallBoundary {
+			nextVer = (nextVer + 1) % len(incompatibilityVers)
+			return incompatibilityVers[nextVer]
+		}
+		return defaultVersionGen()
+	}
+
+	var requestIds sync.Map
+	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
+	srv1 := newMockChangeDataService(c, ch1)
+	srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
+		for {
+			req, err := server.Recv()
+			if err != nil {
+				log.Error("mock server error", zap.Error(err))
+				return
+			}
+			requestIds.Store(req.RegionId, req.RequestId)
+		}
+	}
+	server1, addr1 := newMockService(ctx, c, srv1, wg)
+
+	defer func() {
+		close(ch1)
+		server1.Stop()
+		wg.Wait()
+	}()
+
 	cluster := mocktikv.NewCluster()
 	mvccStore := mocktikv.MustNewMVCCStore()
 	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
 	c.Assert(err, check.IsNil)
-	pdClient = &mockPDClient{Client: pdClient, version: "v2.1.0" /* CDC is not compatible with 2.1.0 */}
+	pdClient = &mockPDClient{Client: pdClient, versionGen: gen}
 	kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
 	c.Assert(err, check.IsNil)
 	defer kvStorage.Close() //nolint:errcheck
 
-	cluster.AddStore(1, "localhost:23375")
-	cluster.Bootstrap(2, []uint64{1}, []uint64{3}, 3)
+	regionID := uint64(3)
+	cluster.AddStore(1, addr1)
+	cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)
 
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientDelayWhenIncompatible", "return(true)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientDelayWhenIncompatible")
+	}()
 	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
 	isPullInit := &mockPullerInit{}
-	cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{})
-	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-	defer cancel()
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
 	eventCh := make(chan *model.RegionFeedEvent, 10)
-	err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
-	_ = err
-	// TODO find a way to verify the error
+	wg.Add(1)
+	go func() {
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+		cdcClient.Close() //nolint:errcheck
+		wg.Done()
+	}()
+
+	err = retry.Run(time.Millisecond*500, 20, func() error {
+		if atomic.LoadInt32(&call) >= versionGenCallBoundary {
+			return nil
+		}
+		return errors.Errorf("version generator is not updated in time, call time %d", atomic.LoadInt32(&call))
+	})
+	c.Assert(err, check.IsNil)
+	err = retry.Run(time.Millisecond*200, 10, func() error {
+		_, ok := requestIds.Load(regionID)
+		if ok {
+			return nil
+		}
+		return errors.New("waiting for kv client requests received by server")
+	})
+	c.Assert(err, check.IsNil)
+	reqID, _ := requestIds.Load(regionID)
+	initialized := mockInitializedEvent(regionID, reqID.(uint64))
+	ch1 <- initialized
+	select {
+	case event := <-eventCh:
+		c.Assert(event.Resolved, check.NotNil)
+		c.Assert(event.RegionID, check.Equals, regionID)
+	case <-time.After(time.Second):
+		c.Errorf("expected events are not received")
+	}
+
+	cancel()
 }
 
 // Use etcdSuite for some special reasons, the embed etcd uses zap as the only candidate
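The test's synchronization relies on two ingredients: the version generator bumps an atomic counter on every call, and the test polls that counter with retry.Run instead of sleeping for a fixed interval. A standalone sketch of the same poll-until-condition pattern — the waitFor helper and all names here are invented; the real test uses retry.Run from the ticdc utilities:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitFor polls cond up to maxRetries times, sleeping interval between
// attempts, and reports an error if the condition never holds.
func waitFor(interval time.Duration, maxRetries int, cond func() bool) error {
	for i := 0; i < maxRetries; i++ {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("condition not met after %d retries", maxRetries)
}

func main() {
	var calls int32
	// Background work bumps the counter, like the mocked version generator.
	go func() {
		for i := 0; i < 25; i++ {
			atomic.AddInt32(&calls, 1)
			time.Sleep(10 * time.Millisecond)
		}
	}()
	// Block until the counter crosses the boundary instead of sleeping a
	// fixed, hopefully-long-enough amount.
	err := waitFor(50*time.Millisecond, 20, func() bool {
		return atomic.LoadInt32(&calls) >= 20
	})
	fmt.Println(err) // <nil> once the boundary is crossed
}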
