Skip to content

Commit

Permalink
Add support for deciding the min resolvedTs of all pullers (pingcap#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored Oct 16, 2019
1 parent 0262e07 commit 28a1008
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 32 deletions.
76 changes: 48 additions & 28 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,50 +131,70 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange
}

func (c *SubChangeFeed) Start(ctx context.Context) error {
errg, ctx := errgroup.WithContext(ctx)
errCh := make(chan error, 1)

errg.Go(func() error {
ddlSpan := util.Span{
Start: []byte{'m'},
End: []byte{'m' + 1},
}
return c.startOnSpan(ctx, ddlSpan)
})
ddlSpan := util.Span{
Start: []byte{'m'},
End: []byte{'m' + 1},
}
ddlPuller := c.startOnSpan(ctx, ddlSpan, errCh)

errg.Go(func() error {
tblSpan := util.Span{
Start: []byte{'t'},
End: []byte{'t' + 1},
tblSpan := util.Span{
Start: []byte{'t'},
End: []byte{'t' + 1},
}
dmlPuller := c.startOnSpan(ctx, tblSpan, errCh)

// TODO: Set up a way to notify the pullers of new resolved ts
for {
select {
case <-ctx.Done():
return ctx.Err()
case e := <-errCh:
return e
case <-time.After(10 * time.Millisecond):
ts := c.GetResolvedTs(ddlPuller, dmlPuller)
log.Info("Min ResolvedTs", zap.Uint64("ts", ts))
}
return c.startOnSpan(ctx, tblSpan)
})
}
}

return errg.Wait()
func (c *SubChangeFeed) GetResolvedTs(pullers ...*Puller) uint64 {
minResolvedTs := pullers[0].GetResolvedTs()
for _, p := range pullers[1:] {
ts := p.GetResolvedTs()
if ts < minResolvedTs {
minResolvedTs = ts
}
}
return minResolvedTs
}

func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span) error {
func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh chan<- error) *Puller {
// Set it up so that one failed goroutine cancels all others sharing the same ctx
errg, ctx := errgroup.WithContext(ctx)

checkpointTS := c.detail.CheckpointTS
if checkpointTS == 0 {
checkpointTS = oracle.EncodeTSO(c.detail.CreateTime.Unix() * 1000)
}

ctx, cancel := context.WithCancel(ctx)
puller := NewPuller(c.pdCli, checkpointTS, []util.Span{span}, c.detail)

buf := MakeBuffer()
puller := NewPuller(c.pdCli, checkpointTS, c.watchs, c.detail, buf)
errg.Go(func() error {
return puller.Run(ctx)
})

errg.Go(func() error {
return puller.CollectRawTxns(ctx, c.writeToSink)
})

go func() {
err := puller.Run(ctx)
if err != nil {
cancel()
log.Error("Puller run", zap.Any("span", span))
}
err := errg.Wait()
errCh <- err
}()

spanFrontier := makeSpanFrontier(span)

err := collectRawTxns(ctx, buf.Get, c.writeToSink, spanFrontier)
return err
return puller
}

func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error {
Expand Down
17 changes: 15 additions & 2 deletions cdc/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Puller struct {
spans []util.Span
detail ChangeFeedDetail
buf Buffer
tsTracker resolveTsTracker
}

// NewPuller create a new Puller fetch event start from checkpointTS
Expand All @@ -40,19 +41,23 @@ func NewPuller(
spans []util.Span,
// useless now
detail ChangeFeedDetail,
buf Buffer,
) *Puller {
p := &Puller{
pdCli: pdCli,
checkpointTS: checkpointTS,
spans: spans,
detail: detail,
buf: buf,
buf: MakeBuffer(),
tsTracker: makeSpanFrontier(spans...),
}

return p
}

func (p *Puller) Output() Buffer {
return p.buf
}

// Run the puller, continually fetch event from TiKV and add event into buffer
func (p *Puller) Run(ctx context.Context) error {
// TODO pull from tikv and push into buf
Expand Down Expand Up @@ -105,3 +110,11 @@ func (p *Puller) Run(ctx context.Context) error {

return g.Wait()
}

func (p *Puller) GetResolvedTs() uint64 {
return p.tsTracker.Frontier()
}

func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, RawTxn) error) error {
return collectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker)
}
1 change: 1 addition & 0 deletions cdc/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (t Txn) IsDDL() bool {

type resolveTsTracker interface {
Forward(span util.Span, ts uint64) bool
Frontier() uint64
}

func collectRawTxns(
Expand Down
4 changes: 4 additions & 0 deletions cdc/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (t *mockTracker) Forward(span util.Span, ts uint64) bool {
return true
}

func (t *mockTracker) Frontier() uint64 {
return 1
}

var _ = check.Suite(&CollectRawTxnsSuite{})

func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ var pullCmd = &cobra.Command{
return
}

buf := cdc.MakeBuffer()
ts := oracle.ComposeTS(time.Now().Unix()*1000, 0)
detail := cdc.ChangeFeedDetail{}

p := cdc.NewPuller(cli, ts, []util.Span{{Start: nil, End: nil}}, detail, buf)
p := cdc.NewPuller(cli, ts, []util.Span{{Start: nil, End: nil}}, detail)
buf := p.Output()

g, ctx := errgroup.WithContext(context.Background())

Expand Down

0 comments on commit 28a1008

Please sign in to comment.