Skip to content

Commit

Permalink
puller: remove memory buffer and buffer limitter (#2328)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jul 28, 2021
1 parent 6fa8b92 commit c9e0f1a
Show file tree
Hide file tree
Showing 6 changed files with 1 addition and 342 deletions.
6 changes: 0 additions & 6 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ import (
)

const (
// defaultMemBufferCapacity is the default memory buffer per change feed.
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20
Expand All @@ -66,7 +63,6 @@ type oldProcessor struct {
captureInfo model.CaptureInfo
changefeedID string
changefeed model.ChangeFeedInfo
limitter *puller.BlurResourceLimitter
stopped int32

pdCli pd.Client
Expand Down Expand Up @@ -158,7 +154,6 @@ func newProcessor(
) (*oldProcessor, error) {
etcdCli := session.Client()
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

log.Info("start processor with startts",
zap.Uint64("startts", checkpointTs), util.ZapFieldChangefeed(ctx))
Expand Down Expand Up @@ -191,7 +186,6 @@ func newProcessor(

p := &oldProcessor{
id: uuid.New().String(),
limitter: limitter,
captureInfo: captureInfo,
changefeedID: changefeedID,
changefeed: changefeed,
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
)

type pullerNode struct {
limitter *puller.BlurResourceLimitter

tableName string // quoted schema and table, used in metircs only

tableID model.TableID
Expand All @@ -40,10 +38,8 @@ type pullerNode struct {
}

func newPullerNode(
limitter *puller.BlurResourceLimitter,
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node {
return &pullerNode{
limitter: limitter,
tableID: tableID,
replicaInfo: replicaInfo,
tableName: tableName,
Expand Down
4 changes: 1 addition & 3 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -162,7 +161,6 @@ const defaultRunnersSize = 5
// NewTablePipeline creates a table pipeline
// TODO(leoppro): implement a mock kvclient to test the table pipeline
func NewTablePipeline(ctx cdcContext.Context,
limitter *puller.BlurResourceLimitter,
mounter entry.Mounter,
tableID model.TableID,
tableName string,
Expand Down Expand Up @@ -191,7 +189,7 @@ func NewTablePipeline(ctx cdcContext.Context,
runnerSize++
}
p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
p.AppendNode(ctx, "puller", newPullerNode(limitter, tableID, replicaInfo, tableName))
p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", newSorterNode(tableName, tableID, flowController, mounter))
p.AppendNode(ctx, "mounter", newMounterNode())
if cyclicEnabled {
Expand Down
6 changes: 0 additions & 6 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ import (
)

const (
// defaultMemBufferCapacity is the default memory buffer per change feed.
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

schemaStorageGCLag = time.Minute * 20

backoffBaseDelayInMs = 5
Expand All @@ -62,7 +59,6 @@ type processor struct {

tables map[model.TableID]tablepipeline.TablePipeline

limitter *puller.BlurResourceLimitter
schemaStorage entry.SchemaStorage
filter *filter.Filter
mounter entry.Mounter
Expand All @@ -89,7 +85,6 @@ func newProcessor(ctx cdcContext.Context) *processor {
changefeedID := ctx.ChangefeedVars().ID
advertiseAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p := &processor{
limitter: puller.NewBlurResourceLimmter(defaultMemBufferCapacity),
tables: make(map[model.TableID]tablepipeline.TablePipeline),
errCh: make(chan error, 1),
changefeedID: changefeedID,
Expand Down Expand Up @@ -723,7 +718,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs)
table := tablepipeline.NewTablePipeline(
ctx,
p.limitter,
p.mounter,
tableID,
tableNameStr,
Expand Down
188 changes: 0 additions & 188 deletions cdc/puller/buffer.go

This file was deleted.

Loading

0 comments on commit c9e0f1a

Please sign in to comment.