Skip to content

Commit

Permalink
Refactor and tidy package (pingcap#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored Oct 23, 2019
1 parent 810aafb commit 161b199
Show file tree
Hide file tree
Showing 27 changed files with 326 additions and 409 deletions.
19 changes: 3 additions & 16 deletions cdc/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,11 @@ import (
"context"

"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"
)

// buffer entry from kv layer
type BufferEntry struct {
KV *kv.RawKVEntry
Resolved *ResolvedSpan
}

func (e *BufferEntry) GetValue() interface{} {
if e.KV != nil {
return e.KV
} else if e.Resolved != nil {
return e.Resolved
} else {
return nil
}
}
type BufferEntry = kv.KvOrResolved

// Buffer buffers kv entries
type Buffer chan BufferEntry
Expand All @@ -44,7 +31,7 @@ func (b Buffer) AddKVEntry(ctx context.Context, kv *kv.RawKVEntry) error {
}

func (b Buffer) AddResolved(ctx context.Context, span util.Span, ts uint64) error {
return b.AddEntry(ctx, BufferEntry{Resolved: &ResolvedSpan{Span: span, Timestamp: ts}})
return b.AddEntry(ctx, BufferEntry{Resolved: &kv.ResolvedSpan{Span: span, Timestamp: ts}})
}

func (b Buffer) Get(ctx context.Context) (BufferEntry, error) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"sync"
"time"

"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"

"github.com/pingcap/check"
"github.com/pingcap/tidb-cdc/cdc/kv"
Expand Down
6 changes: 0 additions & 6 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/roles"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/flags"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
Expand All @@ -36,11 +35,6 @@ const (
CaptureOwnerKey = kv.EtcdKeyBase + "/capture/owner"
)

type ResolvedSpan struct {
Span util.Span
Timestamp uint64
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules SubChangeFeed on it.
type Capture struct {
id string
Expand Down
36 changes: 14 additions & 22 deletions cdc/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package cdc

import (
"context"
"database/sql"
"fmt"
"math"
"strconv"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/mock"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/cdc/schema"
"github.com/pingcap/tidb-cdc/cdc/sink"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
)

func TestSuite(t *testing.T) { TestingT(t) }

type CDCSuite struct {
database string
puller *mock.MockTiDB
mock sqlmock.Sqlmock
mounter *TxnMounter
sink Sink
mounter *txn.Mounter
sink sink.Sink
}

var _ = Suite(NewCDCSuite())
Expand All @@ -39,8 +44,8 @@ func NewCDCSuite() *CDCSuite {
if err != nil {
panic(err.Error())
}
// create a schema
schema, err := NewSchema(jobs, false)
// create a picker
picker, err := schema.NewSchema(jobs, false)
if err != nil {
panic(err.Error())
}
Expand All @@ -51,22 +56,9 @@ func NewCDCSuite() *CDCSuite {
}
cdcSuite.mock = mock

inspector := &cachedInspector{
db: db,
cache: make(map[string]*tableInfo),
tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) {
info, err := getTableInfoFromSchema(schema, schemaName, tableName)
return info, err
},
}
sink := &mysqlSink{
db: db,
infoGetter: schema,
tblInspector: inspector,
}
cdcSuite.sink = sink
cdcSuite.sink = sink.NewMySQLSinkUsingSchema(db, picker)

mounter, err := NewTxnMounter(schema, time.Local)
mounter, err := txn.NewTxnMounter(picker, time.Local)
if err != nil {
panic(err.Error())
}
Expand All @@ -86,7 +78,7 @@ func (s *CDCSuite) RunAndCheckSync(c *C, execute func(func(string, ...interface{
rawKVs = append(rawKVs, kvs...)
}
execute(executeSql)
txn, err := s.mounter.Mount(RawTxn{ts: rawKVs[len(rawKVs)-1].Ts, entries: rawKVs})
txn, err := s.mounter.Mount(txn.RawTxn{TS: rawKVs[len(rawKVs)-1].Ts, Entries: rawKVs})
c.Assert(err, IsNil)
err = s.sink.Emit(context.Background(), *txn)
c.Assert(err, IsNil)
Expand Down
35 changes: 12 additions & 23 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,15 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/cdc/schema"
"github.com/pingcap/tidb-cdc/cdc/sink"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type formatType string

const (
optFormat = "format"

optFormatJSON formatType = "json"
)

type emitEntry struct {
row *encodeRow

resolved *ResolvedSpan
}

// ChangeFeedDetail describe the detail of a ChangeFeed
type ChangeFeedDetail struct {
SinkURI string `json:"sink-uri"`
Expand Down Expand Up @@ -87,12 +76,12 @@ type SubChangeFeed struct {
detail ChangeFeedDetail
watchs []util.Span

schema *Schema
mounter *TxnMounter
schema *schema.Schema
mounter *txn.Mounter

// sink is the Sink to write rows to.
// Resolved timestamps are never written by Capture
sink Sink
sink sink.Sink
}

func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChangeFeed, error) {
Expand All @@ -110,18 +99,18 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange
if err != nil {
return nil, err
}
schema, err := NewSchema(jobs, false)
schema, err := schema.NewSchema(jobs, false)
if err != nil {
return nil, errors.Trace(err)
}

sink, err := getSink(detail.SinkURI, schema, detail.Opts)
sink, err := sink.NewMySQLSink(detail.SinkURI, schema, detail.Opts)
if err != nil {
return nil, errors.Trace(err)
}

// TODO: get time zone from config
mounter, err := NewTxnMounter(schema, time.UTC)
mounter, err := txn.NewTxnMounter(schema, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -217,8 +206,8 @@ func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh c
return puller
}

func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error {
log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.entries))
func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn txn.RawTxn) error {
log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.Entries))
txn, err := c.mounter.Mount(rawTxn)
if err != nil {
return errors.Trace(err)
Expand Down
85 changes: 0 additions & 85 deletions cdc/encoder.go

This file was deleted.

22 changes: 21 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand All @@ -35,6 +35,26 @@ const (

type RawKVEntry = RegionFeedValue

type KvOrResolved struct {
KV *RawKVEntry
Resolved *ResolvedSpan
}

type ResolvedSpan struct {
Span util.Span
Timestamp uint64
}

func (e *KvOrResolved) GetValue() interface{} {
if e.KV != nil {
return e.KV
} else if e.Resolved != nil {
return e.Resolved
} else {
return nil
}
}

// RegionFeedEvent from the kv layer.
// Only one of the event will be setted.
type RegionFeedEvent struct {
Expand Down
9 changes: 5 additions & 4 deletions cdc/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
"golang.org/x/sync/errgroup"
)

Expand All @@ -31,7 +32,7 @@ type Puller struct {
spans []util.Span
detail ChangeFeedDetail
buf Buffer
tsTracker resolveTsTracker
tsTracker txn.ResolveTsTracker
}

// NewPuller create a new Puller fetch event start from checkpointTS
Expand Down Expand Up @@ -125,6 +126,6 @@ func (p *Puller) GetResolvedTs() uint64 {
return p.tsTracker.Frontier()
}

func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, RawTxn) error) error {
return collectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker)
func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, txn.RawTxn) error) error {
return txn.CollectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker)
}
Loading

0 comments on commit 161b199

Please sign in to comment.