Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor,scheduler(ticdc): clean up unused method and metrics (#8049) #8699

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,6 @@ import (
)

var (
resolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts",
Help: "local resolved ts of processor",
}, []string{"namespace", "changefeed"})
resolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts_lag",
Help: "local resolved ts lag of processor",
}, []string{"namespace", "changefeed"})
resolvedTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_resolved_table_id",
Help: "ID of the minimum resolved table",
}, []string{"namespace", "changefeed"})
checkpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts",
Help: "global checkpoint ts of processor",
}, []string{"namespace", "changefeed"})
checkpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts_lag",
Help: "global checkpoint ts lag of processor",
}, []string{"namespace", "changefeed"})
checkpointTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_checkpoint_table_id",
Help: "ID of the minimum checkpoint table",
}, []string{"namespace", "changefeed"})
syncTableNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down Expand Up @@ -129,12 +87,6 @@ var (

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(resolvedTsGauge)
registry.MustRegister(resolvedTsLagGauge)
registry.MustRegister(resolvedTsMinTableIDGauge)
registry.MustRegister(checkpointTsGauge)
registry.MustRegister(checkpointTsLagGauge)
registry.MustRegister(checkpointTsMinTableIDGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
Expand Down
128 changes: 11 additions & 117 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io"
"math"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -92,25 +91,17 @@ type processor struct {
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepb.TablePipeline, error)
newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error)

liveness *model.Liveness
agent scheduler.Agent
checkpointTs model.Ts
resolvedTs model.Ts

metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricMinResolvedTableIDGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricMinCheckpointTableIDGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
metricsTableSinkTotalRows prometheus.Counter
metricsTableMemoryHistogram prometheus.Observer
metricsProcessorMemoryGauge prometheus.Gauge
metricRemainKVEventGauge prometheus.Gauge
liveness *model.Liveness
agent scheduler.Agent

metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
metricsTableSinkTotalRows prometheus.Counter
metricsTableMemoryHistogram prometheus.Observer
metricsProcessorMemoryGauge prometheus.Gauge
metricRemainKVEventGauge prometheus.Gauge
}

// checkReadyForMessages checks whether all necessary Etcd keys have been established.
Expand Down Expand Up @@ -279,9 +270,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
return false
}

localResolvedTs := p.resolvedTs
globalResolvedTs := p.changefeed.Status.ResolvedTs
localCheckpointTs := p.agent.GetLastSentCheckpointTs()
globalCheckpointTs := p.changefeed.Status.CheckpointTs

var tableResolvedTs, tableCheckpointTs uint64
Expand Down Expand Up @@ -328,10 +317,8 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("tableCheckpointTs", tableCheckpointTs),
zap.Uint64("localCheckpointTs", localCheckpointTs),
zap.Uint64("globalCheckpointTs", globalCheckpointTs),
zap.Any("state", state),
zap.Bool("isPrepare", isPrepare))
Expand All @@ -344,10 +331,8 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("tableCheckpointTs", tableCheckpointTs),
zap.Uint64("localCheckpointTs", localCheckpointTs),
zap.Uint64("globalCheckpointTs", globalCheckpointTs),
zap.Any("state", state),
zap.Bool("isPrepare", isPrepare))
Expand Down Expand Up @@ -442,11 +427,6 @@ func (p *processor) GetAllCurrentTables() []model.TableID {
return ret
}

// GetCheckpoint implements TableExecutor interface.
func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
return p.checkpointTs, p.resolvedTs
}

// GetTableStatus implements TableExecutor interface
func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus {
if p.pullBasedSinking {
Expand Down Expand Up @@ -549,18 +529,6 @@ func newProcessor(
cancel: func() {},
liveness: liveness,

metricResolvedTsGauge: resolvedTsGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolvedTsLagGauge: resolvedTsLagGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricMinResolvedTableIDGauge: resolvedTsMinTableIDGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsGauge: checkpointTsGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsLagGauge: checkpointTsLagGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricMinCheckpointTableIDGauge: checkpointTsMinTableIDGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricSyncTableNumGauge: syncTableNumGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricProcessorErrorCounter: processorErrorCounter.
Expand Down Expand Up @@ -709,10 +677,6 @@ func (p *processor) tick(ctx cdcContext.Context) error {
return errors.Trace(err)
}
p.pushResolvedTs2Table()
// it is no need to check the error here, because we will use
// local time when an error return, which is acceptable
pdTime, _ := p.upstream.PDClock.CurrentTime()
p.handlePosition(oracle.GetPhysical(pdTime))

p.doGCSchemaStorage()
if err := p.agent.Tick(ctx); err != nil {
Expand Down Expand Up @@ -1047,68 +1011,6 @@ func (p *processor) sendError(err error) {
}
}

// handlePosition calculates the local resolved ts and local checkpoint ts.
// resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs).
// table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs.
// checkpointTs = min(resolvedTs, all table's checkpointTs).
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
minResolvedTableID := int64(0)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
}
minCheckpointTs := minResolvedTs
minCheckpointTableID := int64(0)
if p.pullBasedSinking {
tableIDs := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tableIDs {
stats := p.sinkManager.GetTableStats(tableID)
log.Debug("sink manager gets table stats",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("stats", stats))
if stats.ResolvedTs < minResolvedTs {
minResolvedTs = stats.ResolvedTs
minResolvedTableID = tableID
}
if stats.CheckpointTs < minCheckpointTs {
minCheckpointTs = stats.CheckpointTs
minCheckpointTableID = tableID
}
}
} else {
for _, table := range p.tables {
ts := table.ResolvedTs()
if ts < minResolvedTs {
minResolvedTs = ts
minResolvedTableID = table.ID()
}
}

for _, table := range p.tables {
ts := table.CheckpointTs()
if ts < minCheckpointTs {
minCheckpointTs = ts
minCheckpointTableID = table.ID()
}
}
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))
p.metricMinResolvedTableIDGauge.Set(float64(minResolvedTableID))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))
p.metricMinCheckpointTableIDGauge.Set(float64(minCheckpointTableID))

p.checkpointTs = minCheckpointTs
p.resolvedTs = minResolvedTs
}

// pushResolvedTs2Table sends global resolved ts to all the table pipelines.
func (p *processor) pushResolvedTs2Table() {
resolvedTs := p.changefeed.Status.ResolvedTs
Expand Down Expand Up @@ -1396,14 +1298,6 @@ func (p *processor) Close(ctx cdcContext.Context) error {
}

func (p *processor) cleanupMetrics() {
resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

syncTableNumGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorErrorCounter.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
Expand Down
53 changes: 5 additions & 48 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,24 +241,18 @@ type mockAgent struct {
// dummy to satisfy the interface
scheduler.Agent

executor scheduler.TableExecutor
lastCheckpointTs model.Ts
liveness *model.Liveness
isClosed bool
executor scheduler.TableExecutor
liveness *model.Liveness
isClosed bool
}

func (a *mockAgent) Tick(_ context.Context) error {
if len(a.executor.GetAllCurrentTables()) == 0 {
return nil
}
a.lastCheckpointTs, _ = a.executor.GetCheckpoint()
return nil
}

func (a *mockAgent) GetLastSentCheckpointTs() (checkpointTs model.Ts) {
return a.lastCheckpointTs
}

func (a *mockAgent) Close() error {
a.isClosed = true
return nil
Expand Down Expand Up @@ -298,9 +292,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {

require.Len(t, p.tables, 1)

checkpointTs := p.agent.GetLastSentCheckpointTs()
require.Equal(t, checkpointTs, model.Ts(0))

done := p.IsAddTableFinished(1, true)
require.False(t, done)
require.Equal(t, tablepb.TableStatePreparing, table1.State())
Expand All @@ -316,10 +307,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, done)
require.Equal(t, tablepb.TableStatePrepared, table1.State())

// no table is `replicating`
checkpointTs = p.agent.GetLastSentCheckpointTs()
require.Equal(t, checkpointTs, model.Ts(20))

ok, err = p.AddTable(ctx, 1, 30, true)
require.NoError(t, err)
require.True(t, ok)
Expand All @@ -340,9 +327,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, done)
require.Equal(t, tablepb.TableStateReplicating, table1.State())

checkpointTs = p.agent.GetLastSentCheckpointTs()
require.Equal(t, table1.CheckpointTs(), checkpointTs)

err = p.Close(ctx)
require.Nil(t, err)
require.Nil(t, p.agent)
Expand Down Expand Up @@ -439,15 +423,10 @@ func TestProcessorClose(t *testing.T) {
return status, true, nil
})
tester.MustApplyPatches()
p.tables[1].(*mockTablePipeline).resolvedTs = 110
p.tables[2].(*mockTablePipeline).resolvedTs = 90
p.tables[1].(*mockTablePipeline).checkpointTs = 90
p.tables[2].(*mockTablePipeline).checkpointTs = 95

err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.EqualValues(t, p.checkpointTs, 90)
require.EqualValues(t, p.resolvedTs, 90)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)

require.Nil(t, p.Close(ctx))
Expand Down Expand Up @@ -506,22 +485,6 @@ func TestPositionDeleted(t *testing.T) {
require.Nil(t, err)
tester.MustApplyPatches()

table1 := p.tables[1].(*mockTablePipeline)
table2 := p.tables[2].(*mockTablePipeline)

table1.resolvedTs += 1
table2.resolvedTs += 1

table1.checkpointTs += 1
table2.checkpointTs += 1

// cal position
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

require.Equal(t, model.Ts(31), p.checkpointTs)
require.Equal(t, model.Ts(31), p.resolvedTs)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)

// some others delete the task position
Expand All @@ -530,18 +493,12 @@ func TestPositionDeleted(t *testing.T) {
return nil, true, nil
})
tester.MustApplyPatches()

// position created again
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.Equal(t, &model.TaskPosition{}, p.changefeed.TaskPositions[p.captureInfo.ID])

// cal position
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.Equal(t, model.Ts(31), p.checkpointTs)
require.Equal(t, model.Ts(31), p.resolvedTs)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)
}

Expand Down
Loading