Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch (ticdc): add etcdTxnMaxOps to limit the ops number of an etcd txn #3681

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,14 @@ error = '''
the etcd txn should be aborted and retried immediately
'''

["CDC:ErrEtcdTxnOpsExceed"]
error = '''
patch ops:%d of a single changefeed exceed etcd txn max ops:%d
'''

["CDC:ErrEtcdTxnSizeExceed"]
error = '''
patch size of a single changefeed exceed etcd txn max size
patch size:%d of a single changefeed exceed etcd txn max size:%d
'''

["CDC:ErrEventFeedAborted"]
Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ var (
ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished"))
ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout"))
ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired"))
ErrEtcdTxnSizeExceed = errors.Normalize("patch size of a single changefeed exceed etcd txn max size", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed"))
ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed"))
ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed"))

// pipeline errors
ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline"))
Expand Down
20 changes: 16 additions & 4 deletions pkg/orchestrator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
// 1.25 MiB
// Ref: https://etcd.io/docs/v3.3/dev-guide/limit/
etcdTxnMaxSize = 1024 * (1024 + 256)
// Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops
etcdTxnMaxOps = 128
)

// getBatchChangedState has 4 return values:
Expand All @@ -40,12 +42,22 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat
if err != nil {
return nil, 0, 0, err
}
// if a changefeed's changedState size is large than etcdTxnMaxSize
// if a changefeed's changedState size is larger than etcdTxnMaxSize
// or the length of changedState is larger than etcdTxnMaxOps
// we should return an error instantly
if i == 0 && changedSize >= etcdTxnMaxSize {
return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs()
if i == 0 {
if changedSize > etcdTxnMaxSize {
return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize)
}
if len(changedState) > etcdTxnMaxOps {
return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps)
}
}
if totalSize+changedSize >= etcdTxnMaxSize {

// batchChangedState size should not exceeds the etcdTxnMaxSize limit
// and keys numbers should not exceeds the etcdTxnMaxOps limit
if totalSize+changedSize >= etcdTxnMaxSize ||
len(batchChangedState)+len(changedState) >= etcdTxnMaxOps {
break
}
for k, v := range changedState {
Expand Down
25 changes: 21 additions & 4 deletions pkg/orchestrator/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,35 @@ func TestGetBatchChangeState(t *testing.T) {
require.Nil(t, err)
require.LessOrEqual(t, n, len(patchGroup))
require.LessOrEqual(t, size, etcdTxnMaxSize)
require.LessOrEqual(t, len(changedState), etcdTxnMaxOps)
require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")])

// test single patch exceed txn size
largePatches := []DataPatch{&SingleDataPatch{
// test single patch exceed txn max size
largeSizePatches := []DataPatch{&SingleDataPatch{
Key: util.NewEtcdKey("largePatch"),
Func: func(old []byte) (newValue []byte, changed bool, err error) {
newValue = make([]byte, etcdTxnMaxSize)
return newValue, true, nil
},
}}
patchGroup = [][]DataPatch{largePatches}
patchGroup = [][]DataPatch{largeSizePatches}
_, _, _, err = getBatchChangedState(rawState, patchGroup)
require.NotNil(t, err)
require.Contains(t, err.Error(), "patch size of a single changefeed exceed etcd txn max size")
require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size")

// test single patch exceed txn max ops
manyOpsPatches := make([]DataPatch, 0)
for i := 0; i <= etcdTxnMaxOps*2; i++ {
manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
Func: func(old []byte) (newValue []byte, changed bool, err error) {
newValue = []byte(fmt.Sprintf("abc%d", i))
return newValue, true, nil
},
})
}
patchGroup = [][]DataPatch{manyOpsPatches}
_, _, _, err = getBatchChangedState(rawState, patchGroup)
require.NotNil(t, err)
require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops")
}