Skip to content

Commit

Permalink
*: fix the output in changefeed out of order (#1247)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Jan 21, 2021
1 parent 0074760 commit 5c62e67
Show file tree
Hide file tree
Showing 15 changed files with 1,120 additions and 863 deletions.
18 changes: 12 additions & 6 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/entry"
Expand Down Expand Up @@ -364,6 +363,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
}
c.taskStatus[captureID] = newStatus.Clone()
log.Info("dispatch table success", zap.String("capture-id", captureID), zap.Stringer("status", newStatus))
failpoint.Inject("OwnerRemoveTableError", func() {
if len(cleanedTables) > 0 {
failpoint.Return(errors.New("failpoint injected error"))
}
})
}

for tableID := range cleanedTables {
Expand Down Expand Up @@ -514,28 +518,30 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
log.Warn("ignored the move job, the source capture is not found", zap.Reflect("job", job))
continue
}
replicaInfo, exist := status.RemoveTable(tableID, c.status.CheckpointTs)
// To ensure that the replication pipeline stops exactly at the boundary TS,
// the boundary TS specified by the Remove Table operation MUST be greater than or equal to the checkpoint TS of this table.
// So the global resolved TS is a reasonable value.
replicaInfo, exist := status.RemoveTable(tableID, c.status.ResolvedTs)
if !exist {
delete(c.moveTableJobs, tableID)
log.Warn("ignored the move job, the table is not exist in the source capture", zap.Reflect("job", job))
continue
}
replicaInfo.StartTs = c.status.CheckpointTs
replicaInfo.StartTs = c.status.ResolvedTs
job.TableReplicaInfo = replicaInfo
job.Status = model.MoveTableStatusDeleted
log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job))
case model.MoveTableStatusDeleted:
// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
replicaInfo.StartTs = c.status.CheckpointTs
if !exist {
// the target capture does not exist, so add the table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs
log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
continue
}
status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
status.AddTable(tableID, replicaInfo, replicaInfo.StartTs)
job.Status = model.MoveTableStatusFinished
delete(c.moveTableJobs, tableID)
log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
Expand Down
6 changes: 0 additions & 6 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package cdc

import (
"time"

"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/puller"
Expand All @@ -24,10 +22,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const (
defaultMetricInterval = time.Second * 15
)

var registry = prometheus.NewRegistry()

func init() {
Expand Down
17 changes: 0 additions & 17 deletions cdc/metrics_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,13 @@ var (
Help: "The time it took to update sub changefeed info.",
Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 18),
}, []string{"capture"})
tableOutputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "txn_output_chan_size",
Help: "size of row changed event output channel from table to processor",
}, []string{"changefeed", "capture"})
processorErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
sinkFlushRowChangedDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "flush_event_duration_seconds",
Help: "Bucketed histogram of processing time (s) of flushing events in processor",
Buckets: prometheus.ExponentialBuckets(0.002 /* 2ms */, 2, 20),
}, []string{"changefeed", "capture"})
)

// initProcessorMetrics registers all metrics used in processor
Expand All @@ -109,7 +94,5 @@ func initProcessorMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(updateInfoDuration)
registry.MustRegister(tableOutputChanSizeGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(sinkFlushRowChangedDuration)
}
Loading

0 comments on commit 5c62e67

Please sign in to comment.