Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix the output in changefeed out of order #1247

Merged
merged 53 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
a05b81a
panic when adding a dying table
Dec 29, 2020
2985735
Merge branch 'master' into fix_fatal_sink
Dec 30, 2020
c05c08b
add a test
Dec 30, 2020
21ec264
add a test
Dec 30, 2020
d72853d
add a test
Dec 30, 2020
fc77ae8
add a test
Dec 30, 2020
ef86a5b
update the checkpoint when exec ddl
Dec 30, 2020
ee598c5
add a middleware before sink
Jan 5, 2021
59d0ab5
add a middleware before sink
Jan 6, 2021
a21c68b
Merge branch 'master' into fix_fatal_sink
Jan 6, 2021
8c1eb0b
fix build
Jan 6, 2021
9804ed5
fix build
Jan 6, 2021
6598b81
fix test
Jan 6, 2021
315be4a
add unit tests
Jan 6, 2021
0bfac0c
fix race
Jan 6, 2021
fb6627d
add debug code
Jan 7, 2021
3326dec
fix some bug
Jan 7, 2021
7cc0bec
fix check
Jan 7, 2021
0b2b11b
fix check
Jan 7, 2021
5d5900f
refine fmt
Jan 7, 2021
9f4b935
fix test
Jan 7, 2021
d86f674
Merge branch 'master' into fix_fatal_sink
Jan 7, 2021
3d03fb5
fix test
Jan 7, 2021
00f59c6
add debug code
Jan 7, 2021
2ccdc94
add debug code
Jan 7, 2021
a766397
Merge remote-tracking branch 'pingcap/master' into fix_fatal_sink
Jan 8, 2021
7d684de
update test
Jan 8, 2021
c49ea60
update test
Jan 8, 2021
1d51a22
fix data race
Jan 8, 2021
8ff15f1
remove debug code
Jan 8, 2021
f337bcf
update test script
Jan 8, 2021
f17c682
update test script
Jan 8, 2021
5077336
update test script
Jan 8, 2021
b38f390
update test script
Jan 8, 2021
b1c0209
update test script
Jan 8, 2021
bea1477
fix move table
Jan 8, 2021
c750ee3
fix move table
Jan 8, 2021
79cef87
accepting comments
Jan 11, 2021
e457476
add debug log
Jan 11, 2021
76e6497
fix cancel
Jan 11, 2021
d59a62c
Merge branch 'master' into fix_fatal_sink
Jan 11, 2021
f50b314
fix build
Jan 11, 2021
ad6ad57
update data-flow
Jan 12, 2021
90de5ff
update metrics
Jan 12, 2021
0c1c2ee
update dot
Jan 12, 2021
59855ed
update docs
Jan 12, 2021
c18d3f8
Merge branch 'master' into fix_fatal_sink
Jan 13, 2021
de2729b
update ticdc.json
Jan 13, 2021
75fbb46
add some test cases
Jan 14, 2021
3cea12f
fix fmt
Jan 14, 2021
0279d45
Merge branch 'master' into fix_fatal_sink
ti-srebot Jan 19, 2021
1f35ac3
Merge branch 'master' into fix_fatal_sink
Jan 20, 2021
45813e9
Merge branch 'master' into fix_fatal_sink
Jan 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/entry"
Expand Down Expand Up @@ -364,6 +363,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
}
c.taskStatus[captureID] = newStatus.Clone()
log.Info("dispatch table success", zap.String("capture-id", captureID), zap.Stringer("status", newStatus))
failpoint.Inject("OwnerRemoveTableError", func() {
if len(cleanedTables) > 0 {
failpoint.Return(errors.New("failpoint injected error"))
}
})
}

for tableID := range cleanedTables {
Expand Down Expand Up @@ -514,28 +518,30 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
log.Warn("ignored the move job, the source capture is not found", zap.Reflect("job", job))
continue
}
replicaInfo, exist := status.RemoveTable(tableID, c.status.CheckpointTs)
// To ensure that the replication pipeline stops exactly at the boundary TS,
// the boundary TS specified by the remove-table operation MUST be greater than or equal to the checkpoint TS of this table.
// So the global resolved TS is a reasonable value.
replicaInfo, exist := status.RemoveTable(tableID, c.status.ResolvedTs)
if !exist {
delete(c.moveTableJobs, tableID)
log.Warn("ignored the move job, the table is not exist in the source capture", zap.Reflect("job", job))
continue
}
replicaInfo.StartTs = c.status.CheckpointTs
replicaInfo.StartTs = c.status.ResolvedTs
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
job.TableReplicaInfo = replicaInfo
job.Status = model.MoveTableStatusDeleted
log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job))
case model.MoveTableStatusDeleted:
// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
replicaInfo.StartTs = c.status.CheckpointTs
if !exist {
// the target capture does not exist, so add the table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs
log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
continue
}
status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
status.AddTable(tableID, replicaInfo, replicaInfo.StartTs)
job.Status = model.MoveTableStatusFinished
delete(c.moveTableJobs, tableID)
log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
Expand Down
6 changes: 0 additions & 6 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package cdc

import (
"time"

"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/puller"
Expand All @@ -24,10 +22,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const (
defaultMetricInterval = time.Second * 15
)

var registry = prometheus.NewRegistry()

func init() {
Expand Down
17 changes: 0 additions & 17 deletions cdc/metrics_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,13 @@ var (
Help: "The time it took to update sub changefeed info.",
Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 18),
}, []string{"capture"})
tableOutputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "txn_output_chan_size",
Help: "size of row changed event output channel from table to processor",
}, []string{"changefeed", "capture"})
zier-one marked this conversation as resolved.
Show resolved Hide resolved
processorErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
sinkFlushRowChangedDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "flush_event_duration_seconds",
Help: "Bucketed histogram of processing time (s) of flushing events in processor",
Buckets: prometheus.ExponentialBuckets(0.002 /* 2ms */, 2, 20),
}, []string{"changefeed", "capture"})
)

// initProcessorMetrics registers all metrics used in processor
Expand All @@ -109,7 +94,5 @@ func initProcessorMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(updateInfoDuration)
registry.MustRegister(tableOutputChanSizeGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(sinkFlushRowChangedDuration)
}
Loading