Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#11296
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lidezhu authored and ti-chi-bot committed Jun 13, 2024
1 parent c092599 commit 631a748
Show file tree
Hide file tree
Showing 12 changed files with 1,090 additions and 0 deletions.
46 changes: 46 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (p *processor) AddTable(
zap.Bool("isPrepare", isPrepare))
}

<<<<<<< HEAD
if p.pullBasedSinking {
table := p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
if p.redoDMLMgr.Enabled() {
Expand All @@ -232,6 +233,15 @@ func (p *processor) AddTable(
}
p.tables[tableID] = table
}
=======
table := p.sinkManager.r.AddTable(
span, startTs, p.latestInfo.TargetTs)
if p.redo.r.Enabled() {
p.redo.r.AddTable(span, startTs)
}

p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
return true, nil
}

Expand Down Expand Up @@ -754,6 +764,16 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// isMysqlCompatibleBackend reports whether the scheme of sinkURIStr is a
// MySQL-compatible one. When sinkURIStr cannot be parsed as a URL it returns
// false together with the parse error wrapped in ErrSinkURIInvalid.
//
// NOTE(review): the surrounding diff context suggests a function with this
// exact name is already declared directly above this one — confirm, and keep
// only a single definition.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
	parsed, parseErr := url.Parse(sinkURIStr)
	if parseErr != nil {
		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
	}
	return sink.IsMySQLCompatibleScheme(sink.GetScheme(parsed)), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if p.initialized {
Expand Down Expand Up @@ -910,7 +930,33 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
}

<<<<<<< HEAD
p.agent, err = p.newAgent(ctx, p.liveness, p.changefeedEpoch)
=======
isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
isMysqlBackend)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
p.sinkManager.name = "SinkManager"
p.sinkManager.changefeedID = p.changefeedID
p.sinkManager.spawn(prcCtx)

// Bind them so that sourceManager can notify sinkManager.r.
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)
p.agent, err = p.newAgent(prcCtx, p.liveness, p.changefeedEpoch, p.cfg, p.ownerCaptureInfoClient)
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
if err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,17 @@ func New(
schemaStorage entry.SchemaStorage,
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
<<<<<<< HEAD
errChan chan error,
warnChan chan error,
isMysqlBackend bool,
metricsTableSinkTotalRows prometheus.Counter,
metricsTableSinkFlushLagDuration prometheus.Observer,
) (*SinkManager, error) {
=======
isMysqlBackend bool,
) *SinkManager {
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
m := &SinkManager{
changefeedID: changefeedID,
changefeedInfo: changefeedInfo,
Expand All @@ -165,8 +170,13 @@ func New(
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),
isMysqlBackend: isMysqlBackend,
<<<<<<< HEAD

metricsTableSinkTotalRows: metricsTableSinkTotalRows,
=======
metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))

metricsTableSinkFlushLagDuration: metricsTableSinkFlushLagDuration,
}
Expand Down Expand Up @@ -319,7 +329,11 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}

if m.isMysqlBackend {
<<<<<<< HEAD
// For MySQL backend, we should restart sink. Let owner to handle the error.
=======
// For MySQL backend, we should restart changefeed. Let owner to handle the error.
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -858,7 +872,11 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
<<<<<<< HEAD
func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
=======
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
tableID,
Expand All @@ -885,7 +903,11 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
log.Panic("Add an exists table sink",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
<<<<<<< HEAD
zap.Int64("tableID", tableID))
=======
zap.Stringer("span", &span))
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
}
m.sinkMemQuota.AddTable(tableID)
m.redoMemQuota.AddTable(tableID)
Expand Down
10 changes: 10 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,21 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskError")
}()

<<<<<<< HEAD
source.AddTable(1)
manager.AddTable(1, 100, math.MaxUint64)
manager.StartTable(1, 100)
source.Add(1, model.NewResolvedPolymorphicEvent(0, 101))
manager.UpdateReceivedSorterResolvedTs(1, 101)
=======
span := spanz.TableIDToComparableSpan(1)

source.AddTable(span, "test", 100, func() model.Ts { return 0 })
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
manager.UpdateReceivedSorterResolvedTs(span, 101)
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
manager.UpdateBarrierTs(101, nil)

timer := time.NewTimer(5 * time.Second)
Expand Down
97 changes: 97 additions & 0 deletions cdc/processor/sinkmanager/manager_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sinkmanager

import (
"context"
"math"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/sorter"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/sorter/memory"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/upstream"
pd "github.com/tikv/pd/client"
)

// MockPD only for test.
// It embeds pd.Client and provides its own GetTS, so only that method has
// test-controlled behavior; every other method is promoted from the embedded
// client.
type MockPD struct {
// Client is the embedded PD client.
pd.Client
// ts, when non-zero, is the value GetTS returns for both of its results.
ts int64
}

// GetTS implements the PD interface.
// Both returned timestamps carry the same value: the configured p.ts when it
// is non-zero, otherwise math.MaxInt64. The error is always nil.
func (p *MockPD) GetTS(_ context.Context) (int64, int64, error) {
	ts := int64(math.MaxInt64)
	if p.ts != 0 {
		ts = p.ts
	}
	return ts, ts, nil
}

// nolint:revive
// CreateManagerWithMemEngine wires a SourceManager and a SinkManager on top of
// an in-memory sort engine, launches Run for each of them in its own goroutine
// and calls WaitForReady on each before returning all three objects.
// Errors returned by either Run, other than context cancellation, are
// forwarded to errChan. In test it is ok to have ctx as the second parameter.
func CreateManagerWithMemEngine(
	t *testing.T,
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	changefeedInfo *model.ChangeFeedInfo,
	errChan chan error,
) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine) {
	// report pushes err into errChan unless it is nil or caused by context
	// cancellation. The send gives up as soon as ctx is done, so a goroutine
	// never stays blocked on a channel nobody reads.
	report := func(err error) {
		if err == nil || errors.Cause(err) == context.Canceled {
			return
		}
		select {
		case errChan <- err:
		case <-ctx.Done():
		}
	}

	engine := memory.New(ctx)
	up := upstream.NewUpstream4Test(&MockPD{})
	mountGroup := &entry.MockMountGroup{}
	storage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}

	srcMgr := sourcemanager.NewForTest(changefeedID, up, mountGroup, engine, false)
	go func() {
		runErr := srcMgr.Run(ctx)
		report(runErr)
	}()
	srcMgr.WaitForReady(ctx)

	sinkMgr := New(
		changefeedID,
		changefeedInfo.SinkURI,
		changefeedInfo.Config,
		up,
		storage,
		nil, // no redo DML manager in this helper
		srcMgr,
		false,
	)
	go func() {
		runErr := sinkMgr.Run(ctx)
		report(runErr)
	}()
	sinkMgr.WaitForReady(ctx)

	return sinkMgr, srcMgr, engine
}

// nolint:revive
// NewManagerWithMemEngine constructs a SourceManager and a SinkManager that
// share an in-memory sort engine created with a background context, passing
// redoMgr through to the SinkManager. Run is not called on either manager
// here. The t parameter is currently unused.
func NewManagerWithMemEngine(
	t *testing.T,
	changefeedID model.ChangeFeedID,
	changefeedInfo *model.ChangeFeedInfo,
	redoMgr redo.DMLManager,
) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine) {
	engine := memory.New(context.Background())
	up := upstream.NewUpstream4Test(&MockPD{})

	srcMgr := sourcemanager.NewForTest(
		changefeedID, up, &entry.MockMountGroup{}, engine, false)

	sinkMgr := New(
		changefeedID,
		changefeedInfo.SinkURI,
		changefeedInfo.Config,
		up,
		&entry.MockSchemaStorage{Resolved: math.MaxUint64},
		redoMgr,
		srcMgr,
		false,
	)
	return sinkMgr, srcMgr, engine
}
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
<<<<<<< HEAD
x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e)
usedMemSize += size
rows = append(rows, x...)
Expand All @@ -290,6 +291,10 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
} else {
cachedSize -= brokenSize
}
=======
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
}

if err := maybeEmitBatchEvents(false, pos.Valid()); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,16 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
<<<<<<< HEAD
x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e)
events = append(events, x...)
allEventSize += size
usedMem += size
pendingTxnSize += size
=======
x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
}

if err := maybeEmitAndAdvance(false, pos.Valid()); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
<<<<<<< HEAD
zap.Int64("tableID", t.tableID),
=======
zap.Stringer("span", &t.span),
>>>>>>> e3412d9675 (puller(ticdc): fix wrong update splitting behavior after table scheduling (#11296))
zap.Uint64("replicateTs", ts))
return nil
}
Expand Down
Loading

0 comments on commit 631a748

Please sign in to comment.