From 550e24e5f5ae17b6a081e500a051e27575e8b872 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 1 Dec 2021 10:09:17 +0800 Subject: [PATCH 1/2] batch (ticdc): add etcdTxnMaxOps to limit the ops number of a etcd txn --- pkg/orchestrator/batch.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go index 4d28ab35f5c..c4cdca2b450 100644 --- a/pkg/orchestrator/batch.go +++ b/pkg/orchestrator/batch.go @@ -23,6 +23,8 @@ const ( // 1.25 MiB // Ref: https://etcd.io/docs/v3.3/dev-guide/limit/ etcdTxnMaxSize = 1024 * (1024 + 256) + // Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops + etcdTxnMaxOps = 128 ) // getBatchChangedState has 4 return values: @@ -40,12 +42,17 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat if err != nil { return nil, 0, 0, err } - // if a changefeed's changedState size is large than etcdTxnMaxSize + // if a changefeed's changedState size is larger than etcdTxnMaxSize + // or the length of changedState is larger than etcdTxnMaxOps // we should return an error instantly - if i == 0 && changedSize >= etcdTxnMaxSize { + if i == 0 && changedSize >= etcdTxnMaxSize || + len(batchChangedState)+len(changedState) >= etcdTxnMaxOps { return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs() } - if totalSize+changedSize >= etcdTxnMaxSize { + // batchChangedState size should not exceeds the etcdTxnMaxSize limit + // and keys numbers should not exceeds the etcdTxnMaxOps limit + if totalSize+changedSize >= etcdTxnMaxSize || + len(batchChangedState)+len(changedState) >= etcdTxnMaxOps { break } for k, v := range changedState { From e2f9a0bd0ff3eec9e78424177931bc60f9f18cd8 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 1 Dec 2021 10:48:55 +0800 Subject: [PATCH 2/2] batch (ticdc): add unit test --- errors.toml | 7 ++++++- pkg/errors/errors.go | 3 ++- pkg/orchestrator/batch.go | 11 ++++++++--- pkg/orchestrator/batch_test.go | 25 +++++++++++++++++++++---- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/errors.toml b/errors.toml index fc5f5ed16b1..c211356db90 100755 --- a/errors.toml +++ b/errors.toml @@ -241,9 +241,14 @@ error = ''' the etcd txn should be aborted and retried immediately ''' +["CDC:ErrEtcdTxnOpsExceed"] +error = ''' +patch ops:%d of a single changefeed exceed etcd txn max ops:%d +''' + ["CDC:ErrEtcdTxnSizeExceed"] error = ''' -patch size of a single changefeed exceed etcd txn max size +patch size:%d of a single changefeed exceed etcd txn max size:%d ''' ["CDC:ErrEventFeedAborted"] diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 2e57c7930bb..e4bdc07da06 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -214,7 +214,8 @@ var ( ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) - ErrEtcdTxnSizeExceed = errors.Normalize("patch size of a single changefeed exceed etcd txn max size", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed")) // pipeline errors ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go index c4cdca2b450..fec17080a3c 100644 --- a/pkg/orchestrator/batch.go +++ b/pkg/orchestrator/batch.go @@ -45,10 +45,15 @@ func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPat // if a changefeed's changedState size is larger than etcdTxnMaxSize // or the length of changedState is larger than etcdTxnMaxOps // we should return an error instantly - if i == 0 && changedSize >= etcdTxnMaxSize || - len(batchChangedState)+len(changedState) >= etcdTxnMaxOps { - return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs() + if i == 0 { + if changedSize > etcdTxnMaxSize { + return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize) + } + if len(changedState) > etcdTxnMaxOps { + return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps) + } } + // batchChangedState size should not exceeds the etcdTxnMaxSize limit // and keys numbers should not exceeds the etcdTxnMaxOps limit if totalSize+changedSize >= etcdTxnMaxSize || diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go index 47185e607d2..9a7267a6f9d 100644 --- a/pkg/orchestrator/batch_test.go +++ b/pkg/orchestrator/batch_test.go @@ -41,18 +41,35 @@ func TestGetBatchChangeState(t *testing.T) { require.Nil(t, err) require.LessOrEqual(t, n, len(patchGroup)) require.LessOrEqual(t, size, etcdTxnMaxSize) + require.LessOrEqual(t, len(changedState), etcdTxnMaxOps) require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")]) - // test single patch exceed txn size - largePatches := []DataPatch{&SingleDataPatch{ + // test single patch exceed txn max size + largeSizePatches := []DataPatch{&SingleDataPatch{ Key: util.NewEtcdKey("largePatch"), Func: func(old []byte) (newValue []byte, changed bool, err error) { newValue = make([]byte, etcdTxnMaxSize) return newValue, true, nil }, }} - patchGroup = [][]DataPatch{largePatches} + patchGroup = [][]DataPatch{largeSizePatches} _, _, _, err = getBatchChangedState(rawState, patchGroup) require.NotNil(t, err) - require.Contains(t, err.Error(), "patch size of a single changefeed exceed etcd txn max size") + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size") + + // test single patch exceed txn max ops + manyOpsPatches := make([]DataPatch, 0) + for i := 0; i <= etcdTxnMaxOps*2; i++ { + manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }) + } + patchGroup = [][]DataPatch{manyOpsPatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops") }