diff --git a/errors.toml b/errors.toml index 49d16b842b0..dae1872ec45 100755 --- a/errors.toml +++ b/errors.toml @@ -236,9 +236,14 @@ error = ''' the etcd txn should be aborted and retried immediately ''' +["CDC:ErrEtcdTxnOpsExceed"] +error = ''' +patch ops:%d of a single changefeed exceed etcd txn max ops:%d +''' + ["CDC:ErrEtcdTxnSizeExceed"] error = ''' -patch size of a single changefeed exceed etcd txn max size +patch size:%d of a single changefeed exceed etcd txn max size:%d ''' ["CDC:ErrEventFeedAborted"] diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 8c8dc5117c2..14761634a8a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -207,7 +207,8 @@ var ( ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) - ErrEtcdTxnSizeExceed = errors.Normalize("patch size of a single changefeed exceed etcd txn max size", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed")) // pipeline errors ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go index 4d28ab35f5c..fec17080a3c 100644 --- a/pkg/orchestrator/batch.go +++ b/pkg/orchestrator/batch.go @@ -23,6 +23,8 @@ const ( // 1.25 MiB // Ref: https://etcd.io/docs/v3.3/dev-guide/limit/ etcdTxnMaxSize = 1024 * (1024 + 256) + // Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops + etcdTxnMaxOps = 128 ) // getBatchChangedState has 4 return values: @@ -40,12 +42,22 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat if err != nil { return nil, 0, 0, err } - // if a changefeed's changedState size is large than etcdTxnMaxSize + // if a changefeed's changedState size is larger than etcdTxnMaxSize + // or the length of changedState is larger than etcdTxnMaxOps // we should return an error instantly - if i == 0 && changedSize >= etcdTxnMaxSize { - return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs() + if i == 0 { + if changedSize > etcdTxnMaxSize { + return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize) + } + if len(changedState) > etcdTxnMaxOps { + return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps) + } } - if totalSize+changedSize >= etcdTxnMaxSize { + + // batchChangedState size should not exceeds the etcdTxnMaxSize limit + // and keys numbers should not exceeds the etcdTxnMaxOps limit + if totalSize+changedSize >= etcdTxnMaxSize || + len(batchChangedState)+len(changedState) >= etcdTxnMaxOps { break } for k, v := range changedState { diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go index 47185e607d2..9a7267a6f9d 100644 --- a/pkg/orchestrator/batch_test.go +++ b/pkg/orchestrator/batch_test.go @@ -41,18 +41,35 @@ func TestGetBatchChangeState(t *testing.T) { require.Nil(t, err) require.LessOrEqual(t, n, len(patchGroup)) require.LessOrEqual(t, size, etcdTxnMaxSize) + require.LessOrEqual(t, len(changedState), etcdTxnMaxOps) require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")]) - // test single patch exceed txn size - largePatches := []DataPatch{&SingleDataPatch{ + // test single patch exceed txn max size + largeSizePatches := []DataPatch{&SingleDataPatch{ Key: util.NewEtcdKey("largePatch"), Func: func(old []byte) (newValue []byte, changed bool, err error) { newValue = make([]byte, etcdTxnMaxSize) return newValue, true, nil }, }} - patchGroup = [][]DataPatch{largePatches} + patchGroup = [][]DataPatch{largeSizePatches} _, _, _, err = getBatchChangedState(rawState, patchGroup) require.NotNil(t, err) - require.Contains(t, err.Error(), "patch size of a single changefeed exceed etcd txn max size") + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size") + + // test single patch exceed txn max ops + manyOpsPatches := make([]DataPatch, 0) + for i := 0; i <= etcdTxnMaxOps*2; i++ { + manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }) + } + patchGroup = [][]DataPatch{manyOpsPatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops") }