Skip to content

Commit

Permalink
support file sort in puller
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Apr 20, 2020
1 parent 8dfba2b commit 75bc957
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 36 deletions.
11 changes: 11 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
)

// SortEngine names the sorter implementation a changefeed uses to order
// the events produced by its pullers.
type SortEngine string

// Supported sort engines.
const (
// SortInMemory makes the processor create its sorter with puller.NewEntrySorter.
SortInMemory SortEngine = "memory"
// SortInFile makes the processor create its sorter with puller.NewFileSorter,
// passing the changefeed's SortDir.
SortInFile SortEngine = "file"
)

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
SinkURI string `json:"sink-uri"`
Expand All @@ -35,6 +44,8 @@ type ChangeFeedInfo struct {
TargetTs uint64 `json:"target-ts"`
// used for admin job notification, trigger watch event in capture
AdminJobType AdminJobType `json:"admin-job-type"`
Engine SortEngine `json:"sort-engine"`
SortDir string `json:"sort-dir"`

Config *util.ReplicaConfig `json:"config"`
}
Expand Down
24 changes: 13 additions & 11 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@

package model

import "context"
import (
"context"
)

// PolymorphicEvent describes an event that can be in multiple states: it is
// created from a raw KV entry and can additionally carry the row data.
// When encoded as JSON only Ts and Row are written; RawKV and Finished are
// tagged `json:"-"` and are therefore omitted.
type PolymorphicEvent struct {
Ts uint64
RawKV *RawKVEntry
Row *RowChangedEvent
finished chan struct{}
Ts uint64 `json:"t"`
RawKV *RawKVEntry `json:"-"`
Row *RowChangedEvent `json:"r"`
// Finished is closed by PrepareFinished and awaited by WaitPrepare.
// It is nil for events built by NewResolvedPolymorphicEvent.
Finished chan struct{} `json:"-"`
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV
Expand All @@ -31,7 +33,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
return &PolymorphicEvent{
Ts: rawKV.Ts,
RawKV: rawKV,
finished: make(chan struct{}),
Finished: make(chan struct{}),
}
}

Expand All @@ -41,25 +43,25 @@ func NewResolvedPolymorphicEvent(ts uint64) *PolymorphicEvent {
Ts: ts,
RawKV: &RawKVEntry{Ts: ts, OpType: OpTypeResolved},
Row: &RowChangedEvent{Ts: ts, Resolved: true},
finished: nil,
Finished: nil,
}
}

// PrepareFinished marks the prepare process as finished by closing the
// event's channel, which unblocks any caller waiting in WaitPrepare.
// In the prepare process, the Mounter translates the raw KV into row data.
// Events whose channel is nil (those built by NewResolvedPolymorphicEvent)
// are left untouched. It must be called at most once per event, because
// closing an already-closed channel panics.
func (e *PolymorphicEvent) PrepareFinished() {
if e.finished != nil {
close(e.finished)
if e.Finished != nil {
close(e.Finished)
}
}

// WaitPrepare waits for the prepare process to finish
func (e *PolymorphicEvent) WaitPrepare(ctx context.Context) error {
if e.finished != nil {
if e.Finished != nil {
select {
case <-ctx.Done():
return ctx.Err()
case <-e.finished:
case <-e.Finished:
}
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import (

// RowChangedEvent represents a row changed event.
// Every field is tagged with a one-letter JSON key.
type RowChangedEvent struct {
Ts uint64
Resolved bool
Ts uint64 `json:"t"`
Resolved bool `json:"r"`

Schema string
Table string
Schema string `json:"s"`
Table string `json:"a"`

Delete bool
Delete bool `json:"d"`

// If the table of this row has only one unique index (including the
// primary key), IndieMarkCol is set to the name of that unique index.
IndieMarkCol string
Columns map[string]*Column
IndieMarkCol string `json:"i"`
Columns map[string]*Column `json:"c"`
}

// ToMqMessage transforms to message key and value
Expand Down
23 changes: 16 additions & 7 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,16 +636,25 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
// The key in a DML kv pair returned from TiKV is not memcomparable-encoded,
// so we set `needEncode` to true.
span := util.GetTableSpan(tableID, true)
sorter := puller.NewEntrySorter()
puller := puller.NewPuller(p.pdCli, p.kvStorage, startTs, []util.Span{span}, true, p.limitter)

plr := puller.NewPuller(p.pdCli, p.kvStorage, startTs, []util.Span{span}, true, p.limitter)
go func() {
err := puller.Run(ctx)
err := plr.Run(ctx)
if errors.Cause(err) != context.Canceled {
p.errCh <- err
}
}()

var sorter puller.EventSorter
switch p.changefeed.Engine {
case model.SortInMemory:
sorter = puller.NewEntrySorter()
case model.SortInFile:
sorter = puller.NewFileSorter(p.changefeed.SortDir)
default:
p.errCh <- errors.Errorf("unknown sort engine %s", p.changefeed.Engine)
return
}

go func() {
err := sorter.Run(ctx)
if errors.Cause(err) != context.Canceled {
Expand All @@ -661,12 +670,12 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
p.errCh <- ctx.Err()
}
return
case rawKV := <-puller.Output():
case rawKV := <-plr.Output():
if rawKV == nil {
continue
}
pEvent := model.NewPolymorphicEvent(rawKV)
sorter.AddEntry(pEvent)
sorter.AddEntry(ctx, pEvent)
select {
case <-ctx.Done():
if errors.Cause(ctx.Err()) != context.Canceled {
Expand All @@ -679,7 +688,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
if pEvent == nil {
continue
}
if pEvent.RawKV.OpType == model.OpTypeResolved {
if pEvent.RawKV != nil && pEvent.RawKV.OpType == model.OpTypeResolved {
table.storeResolvedTS(pEvent.Ts)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (es *EntrySorter) Run(ctx context.Context) error {
}

// AddEntry adds a PolymorphicEvent to the EntryGroup
func (es *EntrySorter) AddEntry(entry *model.PolymorphicEvent) {
func (es *EntrySorter) AddEntry(ctx context.Context, entry *model.PolymorphicEvent) {
if atomic.LoadInt32(&es.closed) != 0 {
return
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func SortOutput(ctx context.Context, input <-chan *model.RawKVEntry) <-chan *mod
if rawKV == nil {
continue
}
sorter.AddEntry(model.NewPolymorphicEvent(rawKV))
sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawKV))
case sorted := <-sorter.Output():
if sorted != nil {
output(sorted.RawKV)
Expand Down
16 changes: 8 additions & 8 deletions cdc/puller/entry_sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (s *mockEntrySorterSuite) TestEntrySorter(c *check.C) {
}()
for _, tc := range testCases {
for _, entry := range tc.input {
es.AddEntry(model.NewPolymorphicEvent(entry))
es.AddEntry(ctx, model.NewPolymorphicEvent(entry))
}
es.AddEntry(model.NewResolvedPolymorphicEvent(tc.resolvedTs))
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(tc.resolvedTs))
for i := 0; i < len(tc.expect); i++ {
e := <-es.Output()
c.Check(e.RawKV, check.DeepEquals, tc.expect[i])
Expand Down Expand Up @@ -134,11 +134,11 @@ func (s *mockEntrySorterSuite) TestEntrySorterRandomly(c *check.C) {
Ts: uint64(int64(resolvedTs) + rand.Int63n(int64(maxTs-resolvedTs))),
OpType: opType,
}
es.AddEntry(model.NewPolymorphicEvent(entry))
es.AddEntry(ctx, model.NewPolymorphicEvent(entry))
}
es.AddEntry(model.NewResolvedPolymorphicEvent(resolvedTs))
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(resolvedTs))
}
es.AddEntry(model.NewResolvedPolymorphicEvent(maxTs))
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(maxTs))
}()
var lastTs uint64
var resolvedTs uint64
Expand Down Expand Up @@ -186,11 +186,11 @@ func BenchmarkSorter(b *testing.B) {
Ts: uint64(int64(resolvedTs) + rand.Int63n(1000)),
OpType: opType,
}
es.AddEntry(model.NewPolymorphicEvent(entry))
es.AddEntry(ctx, model.NewPolymorphicEvent(entry))
}
es.AddEntry(model.NewResolvedPolymorphicEvent(resolvedTs))
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(resolvedTs))
}
es.AddEntry(model.NewResolvedPolymorphicEvent(maxTs))
es.AddEntry(ctx, model.NewResolvedPolymorphicEvent(maxTs))
}()
var resolvedTs uint64
for entry := range es.Output() {
Expand Down
Loading

0 comments on commit 75bc957

Please sign in to comment.