diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 78c8f897be4..6f073a2243f 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -131,50 +131,70 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange } func (c *SubChangeFeed) Start(ctx context.Context) error { - errg, ctx := errgroup.WithContext(ctx) + errCh := make(chan error, 1) - errg.Go(func() error { - ddlSpan := util.Span{ - Start: []byte{'m'}, - End: []byte{'m' + 1}, - } - return c.startOnSpan(ctx, ddlSpan) - }) + ddlSpan := util.Span{ + Start: []byte{'m'}, + End: []byte{'m' + 1}, + } + ddlPuller := c.startOnSpan(ctx, ddlSpan, errCh) - errg.Go(func() error { - tblSpan := util.Span{ - Start: []byte{'t'}, - End: []byte{'t' + 1}, + tblSpan := util.Span{ + Start: []byte{'t'}, + End: []byte{'t' + 1}, + } + dmlPuller := c.startOnSpan(ctx, tblSpan, errCh) + + // TODO: Set up a way to notify the pullers of new resolved ts + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-errCh: + return e + case <-time.After(10 * time.Millisecond): + ts := c.GetResolvedTs(ddlPuller, dmlPuller) + log.Info("Min ResolvedTs", zap.Uint64("ts", ts)) } - return c.startOnSpan(ctx, tblSpan) - }) + } +} - return errg.Wait() +func (c *SubChangeFeed) GetResolvedTs(pullers ...*Puller) uint64 { + minResolvedTs := pullers[0].GetResolvedTs() + for _, p := range pullers[1:] { + ts := p.GetResolvedTs() + if ts < minResolvedTs { + minResolvedTs = ts + } + } + return minResolvedTs } -func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span) error { +func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh chan<- error) *Puller { + // Set it up so that one failed goroutine cancels all others sharing the same ctx + errg, ctx := errgroup.WithContext(ctx) + checkpointTS := c.detail.CheckpointTS if checkpointTS == 0 { checkpointTS = oracle.EncodeTSO(c.detail.CreateTime.Unix() * 1000) } - ctx, cancel := context.WithCancel(ctx) + puller := NewPuller(c.pdCli, checkpointTS, []util.Span{span}, c.detail) - buf := MakeBuffer() - puller := NewPuller(c.pdCli, checkpointTS, c.watchs, c.detail, buf) + errg.Go(func() error { + return puller.Run(ctx) + }) + + errg.Go(func() error { + return puller.CollectRawTxns(ctx, c.writeToSink) + }) go func() { - err := puller.Run(ctx) - if err != nil { - cancel() - log.Error("Puller run", zap.Any("span", span)) - } + err := errg.Wait() + errCh <- err }() - spanFrontier := makeSpanFrontier(span) - - err := collectRawTxns(ctx, buf.Get, c.writeToSink, spanFrontier) - return err + return puller } func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error { diff --git a/cdc/puller.go b/cdc/puller.go index dade4e7b123..77547f3136a 100644 --- a/cdc/puller.go +++ b/cdc/puller.go @@ -30,6 +30,7 @@ type Puller struct { spans []util.Span detail ChangeFeedDetail buf Buffer + tsTracker resolveTsTracker } // NewPuller create a new Puller fetch event start from checkpointTS @@ -40,19 +41,23 @@ func NewPuller( spans []util.Span, // useless now detail ChangeFeedDetail, - buf Buffer, ) *Puller { p := &Puller{ pdCli: pdCli, checkpointTS: checkpointTS, spans: spans, detail: detail, - buf: buf, + buf: MakeBuffer(), + tsTracker: makeSpanFrontier(spans...), } return p } +func (p *Puller) Output() Buffer { + return p.buf +} + // Run the puller, continually fetch event from TiKV and add event into buffer func (p *Puller) Run(ctx context.Context) error { // TODO pull from tikv and push into buf @@ -105,3 +110,11 @@ func (p *Puller) Run(ctx context.Context) error { return g.Wait() } + +func (p *Puller) GetResolvedTs() uint64 { + return p.tsTracker.Frontier() +} + +func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, RawTxn) error) error { + return collectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker) +} diff --git a/cdc/txn.go b/cdc/txn.go index e89103d82e8..ed4dabaa462 100644 --- a/cdc/txn.go +++ b/cdc/txn.go @@ -92,6 +92,7 @@ func (t Txn) IsDDL() bool { type resolveTsTracker interface { Forward(span util.Span, ts uint64) bool + Frontier() uint64 } func collectRawTxns( diff --git a/cdc/txn_test.go b/cdc/txn_test.go index bb975c0863a..4ee54b1b5b8 100644 --- a/cdc/txn_test.go +++ b/cdc/txn_test.go @@ -50,6 +50,10 @@ func (t *mockTracker) Forward(span util.Span, ts uint64) bool { return true } +func (t *mockTracker) Frontier() uint64 { + return 1 +} + var _ = check.Suite(&CollectRawTxnsSuite{}) func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { diff --git a/cmd/debug.go b/cmd/debug.go index 2dd2bc932aa..61e923e224a 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -34,11 +34,11 @@ var pullCmd = &cobra.Command{ return } - buf := cdc.MakeBuffer() ts := oracle.ComposeTS(time.Now().Unix()*1000, 0) detail := cdc.ChangeFeedDetail{} - p := cdc.NewPuller(cli, ts, []util.Span{{Start: nil, End: nil}}, detail, buf) + p := cdc.NewPuller(cli, ts, []util.Span{{Start: nil, End: nil}}, detail) + buf := p.Output() g, ctx := errgroup.WithContext(context.Background())