From c9e0f1aa8785c68c258d693122aef7516b3f6a8f Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 28 Jul 2021 14:44:17 +0800 Subject: [PATCH] puller: remove memory buffer and buffer limitter (#2328) --- cdc/processor.go | 6 - cdc/processor/pipeline/puller.go | 4 - cdc/processor/pipeline/table.go | 4 +- cdc/processor/processor.go | 6 - cdc/puller/buffer.go | 188 ------------------------------- cdc/puller/buffer_test.go | 135 ---------------------- 6 files changed, 1 insertion(+), 342 deletions(-) delete mode 100644 cdc/puller/buffer.go delete mode 100644 cdc/puller/buffer_test.go diff --git a/cdc/processor.go b/cdc/processor.go index 681ff37e103..8213f7c7ab2 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -51,9 +51,6 @@ import ( ) const ( - // defaultMemBufferCapacity is the default memory buffer per change feed. - defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G - defaultSyncResolvedBatch = 1024 schemaStorageGCLag = time.Minute * 20 @@ -66,7 +63,6 @@ type oldProcessor struct { captureInfo model.CaptureInfo changefeedID string changefeed model.ChangeFeedInfo - limitter *puller.BlurResourceLimitter stopped int32 pdCli pd.Client @@ -158,7 +154,6 @@ func newProcessor( ) (*oldProcessor, error) { etcdCli := session.Client() cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli) - limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity) log.Info("start processor with startts", zap.Uint64("startts", checkpointTs), util.ZapFieldChangefeed(ctx)) @@ -191,7 +186,6 @@ func newProcessor( p := &oldProcessor{ id: uuid.New().String(), - limitter: limitter, captureInfo: captureInfo, changefeedID: changefeedID, changefeed: changefeed, diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 6f65bec1538..2aa38c5bcbe 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -29,8 +29,6 @@ import ( ) type pullerNode struct { - limitter *puller.BlurResourceLimitter - tableName string // quoted schema and table, used in metircs only tableID model.TableID @@ -40,10 +38,8 @@ type pullerNode struct { } func newPullerNode( - limitter *puller.BlurResourceLimitter, tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node { return &pullerNode{ - limitter: limitter, tableID: tableID, replicaInfo: replicaInfo, tableName: tableName, diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 0ee8b700318..be0efd4925e 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/puller" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/cdc/sink/common" serverConfig "github.com/pingcap/ticdc/pkg/config" @@ -162,7 +161,6 @@ const defaultRunnersSize = 5 // NewTablePipeline creates a table pipeline // TODO(leoppro): implement a mock kvclient to test the table pipeline func NewTablePipeline(ctx cdcContext.Context, - limitter *puller.BlurResourceLimitter, mounter entry.Mounter, tableID model.TableID, tableName string, @@ -191,7 +189,7 @@ func NewTablePipeline(ctx cdcContext.Context, runnerSize++ } p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) - p.AppendNode(ctx, "puller", newPullerNode(limitter, tableID, replicaInfo, tableName)) + p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", newSorterNode(tableName, tableID, flowController, mounter)) p.AppendNode(ctx, "mounter", newMounterNode()) if cyclicEnabled { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index dcdc801292a..78f4997f2fa 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -46,9 +46,6 @@ import ( ) const ( - // defaultMemBufferCapacity is the default memory buffer per change feed. - defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G - schemaStorageGCLag = time.Minute * 20 backoffBaseDelayInMs = 5 @@ -62,7 +59,6 @@ type processor struct { tables map[model.TableID]tablepipeline.TablePipeline - limitter *puller.BlurResourceLimitter schemaStorage entry.SchemaStorage filter *filter.Filter mounter entry.Mounter @@ -89,7 +85,6 @@ func newProcessor(ctx cdcContext.Context) *processor { changefeedID := ctx.ChangefeedVars().ID advertiseAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr p := &processor{ - limitter: puller.NewBlurResourceLimmter(defaultMemBufferCapacity), tables: make(map[model.TableID]tablepipeline.TablePipeline), errCh: make(chan error, 1), changefeedID: changefeedID, @@ -723,7 +718,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs) table := tablepipeline.NewTablePipeline( ctx, - p.limitter, p.mounter, tableID, tableNameStr, diff --git a/cdc/puller/buffer.go b/cdc/puller/buffer.go deleted file mode 100644 index db9a64fb75b..00000000000 --- a/cdc/puller/buffer.go +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package puller - -import ( - "context" - "sync" - "sync/atomic" - "unsafe" - - "github.com/edwingeng/deque" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - cerror "github.com/pingcap/ticdc/pkg/errors" -) - -const ( - defaultBufferSize = 128000 -) - -// EventBuffer in a interface for communicating kv entries. -type EventBuffer interface { - // AddEntry adds an entry to the buffer, return ErrReachLimit if reach budget limit. - AddEntry(ctx context.Context, entry model.RegionFeedEvent) error - Get(ctx context.Context) (model.RegionFeedEvent, error) -} - -// ChanBuffer buffers kv entries -type ChanBuffer chan model.RegionFeedEvent - -var _ EventBuffer = makeChanBuffer() - -func makeChanBuffer() ChanBuffer { - return make(ChanBuffer, defaultBufferSize) -} - -// AddEntry adds an entry to the buffer -func (b ChanBuffer) AddEntry(ctx context.Context, entry model.RegionFeedEvent) error { - select { - case <-ctx.Done(): - return ctx.Err() - case b <- entry: - return nil - } -} - -// Get waits for an entry from the input channel but will stop with respect to the context -func (b ChanBuffer) Get(ctx context.Context) (model.RegionFeedEvent, error) { - select { - case <-ctx.Done(): - return model.RegionFeedEvent{}, ctx.Err() - case e := <-b: - return e, nil - } -} - -var _ EventBuffer = &memBuffer{} - -type memBuffer struct { - limitter *BlurResourceLimitter - - mu struct { - sync.Mutex - entries deque.Deque - } - signalCh chan struct{} -} - -// Passing nil will make a unlimited buffer. -func makeMemBuffer(limitter *BlurResourceLimitter) *memBuffer { - return &memBuffer{ - limitter: limitter, - mu: struct { - sync.Mutex - entries deque.Deque - }{ - entries: deque.NewDeque(), - }, - - signalCh: make(chan struct{}, 1), - } -} - -// AddEntry implements EventBuffer interface. -func (b *memBuffer) AddEntry(ctx context.Context, entry model.RegionFeedEvent) error { - b.mu.Lock() - if b.limitter != nil && b.limitter.OverBucget() { - b.mu.Unlock() - return cerror.ErrBufferReachLimit.GenWithStackByArgs() - } - - b.mu.entries.PushBack(entry) - if b.limitter != nil { - b.limitter.Add(int64(entrySize(entry))) - } - b.mu.Unlock() - - select { - case b.signalCh <- struct{}{}: - default: - } - - return nil -} - -// Get implements EventBuffer interface. -func (b *memBuffer) Get(ctx context.Context) (model.RegionFeedEvent, error) { - for { - b.mu.Lock() - if !b.mu.entries.Empty() { - e := b.mu.entries.PopFront().(model.RegionFeedEvent) - if b.limitter != nil { - b.limitter.Add(int64(-entrySize(e))) - } - b.mu.Unlock() - return e, nil - } - - b.mu.Unlock() - - select { - case <-ctx.Done(): - return model.RegionFeedEvent{}, ctx.Err() - case <-b.signalCh: - } - } -} - -// Size returns the memory size of memBuffer -func (b *memBuffer) Size() int64 { - b.mu.Lock() - defer b.mu.Unlock() - if b.limitter == nil { - return 0 - } - return atomic.LoadInt64(&b.limitter.used) -} - -var ( - sizeOfVal = unsafe.Sizeof(model.RawKVEntry{}) - sizeOfResolve = unsafe.Sizeof(model.ResolvedSpan{}) -) - -func entrySize(e model.RegionFeedEvent) int { - if e.Val != nil { - return int(sizeOfVal) + len(e.Val.Key) + len(e.Val.Value) - } else if e.Resolved != nil { - return int(sizeOfResolve) - } else { - log.Panic("unknow event type") - } - - return 0 -} - -// BlurResourceLimitter limit resource use. -type BlurResourceLimitter struct { - budget int64 - used int64 -} - -// NewBlurResourceLimmter create a BlurResourceLimitter. -func NewBlurResourceLimmter(budget int64) *BlurResourceLimitter { - return &BlurResourceLimitter{ - budget: budget, - } -} - -// Add used resource into limmter -func (rl *BlurResourceLimitter) Add(n int64) { - atomic.AddInt64(&rl.used, n) -} - -// OverBucget retun true if over budget. -func (rl *BlurResourceLimitter) OverBucget() bool { - return atomic.LoadInt64(&rl.used) >= atomic.LoadInt64(&rl.budget) -} diff --git a/cdc/puller/buffer_test.go b/cdc/puller/buffer_test.go deleted file mode 100644 index b1e71380893..00000000000 --- a/cdc/puller/buffer_test.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package puller - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/ticdc/cdc/model" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/regionspan" - "github.com/pingcap/ticdc/pkg/util/testleak" -) - -type bufferSuite struct{} - -var _ = check.Suite(&bufferSuite{}) - -func (bs *bufferSuite) TestCanAddAndReadEntriesInOrder(c *check.C) { - defer testleak.AfterTest(c)() - b := makeChanBuffer() - ctx := context.Background() - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - first, err := b.Get(ctx) - c.Assert(err, check.IsNil) - c.Assert(first.Val.CRTs, check.Equals, uint64(110)) - second, err := b.Get(ctx) - c.Assert(err, check.IsNil) - c.Assert(second.Resolved.ResolvedTs, check.Equals, uint64(111)) - }() - - err := b.AddEntry(ctx, model.RegionFeedEvent{ - Val: &model.RawKVEntry{CRTs: 110}, - }) - c.Assert(err, check.IsNil) - err = b.AddEntry(ctx, model.RegionFeedEvent{ - Resolved: &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{}, - ResolvedTs: 111, - }, - }) - c.Assert(err, check.IsNil) - - wg.Wait() -} - -func (bs *bufferSuite) TestWaitsCanBeCanceled(c *check.C) { - defer testleak.AfterTest(c)() - b := makeChanBuffer() - ctx := context.Background() - - timeout, cancel := context.WithTimeout(ctx, time.Millisecond) - defer cancel() - stopped := make(chan struct{}) - // sleep here to let context timeout first - time.Sleep(time.Millisecond) - go func() { - for { - err := b.AddEntry(timeout, model.RegionFeedEvent{ - Resolved: &model.ResolvedSpan{ - Span: regionspan.ComparableSpan{}, - ResolvedTs: 111, - }, - }) - if err == context.DeadlineExceeded { - close(stopped) - return - } - c.Assert(err, check.Equals, nil) - } - }() - select { - case <-stopped: - case <-time.After(10 * time.Millisecond): - c.Fatal("AddEntry doesn't stop in time.") - } -} - -type memBufferSuite struct{} - -var _ = check.Suite(&memBufferSuite{}) - -func (bs *memBufferSuite) TestMemBuffer(c *check.C) { - defer testleak.AfterTest(c)() - limitter := NewBlurResourceLimmter(1024 * 1024) - bf := makeMemBuffer(limitter) - - var err error - var entries []model.RegionFeedEvent - for { - entry := model.RegionFeedEvent{ - Val: &model.RawKVEntry{ - Value: make([]byte, 1024), - }, - } - err = bf.AddEntry(context.Background(), entry) - if err != nil { - break - } - - entries = append(entries, entry) - } - - c.Assert(cerror.ErrBufferReachLimit.Equal(err), check.IsTrue) - num := float64(bf.mu.entries.Len()) - nearNum := 1024.0 - c.Assert(num >= nearNum*0.9, check.IsTrue) - c.Assert(num <= nearNum*1.1, check.IsTrue) - - // Check can get back the entries. - var getEntries []model.RegionFeedEvent - for len(getEntries) < len(entries) { - entry, err := bf.Get(context.Background()) - c.Assert(err, check.IsNil) - getEntries = append(getEntries, entry) - } - c.Assert(getEntries, check.DeepEquals, entries) -}