From 8f263c6c9357d20d7fd1b19c0b49141bad503b10 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 12 Jan 2023 16:50:33 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #8049 Signed-off-by: ti-chi-bot --- cdc/processor/metrics.go | 48 -- cdc/processor/processor.go | 69 +-- cdc/processor/processor_test.go | 39 +- cdc/scheduler/internal/agent.go | 5 - cdc/scheduler/internal/table_executor.go | 5 + cdc/scheduler/internal/v3/agent/agent.go | 6 - cdc/scheduler/internal/v3/agent/agent_test.go | 6 + metrics/alertmanager/ticdc.rules.yml | 23 - metrics/grafana/ticdc.json | 585 ++++++++---------- 9 files changed, 309 insertions(+), 477 deletions(-) diff --git a/cdc/processor/metrics.go b/cdc/processor/metrics.go index 53821eb1e41..4da36ec335b 100644 --- a/cdc/processor/metrics.go +++ b/cdc/processor/metrics.go @@ -21,48 +21,6 @@ import ( ) var ( - resolvedTsGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "resolved_ts", - Help: "local resolved ts of processor", - }, []string{"namespace", "changefeed"}) - resolvedTsLagGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "resolved_ts_lag", - Help: "local resolved ts lag of processor", - }, []string{"namespace", "changefeed"}) - resolvedTsMinTableIDGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "min_resolved_table_id", - Help: "ID of the minimum resolved table", - }, []string{"namespace", "changefeed"}) - checkpointTsGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "checkpoint_ts", - Help: "global checkpoint ts of processor", - }, []string{"namespace", "changefeed"}) - checkpointTsLagGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "checkpoint_ts_lag", - Help: "global checkpoint ts lag of processor", - }, []string{"namespace", "changefeed"}) - checkpointTsMinTableIDGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "processor", - Name: "min_checkpoint_table_id", - Help: "ID of the minimum checkpoint table", - }, []string{"namespace", "changefeed"}) syncTableNumGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -129,12 +87,6 @@ var ( // InitMetrics registers all metrics used in processor func InitMetrics(registry *prometheus.Registry) { - registry.MustRegister(resolvedTsGauge) - registry.MustRegister(resolvedTsLagGauge) - registry.MustRegister(resolvedTsMinTableIDGauge) - registry.MustRegister(checkpointTsGauge) - registry.MustRegister(checkpointTsLagGauge) - registry.MustRegister(checkpointTsMinTableIDGauge) registry.MustRegister(syncTableNumGauge) registry.MustRegister(processorErrorCounter) registry.MustRegister(processorSchemaStorageGcTsGauge) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 6fd41e7fb02..b8d23ace8fe 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "io" - "math" "strconv" "sync" "time" @@ -92,25 +91,17 @@ type processor struct { createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepb.TablePipeline, error) newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error) - liveness *model.Liveness - agent scheduler.Agent - checkpointTs model.Ts - resolvedTs model.Ts - - metricResolvedTsGauge prometheus.Gauge - metricResolvedTsLagGauge prometheus.Gauge - metricMinResolvedTableIDGauge prometheus.Gauge - metricCheckpointTsGauge prometheus.Gauge - metricCheckpointTsLagGauge prometheus.Gauge - metricMinCheckpointTableIDGauge prometheus.Gauge - metricSyncTableNumGauge prometheus.Gauge - metricSchemaStorageGcTsGauge prometheus.Gauge - metricProcessorErrorCounter prometheus.Counter - metricProcessorTickDuration prometheus.Observer - metricsTableSinkTotalRows prometheus.Counter - metricsTableMemoryHistogram prometheus.Observer - metricsProcessorMemoryGauge prometheus.Gauge - metricRemainKVEventGauge prometheus.Gauge + liveness *model.Liveness + agent scheduler.Agent + + metricSyncTableNumGauge prometheus.Gauge + metricSchemaStorageGcTsGauge prometheus.Gauge + metricProcessorErrorCounter prometheus.Counter + metricProcessorTickDuration prometheus.Observer + metricsTableSinkTotalRows prometheus.Counter + metricsTableMemoryHistogram prometheus.Observer + metricsProcessorMemoryGauge prometheus.Gauge + metricRemainKVEventGauge prometheus.Gauge } // checkReadyForMessages checks whether all necessary Etcd keys have been established. @@ -279,9 +270,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo return false } - localResolvedTs := p.resolvedTs globalResolvedTs := p.changefeed.Status.ResolvedTs - localCheckpointTs := p.agent.GetLastSentCheckpointTs() globalCheckpointTs := p.changefeed.Status.CheckpointTs var tableResolvedTs, tableCheckpointTs uint64 @@ -328,10 +317,8 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("changefeed", p.changefeedID.ID), zap.Int64("tableID", tableID), zap.Uint64("tableResolvedTs", tableResolvedTs), - zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), zap.Uint64("tableCheckpointTs", tableCheckpointTs), - zap.Uint64("localCheckpointTs", localCheckpointTs), zap.Uint64("globalCheckpointTs", globalCheckpointTs), zap.Any("state", state), zap.Bool("isPrepare", isPrepare)) @@ -344,10 +331,8 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("changefeed", p.changefeedID.ID), zap.Int64("tableID", tableID), zap.Uint64("tableResolvedTs", tableResolvedTs), - zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), zap.Uint64("tableCheckpointTs", tableCheckpointTs), - zap.Uint64("localCheckpointTs", localCheckpointTs), zap.Uint64("globalCheckpointTs", globalCheckpointTs), zap.Any("state", state), zap.Bool("isPrepare", isPrepare)) @@ -442,6 +427,7 @@ func (p *processor) GetAllCurrentTables() []model.TableID { return ret } +<<<<<<< HEAD // GetCheckpoint implements TableExecutor interface. func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { return p.checkpointTs, p.resolvedTs @@ -449,6 +435,10 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { // GetTableStatus implements TableExecutor interface func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus { +======= +// GetTableSpanStatus implements TableExecutor interface +func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus { +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) if p.pullBasedSinking { state, exist := p.sinkManager.GetTableState(tableID) if !exist { @@ -549,18 +539,6 @@ func newProcessor( cancel: func() {}, liveness: liveness, - metricResolvedTsGauge: resolvedTsGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricResolvedTsLagGauge: resolvedTsLagGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricMinResolvedTableIDGauge: resolvedTsMinTableIDGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricCheckpointTsGauge: checkpointTsGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricCheckpointTsLagGauge: checkpointTsLagGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), - metricMinCheckpointTableIDGauge: checkpointTsMinTableIDGauge. - WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricSyncTableNumGauge: syncTableNumGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricProcessorErrorCounter: processorErrorCounter. @@ -709,10 +687,6 @@ func (p *processor) tick(ctx cdcContext.Context) error { return errors.Trace(err) } p.pushResolvedTs2Table() - // it is no need to check the error here, because we will use - // local time when an error return, which is acceptable - pdTime, _ := p.upstream.PDClock.CurrentTime() - p.handlePosition(oracle.GetPhysical(pdTime)) p.doGCSchemaStorage() if err := p.agent.Tick(ctx); err != nil { @@ -1047,6 +1021,7 @@ func (p *processor) sendError(err error) { } } +<<<<<<< HEAD // handlePosition calculates the local resolved ts and local checkpoint ts. // resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs). // table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs. @@ -1109,6 +1084,8 @@ func (p *processor) handlePosition(currentTs int64) { p.resolvedTs = minResolvedTs } +======= +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) // pushResolvedTs2Table sends global resolved ts to all the table pipelines. func (p *processor) pushResolvedTs2Table() { resolvedTs := p.changefeed.Status.ResolvedTs @@ -1396,14 +1373,6 @@ func (p *processor) Close(ctx cdcContext.Context) error { } func (p *processor) cleanupMetrics() { - resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - resolvedTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - - checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - checkpointTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - checkpointTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - syncTableNumGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) processorErrorCounter.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index d084ce77189..0bd119bf1c9 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -241,24 +241,18 @@ type mockAgent struct { // dummy to satisfy the interface scheduler.Agent - executor scheduler.TableExecutor - lastCheckpointTs model.Ts - liveness *model.Liveness - isClosed bool + executor scheduler.TableExecutor + liveness *model.Liveness + isClosed bool } func (a *mockAgent) Tick(_ context.Context) error { if len(a.executor.GetAllCurrentTables()) == 0 { return nil } - a.lastCheckpointTs, _ = a.executor.GetCheckpoint() return nil } -func (a *mockAgent) GetLastSentCheckpointTs() (checkpointTs model.Ts) { - return a.lastCheckpointTs -} - func (a *mockAgent) Close() error { a.isClosed = true return nil @@ -298,10 +292,14 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Len(t, p.tables, 1) +<<<<<<< HEAD checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) done := p.IsAddTableFinished(1, true) +======= + done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) @@ -316,11 +314,15 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) +<<<<<<< HEAD // no table is `replicating` checkpointTs = p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(20)) ok, err = p.AddTable(ctx, 1, 30, true) +======= + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true) +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(0), table1.sinkStartTs) @@ -340,9 +342,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) - checkpointTs = p.agent.GetLastSentCheckpointTs() - require.Equal(t, table1.CheckpointTs(), checkpointTs) - err = p.Close(ctx) require.Nil(t, err) require.Nil(t, p.agent) @@ -439,15 +438,16 @@ func TestProcessorClose(t *testing.T) { return status, true, nil }) tester.MustApplyPatches() +<<<<<<< HEAD p.tables[1].(*mockTablePipeline).resolvedTs = 110 p.tables[2].(*mockTablePipeline).resolvedTs = 90 p.tables[1].(*mockTablePipeline).checkpointTs = 90 p.tables[2].(*mockTablePipeline).checkpointTs = 95 +======= +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() - require.EqualValues(t, p.checkpointTs, 90) - require.EqualValues(t, p.resolvedTs, 90) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) require.Nil(t, p.Close(ctx)) @@ -505,6 +505,7 @@ func TestPositionDeleted(t *testing.T) { err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() +<<<<<<< HEAD table1 := p.tables[1].(*mockTablePipeline) table2 := p.tables[2].(*mockTablePipeline) @@ -522,6 +523,8 @@ func TestPositionDeleted(t *testing.T) { require.Equal(t, model.Ts(31), p.checkpointTs) require.Equal(t, model.Ts(31), p.resolvedTs) +======= +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) // some others delete the task position @@ -530,18 +533,12 @@ func TestPositionDeleted(t *testing.T) { return nil, true, nil }) tester.MustApplyPatches() + // position created again err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() require.Equal(t, &model.TaskPosition{}, p.changefeed.TaskPositions[p.captureInfo.ID]) - - // cal position - err = p.Tick(ctx) - require.Nil(t, err) - tester.MustApplyPatches() - require.Equal(t, model.Ts(31), p.checkpointTs) - require.Equal(t, model.Ts(31), p.resolvedTs) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) } diff --git a/cdc/scheduler/internal/agent.go b/cdc/scheduler/internal/agent.go index c274862fdd7..aac1083cb30 100644 --- a/cdc/scheduler/internal/agent.go +++ b/cdc/scheduler/internal/agent.go @@ -15,8 +15,6 @@ package internal import ( "context" - - "github.com/pingcap/tiflow/cdc/model" ) // Agent is an interface for an object inside Processor that is responsible @@ -28,9 +26,6 @@ type Agent interface { // Tick is called periodically by the processor to drive the Agent's internal logic. Tick(context.Context) error - // GetLastSentCheckpointTs returns the last checkpoint-ts already sent to the Owner. - GetLastSentCheckpointTs() (checkpointTs model.Ts) - // Close closes the messenger and does the necessary cleanup. Close() error } diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index ea93574ab25..bad74d52517 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -51,6 +51,7 @@ type TableExecutor interface { // or IsRemoveTableFinished in between two calls to this method. GetAllCurrentTables() []model.TableID +<<<<<<< HEAD // GetCheckpoint returns the local checkpoint-ts and resolved-ts of // the processor. Its calculation should take into consideration all // tables that would have been returned if GetAllCurrentTables had been @@ -59,4 +60,8 @@ type TableExecutor interface { // GetTableStatus return the checkpoint and resolved ts for the given table. GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus +======= + // GetTableSpanStatus return the checkpoint and resolved ts for the given table span. + GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) } diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 8e7df11ec63..fddbd98ffe8 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -351,12 +351,6 @@ func (a *agent) handleMessageDispatchTableRequest( table.injectDispatchTableTask(task) } -// GetLastSentCheckpointTs implement agent interface -func (a *agent) GetLastSentCheckpointTs() (checkpointTs model.Ts) { - // no need to implement this. - return internal.CheckpointCannotProceed -} - // Close implement agent interface func (a *agent) Close() error { log.Debug("schedulerv3: agent closed", diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 4819387b8fd..a7c8d803c4f 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -957,6 +957,7 @@ func (e *MockTableExecutor) GetAllCurrentTables() []model.TableID { return result } +<<<<<<< HEAD // GetCheckpoint returns the last checkpoint. func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { args := e.Called() @@ -966,6 +967,11 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) // GetTableStatus implements TableExecutor interface func (e *MockTableExecutor) GetTableStatus( tableID model.TableID, collectStat bool, +======= +// GetTableSpanStatus implements TableExecutor interface +func (e *MockTableExecutor) GetTableSpanStatus( + span tablepb.Span, collectStat bool, +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) ) tablepb.TableStatus { state, ok := e.tables[tableID] if !ok { diff --git a/metrics/alertmanager/ticdc.rules.yml b/metrics/alertmanager/ticdc.rules.yml index 2462c71f878..e9ed7f59b22 100644 --- a/metrics/alertmanager/ticdc.rules.yml +++ b/metrics/alertmanager/ticdc.rules.yml @@ -37,18 +37,6 @@ groups: value: '{{ $value }}' summary: cdc processor checkpoint delay more than 10 minutes - - alert: cdc_resolvedts_high_delay - expr: ticdc_processor_resolved_ts_lag > 300 - for: 1m - labels: - env: ENV_LABELS_ENV - level: critical - expr: ticdc_processor_resolved_ts_lag > 300 - annotations: - description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' - value: '{{ $value }}' - summary: cdc processor resolved ts delay more than 5 minutes - - alert: ticdc_mounter_unmarshal_and_mount_time_more_than_1s expr: histogram_quantile(0.9, rate(ticdc_mounter_unmarshal_and_mount_bucket[1m])) * 1000 > 1000 for: 1m @@ -73,17 +61,6 @@ groups: value: '{{ $value }}' summary: cdc sink execute_duration_time_more_than_10s - - alert: cdc_processor_checkpoint_tso_no_change_for_1m - expr: changes(ticdc_processor_checkpoint_ts[1m]) < 1 - labels: - env: ENV_LABELS_ENV - level: warning - expr: changes(ticdc_processor_checkpoint_ts[1m]) < 1 - annotations: - description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' - value: '{{ $value }}' - summary: cdc processor checkpoint tso no change for 1m - - alert: ticdc_puller_entry_sorter_sort_bucket expr: histogram_quantile(0.9, rate(ticdc_puller_entry_sorter_sort_bucket{}[1m])) > 1 for: 1m diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 8542816f6ec..bb83439c993 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -204,15 +204,6 @@ "intervalFactor": 1, "legendFormat": "{{changefeed}}", "refId": "A" - }, - { - "exemplar": true, - "expr": "sum(ticdc_processor_checkpoint_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{changefeed}}-{{instance}}", - "refId": "B" } ], "thresholds": [], @@ -314,15 +305,7 @@ "interval": "", "intervalFactor": 1, "legendFormat": "{{changefeed}}-resolvedts", - "refId": "C" - }, - { - "expr": "sum(ticdc_processor_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{changefeed}}-{{instance}}-resolvedts", - "refId": "D" + "refId": "A" } ], "thresholds": [], @@ -3092,8 +3075,8 @@ ] }, "gridPos": { - "h": 4, - "w": 7, + "h": 7, + "w": 12, "x": 0, "y": 3 }, @@ -3129,224 +3112,6 @@ ], "type": "table" }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "decimals": 1, - "description": "The ID of the min resolved table of each TiCDC", - "editable": true, - "error": false, - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "fill": 0, - "fillGradient": 0, - "grid": {}, - "gridPos": { - "h": 11, - "w": 8, - "x": 7, - "y": 3 - }, - "hiddenSeries": false, - "id": 394, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": null, - "total": false, - "values": true - }, - "lines": false, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "paceLength": 10, - "percentage": false, - "pluginVersion": "7.5.11", - "pointradius": 1, - "points": true, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "ticdc_processor_min_resolved_table_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", changefeed=~\"$changefeed\"}", - "format": "time_series", - "hide": false, - "intervalFactor": 2, - "legendFormat": "{{instance}}-{{changefeed}}-table-id", - "refId": "A", - "step": 10 - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Min resolved table ID", - "tooltip": { - "msResolution": false, - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - }, - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": false - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "decimals": 1, - "description": "The ID of the min checkpoint table ID of each TiCDC", - "editable": true, - "error": false, - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "fill": 0, - "fillGradient": 0, - "grid": {}, - "gridPos": { - "h": 11, - "w": 9, - "x": 15, - "y": 3 - }, - "hiddenSeries": false, - "id": 395, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": null, - "total": false, - "values": true - }, - "lines": false, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "paceLength": 10, - "percentage": false, - "pluginVersion": "7.5.11", - "pointradius": 1, - "points": true, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "ticdc_processor_min_checkpoint_table_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\", changefeed=~\"$changefeed\"}", - "format": "time_series", - "hide": false, - "intervalFactor": 2, - "legendFormat": "{{instance}}-{{changefeed}}-table-id", - "refId": "A", - "step": 10 - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Min checkpoint table ID", - "tooltip": { - "msResolution": false, - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - }, - { - "format": "none", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": false - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -3368,9 +3133,9 @@ "grid": {}, "gridPos": { "h": 7, - "w": 7, - "x": 0, - "y": 7 + "w": 12, + "x": 12, + "y": 3 }, "hiddenSeries": false, "id": 642, @@ -3481,7 +3246,7 @@ "h": 7, "w": 12, "x": 0, - "y": 14 + "y": 10 }, "hiddenSeries": false, "id": 86, @@ -3538,14 +3303,6 @@ "legendFormat": "{{changefeed}}", "refId": "B" }, - { - "exemplar": true, - "expr": "max(ticdc_processor_checkpoint_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance, changefeed)", - "hide": false, - "interval": "", - "legendFormat": "{{instance}}-{{changefeed}}", - "refId": "C" - }, { "exemplar": true, "expr": "max(ticdc_owner_barrier_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)", @@ -3615,7 +3372,7 @@ "h": 7, "w": 12, "x": 12, - "y": 14 + "y": 10 }, "hiddenSeries": false, "id": 512, @@ -3671,14 +3428,6 @@ "intervalFactor": 1, "legendFormat": "{{changefeed}}", "refId": "C" - }, - { - "exemplar": true, - "expr": "max(ticdc_processor_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", - "hide": false, - "interval": "", - "legendFormat": "{{instance}}-{{changefeed}}", - "refId": "B" } ], "thresholds": [], @@ -3742,7 +3491,7 @@ "h": 7, "w": 12, "x": 0, - "y": 21 + "y": 17 }, "hiddenSeries": false, "id": 3, @@ -3782,16 +3531,6 @@ "intervalFactor": 1, "legendFormat": "{{changefeed}}", "refId": "A" - }, - { - "exemplar": true, - "expr": "max(ticdc_processor_checkpoint_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{instance}}-{{changefeed}}", - "refId": "B" } ], "thresholds": [], @@ -3854,7 +3593,7 @@ "h": 7, "w": 12, "x": 12, - "y": 21 + "y": 17 }, "hiddenSeries": false, "id": 513, @@ -3893,18 +3632,8 @@ "format": "time_series", "interval": "", "intervalFactor": 1, - "legendFormat": "{{changefeed}}", - "refId": "C" - }, - { - "exemplar": true, - "expr": "max(ticdc_processor_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{instance}}-{{changefeed}}", - "refId": "D" + "legendFormat": "{{changefeed}}", + "refId": "C" } ], "thresholds": [], @@ -3965,7 +3694,7 @@ "h": 7, "w": 12, "x": 0, - "y": 28 + "y": 24 }, "hiddenSeries": false, "id": 633, @@ -4063,7 +3792,7 @@ "h": 7, "w": 12, "x": 12, - "y": 28 + "y": 24 }, "hiddenSeries": false, "id": 632, @@ -4163,7 +3892,7 @@ "h": 7, "w": 12, "x": 0, - "y": 35 + "y": 31 }, "hiddenSeries": false, "id": 603, @@ -4204,15 +3933,6 @@ "intervalFactor": 1, "legendFormat": "{{changefeed}}", "refId": "C" - }, - { - "exemplar": true, - "expr": "(max(ticdc_processor_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed) - max(ticdc_processor_checkpoint_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}) by (instance,changefeed)) / 1000 > 0", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{instance}}-{{changefeed}}", - "refId": "D" } ], "thresholds": [], @@ -4275,7 +3995,7 @@ "h": 7, "w": 12, "x": 12, - "y": 35 + "y": 31 }, "hiddenSeries": false, "id": 604, @@ -4385,7 +4105,7 @@ "h": 7, "w": 12, "x": 0, - "y": 42 + "y": 38 }, "hiddenSeries": false, "id": 606, @@ -4500,7 +4220,7 @@ "h": 7, "w": 12, "x": 12, - "y": 42 + "y": 38 }, "hiddenSeries": false, "id": 597, @@ -4612,7 +4332,7 @@ "h": 7, "w": 12, "x": 0, - "y": 49 + "y": 45 }, "heatmap": {}, "hideZeroBuckets": false, @@ -4676,7 +4396,7 @@ "h": 7, "w": 12, "x": 12, - "y": 49 + "y": 45 }, "hiddenSeries": false, "id": 164, @@ -4903,7 +4623,7 @@ "h": 7, "w": 12, "x": 0, - "y": 56 + "y": 52 }, "id": 163, "links": [], @@ -4956,7 +4676,7 @@ "h": 7, "w": 12, "x": 12, - "y": 56 + "y": 52 }, "hiddenSeries": false, "id": 514, @@ -5075,7 +4795,7 @@ "h": 7, "w": 12, "x": 0, - "y": 63 + "y": 59 }, "hiddenSeries": false, "id": 102, @@ -5178,7 +4898,7 @@ "h": 7, "w": 12, "x": 12, - "y": 63 + "y": 59 }, "hiddenSeries": false, "id": 82, @@ -6715,6 +6435,223 @@ "x": 0, "y": 4 }, +<<<<<<< HEAD +======= + "id": 717, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Changefeed memory quota", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 5 + }, + "hiddenSeries": false, + "id": 719, + "legend": { + "avg": false, + "current": false, + "max": true, + "min": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "ticdc_sinkmanager_memory_quota{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}", + "interval": "", + "legendFormat": "{{changefeed}}-{{instance}}-memory-{{type}}", + "refId": "A" + }, + { + "exemplar": true, + "expr": "ticdc_sinkmanager_redo_event_cache{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", instance=~\"$ticdc_instance\"}", + "hide": false, + "interval": "", + "legendFormat": "{{changefeed}}-{{instance}}-redo-cache", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Memory Quota", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Ratio of redo event cache hit", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 5 + }, + "hiddenSeries": false, + "id": 721, + "legend": { + "avg": false, + "current": false, + "max": true, + "min": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_sinkmanager_redo_event_cache_access{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", type=\"hit\"}[1m])) /\nsum(rate(ticdc_sinkmanager_redo_event_cache_access{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m]))", + "interval": "", + "legendFormat": "hit-ratio", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Redo Event Cache Hit Ratio", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "percentunit", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "Pull Based Sink", + "type": "row" + }, + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 5 + }, +>>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) "id": 608, "panels": [ { @@ -7247,7 +7184,7 @@ "h": 1, "w": 24, "x": 0, - "y": 5 + "y": 6 }, "id": 616, "panels": [ @@ -7790,7 +7727,7 @@ "h": 1, "w": 24, "x": 0, - "y": 6 + "y": 7 }, "id": 623, "panels": [ @@ -8938,7 +8875,7 @@ "h": 1, "w": 24, "x": 0, - "y": 7 + "y": 8 }, "id": 713, "panels": [ @@ -9143,7 +9080,7 @@ "h": 1, "w": 24, "x": 0, - "y": 7 + "y": 9 }, "id": 528, "panels": [ @@ -10603,7 +10540,7 @@ "h": 1, "w": 24, "x": 0, - "y": 8 + "y": 10 }, "id": 677, "panels": [ @@ -11289,7 +11226,7 @@ "h": 1, "w": 24, "x": 0, - "y": 9 + "y": 11 }, "id": 269, "panels": [ @@ -12856,7 +12793,7 @@ "h": 1, "w": 24, "x": 0, - "y": 10 + "y": 12 }, "id": 447, "panels": [ @@ -15045,7 +14982,7 @@ "h": 1, "w": 24, "x": 0, - "y": 11 + "y": 13 }, "id": 130, "panels": [ @@ -15578,7 +15515,7 @@ "h": 1, "w": 24, "x": 0, - "y": 12 + "y": 14 }, "id": 266, "panels": [ @@ -16564,7 +16501,7 @@ "h": 1, "w": 24, "x": 0, - "y": 13 + "y": 15 }, "id": 58, "panels": [ @@ -18694,7 +18631,7 @@ "h": 1, "w": 24, "x": 0, - "y": 14 + "y": 16 }, "id": 294, "panels": [ @@ -19446,7 +19383,7 @@ "h": 1, "w": 24, "x": 0, - "y": 15 + "y": 17 }, "id": 403, "panels": [ @@ -20281,7 +20218,7 @@ "h": 1, "w": 24, "x": 0, - "y": 16 + "y": 18 }, "id": 187, "panels": [ @@ -20896,7 +20833,7 @@ "h": 1, "w": 24, "x": 0, - "y": 17 + "y": 19 }, "id": 493, "panels": [ @@ -21940,7 +21877,7 @@ "h": 1, "w": 24, "x": 0, - "y": 18 + "y": 20 }, "id": 402, "panels": [ @@ -23659,7 +23596,7 @@ "h": 1, "w": 24, "x": 0, - "y": 19 + "y": 21 }, "id": 155, "panels": [ @@ -24294,7 +24231,7 @@ "allValue": ".*", "current": {}, "datasource": "${DS_TEST-CLUSTER}", - "definition": "label_values(ticdc_processor_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}, changefeed)", + "definition": "label_values(ticdc_processor_processor_tick_duration_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}, changefeed)", "description": null, "error": null, "hide": 0, @@ -24304,8 +24241,8 @@ "name": "changefeed", "options": [], "query": { - "query": "label_values(ticdc_processor_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}, changefeed)", - "refId": "test-cluster-changefeed-Variable-Query" + "query": "label_values(ticdc_processor_processor_tick_duration_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}, changefeed)", + "refId": "StandardVariableQuery" }, "refresh": 2, "regex": "", @@ -24514,5 +24451,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 50 + "version": 51 } From f3d99ec864d93d8a3d21a6ac8435a92bb10e9834 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Thu, 30 Mar 2023 10:22:07 +0800 Subject: [PATCH 2/3] fix conflict --- cdc/processor/processor.go | 75 ------------------- cdc/processor/processor_test.go | 42 +---------- cdc/scheduler/internal/table_executor.go | 11 --- cdc/scheduler/internal/v3/agent/agent_test.go | 12 --- 4 files changed, 1 insertion(+), 139 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b8d23ace8fe..5c854af8df9 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -427,18 +427,8 @@ func (p *processor) GetAllCurrentTables() []model.TableID { return ret } -<<<<<<< HEAD -// GetCheckpoint implements TableExecutor interface. -func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { - return p.checkpointTs, p.resolvedTs -} - // GetTableStatus implements TableExecutor interface func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus { -======= -// GetTableSpanStatus implements TableExecutor interface -func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus { ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) if p.pullBasedSinking { state, exist := p.sinkManager.GetTableState(tableID) if !exist { @@ -1021,71 +1011,6 @@ func (p *processor) sendError(err error) { } } -<<<<<<< HEAD -// handlePosition calculates the local resolved ts and local checkpoint ts. -// resolvedTs = min(schemaStorage's resolvedTs, all table's resolvedTs). -// table's resolvedTs = redo's resolvedTs if redo enable, else sorter's resolvedTs. -// checkpointTs = min(resolvedTs, all table's checkpointTs). -func (p *processor) handlePosition(currentTs int64) { - minResolvedTs := uint64(math.MaxUint64) - minResolvedTableID := int64(0) - if p.schemaStorage != nil { - minResolvedTs = p.schemaStorage.ResolvedTs() - } - minCheckpointTs := minResolvedTs - minCheckpointTableID := int64(0) - if p.pullBasedSinking { - tableIDs := p.sinkManager.GetAllCurrentTableIDs() - for _, tableID := range tableIDs { - stats := p.sinkManager.GetTableStats(tableID) - log.Debug("sink manager gets table stats", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Any("stats", stats)) - if stats.ResolvedTs < minResolvedTs { - minResolvedTs = stats.ResolvedTs - minResolvedTableID = tableID - } - if stats.CheckpointTs < minCheckpointTs { - minCheckpointTs = stats.CheckpointTs - minCheckpointTableID = tableID - } - } - } else { - for _, table := range p.tables { - ts := table.ResolvedTs() - if ts < minResolvedTs { - minResolvedTs = ts - minResolvedTableID = table.ID() - } - } - - for _, table := range p.tables { - ts := table.CheckpointTs() - if ts < minCheckpointTs { - minCheckpointTs = ts - minCheckpointTableID = table.ID() - } - } - } - - resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs) - p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3) - p.metricResolvedTsGauge.Set(float64(resolvedPhyTs)) - p.metricMinResolvedTableIDGauge.Set(float64(minResolvedTableID)) - - checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs) - p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3) - p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs)) - p.metricMinCheckpointTableIDGauge.Set(float64(minCheckpointTableID)) - - p.checkpointTs = minCheckpointTs - p.resolvedTs = minResolvedTs -} - -======= ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) // pushResolvedTs2Table sends global resolved ts to all the table pipelines. func (p *processor) pushResolvedTs2Table() { resolvedTs := p.changefeed.Status.ResolvedTs diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 0bd119bf1c9..0b2290fe2be 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -292,14 +292,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Len(t, p.tables, 1) -<<<<<<< HEAD - checkpointTs := p.agent.GetLastSentCheckpointTs() - require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(1, true) -======= - done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) @@ -314,15 +307,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) -<<<<<<< HEAD - // no table is `replicating` - checkpointTs = p.agent.GetLastSentCheckpointTs() - require.Equal(t, checkpointTs, model.Ts(20)) - ok, err = p.AddTable(ctx, 1, 30, true) -======= - ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true) ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(0), table1.sinkStartTs) @@ -438,13 +423,7 @@ func TestProcessorClose(t *testing.T) { return status, true, nil }) tester.MustApplyPatches() -<<<<<<< HEAD - p.tables[1].(*mockTablePipeline).resolvedTs = 110 - p.tables[2].(*mockTablePipeline).resolvedTs = 90 - p.tables[1].(*mockTablePipeline).checkpointTs = 90 - p.tables[2].(*mockTablePipeline).checkpointTs = 95 -======= ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) + err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() @@ -505,26 +484,7 @@ func TestPositionDeleted(t *testing.T) { err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() -<<<<<<< HEAD - - table1 := p.tables[1].(*mockTablePipeline) - table2 := p.tables[2].(*mockTablePipeline) - - table1.resolvedTs += 1 - table2.resolvedTs += 1 - - table1.checkpointTs += 1 - table2.checkpointTs += 1 - - // cal position - err = p.Tick(ctx) - require.Nil(t, err) - tester.MustApplyPatches() - require.Equal(t, model.Ts(31), p.checkpointTs) - require.Equal(t, model.Ts(31), p.resolvedTs) -======= ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) // some others delete the task position diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index bad74d52517..8468a50c878 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -51,17 +51,6 @@ type TableExecutor interface { // or IsRemoveTableFinished in between two calls to this method. GetAllCurrentTables() []model.TableID -<<<<<<< HEAD - // GetCheckpoint returns the local checkpoint-ts and resolved-ts of - // the processor. Its calculation should take into consideration all - // tables that would have been returned if GetAllCurrentTables had been - // called immediately before. - GetCheckpoint() (checkpointTs, resolvedTs model.Ts) - // GetTableStatus return the checkpoint and resolved ts for the given table. GetTableStatus(tableID model.TableID, collectStat bool) tablepb.TableStatus -======= - // GetTableSpanStatus return the checkpoint and resolved ts for the given table span. - GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) } diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index a7c8d803c4f..fa8a2bc4d21 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -957,21 +957,9 @@ func (e *MockTableExecutor) GetAllCurrentTables() []model.TableID { return result } -<<<<<<< HEAD -// GetCheckpoint returns the last checkpoint. -func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { - args := e.Called() - return args.Get(0).(model.Ts), args.Get(1).(model.Ts) -} - // GetTableStatus implements TableExecutor interface func (e *MockTableExecutor) GetTableStatus( tableID model.TableID, collectStat bool, -======= -// GetTableSpanStatus implements TableExecutor interface -func (e *MockTableExecutor) GetTableSpanStatus( - span tablepb.Span, collectStat bool, ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) ) tablepb.TableStatus { state, ok := e.tables[tableID] if !ok { From 6ccfc5db673c3165389966662d43940b60cce81a Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Thu, 30 Mar 2023 11:00:41 +0800 Subject: [PATCH 3/3] fix conflict --- metrics/grafana/ticdc.json | 3 --- 1 file changed, 3 deletions(-) diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index bb83439c993..5dfd6079403 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -6435,8 +6435,6 @@ "x": 0, "y": 4 }, -<<<<<<< HEAD -======= "id": 717, "panels": [ { @@ -6651,7 +6649,6 @@ "x": 0, "y": 5 }, ->>>>>>> a7600c4f08 (processor,scheduler(ticdc): clean up unused method and metrics (#8049)) "id": 608, "panels": [ {