Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3277
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Nov 11, 2021
1 parent 39e07b1 commit ddea6dd
Show file tree
Hide file tree
Showing 14 changed files with 1,405 additions and 47 deletions.
3 changes: 2 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea
if err != nil {
return errors.Trace(err)
}
if err := etcdWorker.Run(ctx, c.session, timerInterval); err != nil {
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
// We check ttl of lease instead of check `session.Done`, because
// `session.Done` is only notified when etcd client establish a
// new keepalive request, there could be a time window as long as
Expand Down
21 changes: 21 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,17 @@ import (
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/cdc/puller/sorter"
"github.com/pingcap/ticdc/cdc/sink"
<<<<<<< HEAD
"github.com/pingcap/ticdc/pkg/config"
=======
"github.com/pingcap/ticdc/cdc/sorter"
"github.com/pingcap/ticdc/cdc/sorter/leveldb"
"github.com/pingcap/ticdc/cdc/sorter/memory"
"github.com/pingcap/ticdc/cdc/sorter/unified"
"github.com/pingcap/ticdc/pkg/actor"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator"
>>>>>>> 2569abaa3 (etcd_worker: batch etcd patch (#3277))
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -36,6 +46,17 @@ func init() {
puller.InitMetrics(registry)
sink.InitMetrics(registry)
entry.InitMetrics(registry)
<<<<<<< HEAD
=======
processor.InitMetrics(registry)
tablepipeline.InitMetrics(registry)
owner.InitMetrics(registry)
etcd.InitMetrics(registry)
initServerMetrics(registry)
actor.InitMetrics(registry)
orchestrator.InitMetrics(registry)
// Sorter metrics
>>>>>>> 2569abaa3 (etcd_worker: batch etcd patch (#3277))
sorter.InitMetrics(registry)
if config.NewReplicaImpl {
processor.InitMetrics(registry)
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
}

// GetPatches implements the ReactorState interface
<<<<<<< HEAD:cdc/model/reactor_state.go
func (s *GlobalReactorState) GetPatches() [][]orchestrator.DataPatch {
=======
// Every []DataPatch slice in [][]DataPatch slice is the patches of a ChangefeedReactorState
func (s *GlobalReactorState) GetPatches() [][]DataPatch {
>>>>>>> 2569abaa3 (etcd_worker: batch etcd patch (#3277)):pkg/orchestrator/reactor_state.go
pendingPatches := s.pendingPatches
for _, changefeedState := range s.Changefeeds {
pendingPatches = append(pendingPatches, changefeedState.getPatches())
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ error = '''
the etcd txn should be aborted and retried immediately
'''

["CDC:ErrEtcdTxnSizeExceed"]
error = '''
patch size of a single changefeed exceed etcd txn max size
'''

["CDC:ErrEventFeedAborted"]
error = '''
single event feed aborted
Expand Down
Loading

0 comments on commit ddea6dd

Please sign in to comment.