Skip to content

Commit

Permalink
*: port sink flow control for old processor & new processor to releas…
Browse files Browse the repository at this point in the history
…e-5.0 (pingcap#1840)
  • Loading branch information
liuzix authored May 27, 2021
1 parent 5c008b3 commit 5069d1d
Show file tree
Hide file tree
Showing 15 changed files with 1,128 additions and 44 deletions.
8 changes: 8 additions & 0 deletions cdc/metrics_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
tableMemoryGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
}, []string{"changefeed", "capture", "table"})
)

// initProcessorMetrics registers all metrics used in processor
Expand All @@ -95,4 +102,5 @@ func initProcessorMetrics(registry *prometheus.Registry) {
registry.MustRegister(txnCounter)
registry.MustRegister(updateInfoDuration)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(tableMemoryGauge)
}
173 changes: 147 additions & 26 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/pingcap/ticdc/cdc/puller"
psorter "github.com/pingcap/ticdc/cdc/puller/sorter"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
Expand All @@ -58,6 +60,11 @@ const (
defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20

// for better sink performance under flow control
resolvedTsInterpolateInterval = 200 * time.Millisecond
flushMemoryMetricsDuration = time.Second * 5
flowControlOutChSize = 128
)

type oldProcessor struct {
Expand Down Expand Up @@ -919,7 +926,117 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc()
}

const maxLagWithCheckpointTs = (30 * 1000) << 18 // 30s
// runFlowControl controls the flow of events out of the sorter.
func (p *oldProcessor) runFlowControl(
ctx context.Context,
tableID model.TableID,
flowController *common.TableFlowController,
inCh <-chan *model.PolymorphicEvent,
outCh chan<- *model.PolymorphicEvent) {
var (
lastSendResolvedTsTime time.Time
lastCRTs, lastSentResolvedTs uint64
)

for {
select {
case <-ctx.Done():
// NOTE: This line is buggy, because `context.Canceled` may indicate an actual error.
// TODO Will be resolved together with other similar problems.
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case event, ok := <-inCh:
if !ok {
// sorter output channel has been closed.
// The sorter must have exited and has a reportable exit reason,
// so we don't need to worry about sending an error here.
log.Info("sorter output channel closed",
zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx))
return
}

if event == nil || event.RawKV == nil {
// This is an invariant violation.
log.Panic("unexpected empty event", zap.Reflect("event", event))
}

if event.RawKV.OpType != model.OpTypeResolved {
size := uint64(event.RawKV.ApproximateSize())
commitTs := event.CRTs
// We interpolate a resolved-ts if none has been sent for some time.
if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval {
// Refer to `cdc/processor/pipeline/sorter.go` for detailed explanation of the design.
// This is a backport.
if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs {
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
interpolatedEvent := model.NewResolvedPolymorphicEvent(0, lastCRTs)

select {
case <-ctx.Done():
// TODO fix me
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case outCh <- interpolatedEvent:
}
}
}
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err := flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
// Not sending a Resolved Event here will very likely deadlock the pipeline.
// NOTE: This is NOT an optimization, but is for liveness.
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()

msg := model.NewResolvedPolymorphicEvent(0, lastCRTs)
select {
case <-ctx.Done():
return ctx.Err()
case outCh <- msg:
}
}
return nil
})
if err != nil {
log.Error("flow control error", zap.Error(err))
if cerror.ErrFlowControllerAborted.Equal(err) {
log.Info("flow control cancelled for table",
zap.Int64("tableID", tableID),
util.ZapFieldChangefeed(ctx))
} else {
p.sendError(ctx.Err())
}
return
}
lastCRTs = commitTs
} else {
// handle OpTypeResolved
if event.CRTs < lastSentResolvedTs {
continue
}
lastSentResolvedTs = event.CRTs
lastSendResolvedTsTime = time.Now()
}

select {
case <-ctx.Done():
// TODO fix me
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case outCh <- event:
}
}
}
}

// sorterConsume receives sorted PolymorphicEvent from sorter of each table and
// sends to processor's output chan
Expand All @@ -933,7 +1050,7 @@ func (p *oldProcessor) sorterConsume(
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
) {
var lastResolvedTs, lastCheckPointTs uint64
var lastResolvedTs uint64
opDone := false
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
checkDoneTicker := time.NewTicker(1 * time.Second)
Expand Down Expand Up @@ -1022,6 +1139,19 @@ func (p *oldProcessor) sorterConsume(
}
defer globalResolvedTsReceiver.Stop()

perTableMemoryQuota := serverConfig.GetGlobalServerConfig().PerTableMemoryQuota
log.Debug("creating table flow controller",
zap.String("table-name", tableName),
zap.Int64("table-id", tableID),
zap.Uint64("quota", perTableMemoryQuota),
util.ZapFieldChangefeed(ctx))

flowController := common.NewTableFlowController(perTableMemoryQuota)
defer func() {
flowController.Abort()
tableMemoryGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
}()

sendResolvedTs2Sink := func() error {
localResolvedTs := atomic.LoadUint64(&p.localResolvedTs)
globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs)
Expand All @@ -1044,52 +1174,43 @@ func (p *oldProcessor) sorterConsume(
}
return err
}
lastCheckPointTs = checkpointTs

if checkpointTs < replicaInfo.StartTs {
checkpointTs = replicaInfo.StartTs
}

if checkpointTs != 0 {
atomic.StoreUint64(pCheckpointTs, checkpointTs)
flowController.Release(checkpointTs)
p.localCheckpointTsNotifier.Notify()
}
return nil
}

flowControlOutCh := make(chan *model.PolymorphicEvent, flowControlOutChSize)
go func() {
p.runFlowControl(ctx, tableID, flowController, sorter.Output(), flowControlOutCh)
close(flowControlOutCh)
}()

metricsTableMemoryGauge := tableMemoryGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
metricsTicker := time.NewTicker(flushMemoryMetricsDuration)
defer metricsTicker.Stop()

for {
select {
case <-ctx.Done():
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case pEvent := <-sorter.Output():
case <-metricsTicker.C:
metricsTableMemoryGauge.Set(float64(flowController.GetConsumption()))
case pEvent := <-flowControlOutCh:
if pEvent == nil {
continue
}

for lastResolvedTs > maxLagWithCheckpointTs+lastCheckPointTs {
log.Debug("the lag between local checkpoint Ts and local resolved Ts is too lang",
zap.Uint64("resolvedTs", lastResolvedTs), zap.Uint64("lastCheckPointTs", lastCheckPointTs),
zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx))
select {
case <-ctx.Done():
if ctx.Err() != context.Canceled {
p.sendError(errors.Trace(ctx.Err()))
}
return
case <-globalResolvedTsReceiver.C:
if err := sendResolvedTs2Sink(); err != nil {
// error is already sent to processor, so we can just ignore it
return
}
case <-checkDoneTicker.C:
if !opDone {
checkDone()
}
}
}

pEvent.SetUpFinishedChan()
select {
case <-ctx.Done():
Expand Down
8 changes: 8 additions & 0 deletions cdc/processor/pipeline/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,18 @@ var (
Name: "txn_count",
Help: "txn count received/executed by this processor",
}, []string{"type", "changefeed", "capture"})
tableMemoryGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
}, []string{"changefeed", "capture", "table"})
)

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(tableResolvedTsGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(tableMemoryGauge)
}
9 changes: 8 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,20 @@ type sinkNode struct {

eventBuffer []*model.PolymorphicEvent
rowBuffer []*model.RowChangedEvent

flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode {
func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
resolvedTs: startTs,
checkpointTs: startTs,
barrierTs: startTs,

flowController: flowController,
}
}

Expand Down Expand Up @@ -130,6 +134,8 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down Expand Up @@ -216,5 +222,6 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {

func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
n.status.store(TableStatusStopped)
n.flowController.Abort()
return n.sink.Close()
}
26 changes: 22 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ type mockSink struct {
}
}

// mockFlowController is created because a real tableFlowController cannot be used
// we are testing sinkNode by itself.
type mockFlowController struct{}

func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
return nil
}

func (c *mockFlowController) Release(resolvedTs uint64) {
}

func (c *mockFlowController) Abort() {
}

func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}
Expand Down Expand Up @@ -90,7 +108,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
ctx := context.NewContext(stdContext.Background(), &context.Vars{})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10)
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -116,7 +134,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10)
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -138,7 +156,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(6))

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10)
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -164,7 +182,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.NewContext(stdContext.Background(), &context.Vars{})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10)
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down
Loading

0 comments on commit 5069d1d

Please sign in to comment.