Skip to content

Commit

Permalink
batch (ticdc): add etcdTxnMaxOps to limit the ops number of an etcd t…
Browse files Browse the repository at this point in the history
…xn (#3681)
  • Loading branch information
asddongmen authored Dec 3, 2021
1 parent 3a2d2c5 commit 6f70625
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 10 deletions.
7 changes: 6 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,14 @@ error = '''
the etcd txn should be aborted and retried immediately
'''

["CDC:ErrEtcdTxnOpsExceed"]
error = '''
patch ops:%d of a single changefeed exceed etcd txn max ops:%d
'''

["CDC:ErrEtcdTxnSizeExceed"]
error = '''
patch size of a single changefeed exceed etcd txn max size
patch size:%d of a single changefeed exceed etcd txn max size:%d
'''

["CDC:ErrEventFeedAborted"]
Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ var (
ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished"))
ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout"))
ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired"))
ErrEtcdTxnSizeExceed = errors.Normalize("patch size of a single changefeed exceed etcd txn max size", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed"))
ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed"))
ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed"))

// pipeline errors
ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline"))
Expand Down
20 changes: 16 additions & 4 deletions pkg/orchestrator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
// 1.25 MiB
// Ref: https://etcd.io/docs/v3.3/dev-guide/limit/
etcdTxnMaxSize = 1024 * (1024 + 256)
// Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops
etcdTxnMaxOps = 128
)

// getBatchChangedState has 4 return values:
Expand All @@ -40,12 +42,22 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat
if err != nil {
return nil, 0, 0, err
}
// if a changefeed's changedState size is large than etcdTxnMaxSize
// if a changefeed's changedState size is larger than etcdTxnMaxSize
// or the length of changedState is larger than etcdTxnMaxOps
// we should return an error instantly
if i == 0 && changedSize >= etcdTxnMaxSize {
return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs()
if i == 0 {
if changedSize > etcdTxnMaxSize {
return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize)
}
if len(changedState) > etcdTxnMaxOps {
return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps)
}
}
if totalSize+changedSize >= etcdTxnMaxSize {

// batchChangedState size should not exceeds the etcdTxnMaxSize limit
// and keys numbers should not exceeds the etcdTxnMaxOps limit
if totalSize+changedSize >= etcdTxnMaxSize ||
len(batchChangedState)+len(changedState) >= etcdTxnMaxOps {
break
}
for k, v := range changedState {
Expand Down
25 changes: 21 additions & 4 deletions pkg/orchestrator/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,35 @@ func TestGetBatchChangeState(t *testing.T) {
require.Nil(t, err)
require.LessOrEqual(t, n, len(patchGroup))
require.LessOrEqual(t, size, etcdTxnMaxSize)
require.LessOrEqual(t, len(changedState), etcdTxnMaxOps)
require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")])

// test single patch exceed txn size
largePatches := []DataPatch{&SingleDataPatch{
// test single patch exceed txn max size
largeSizePatches := []DataPatch{&SingleDataPatch{
Key: util.NewEtcdKey("largePatch"),
Func: func(old []byte) (newValue []byte, changed bool, err error) {
newValue = make([]byte, etcdTxnMaxSize)
return newValue, true, nil
},
}}
patchGroup = [][]DataPatch{largePatches}
patchGroup = [][]DataPatch{largeSizePatches}
_, _, _, err = getBatchChangedState(rawState, patchGroup)
require.NotNil(t, err)
require.Contains(t, err.Error(), "patch size of a single changefeed exceed etcd txn max size")
require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size")

// test single patch exceed txn max ops
manyOpsPatches := make([]DataPatch, 0)
for i := 0; i <= etcdTxnMaxOps*2; i++ {
manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
Func: func(old []byte) (newValue []byte, changed bool, err error) {
newValue = []byte(fmt.Sprintf("abc%d", i))
return newValue, true, nil
},
})
}
patchGroup = [][]DataPatch{manyOpsPatches}
_, _, _, err = getBatchChangedState(rawState, patchGroup)
require.NotNil(t, err)
require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops")
}

0 comments on commit 6f70625

Please sign in to comment.