Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and tidy package #64

Merged
merged 10 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions cdc/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,11 @@ import (
"context"

"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"
)

// buffer entry from kv layer
type BufferEntry struct {
KV *kv.RawKVEntry
Resolved *ResolvedSpan
}

func (e *BufferEntry) GetValue() interface{} {
if e.KV != nil {
return e.KV
} else if e.Resolved != nil {
return e.Resolved
} else {
return nil
}
}
type BufferEntry = kv.KvOrResolved

// Buffer buffers kv entries
type Buffer chan BufferEntry
Expand All @@ -44,7 +31,7 @@ func (b Buffer) AddKVEntry(ctx context.Context, kv *kv.RawKVEntry) error {
}

func (b Buffer) AddResolved(ctx context.Context, span util.Span, ts uint64) error {
return b.AddEntry(ctx, BufferEntry{Resolved: &ResolvedSpan{Span: span, Timestamp: ts}})
return b.AddEntry(ctx, BufferEntry{Resolved: &kv.ResolvedSpan{Span: span, Timestamp: ts}})
}

func (b Buffer) Get(ctx context.Context) (BufferEntry, error) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"sync"
"time"

"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"

"github.com/pingcap/check"
"github.com/pingcap/tidb-cdc/cdc/kv"
Expand Down
6 changes: 0 additions & 6 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/roles"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/flags"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
Expand All @@ -36,11 +35,6 @@ const (
CaptureOwnerKey = kv.EtcdKeyBase + "/capture/owner"
)

type ResolvedSpan struct {
Span util.Span
Timestamp uint64
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules SubChangeFeed on it.
type Capture struct {
id string
Expand Down
36 changes: 14 additions & 22 deletions cdc/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package cdc

import (
"context"
"database/sql"
"fmt"
"math"
"strconv"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/mock"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/cdc/schema"
"github.com/pingcap/tidb-cdc/cdc/sink"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
)

func TestSuite(t *testing.T) { TestingT(t) }

type CDCSuite struct {
database string
puller *mock.MockTiDB
mock sqlmock.Sqlmock
mounter *TxnMounter
sink Sink
mounter *txn.Mounter
sink sink.Sink
}

var _ = Suite(NewCDCSuite())
Expand All @@ -39,8 +44,8 @@ func NewCDCSuite() *CDCSuite {
if err != nil {
panic(err.Error())
}
// create a schema
schema, err := NewSchema(jobs, false)
// create a picker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can rename picker, but not schema 🙂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

about this Schema, I will update it in next pr.
I will take @suzaku 's advice, think of Schema as a storage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

picker, err := schema.NewSchemaPicker(jobs, false)
zier-one marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err.Error())
}
Expand All @@ -51,22 +56,9 @@ func NewCDCSuite() *CDCSuite {
}
cdcSuite.mock = mock

inspector := &cachedInspector{
db: db,
cache: make(map[string]*tableInfo),
tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) {
info, err := getTableInfoFromSchema(schema, schemaName, tableName)
return info, err
},
}
sink := &mysqlSink{
db: db,
infoGetter: schema,
tblInspector: inspector,
}
cdcSuite.sink = sink
cdcSuite.sink = sink.NewMySQLSinkUsingSchema(db, picker)

mounter, err := NewTxnMounter(schema, time.Local)
mounter, err := txn.NewTxnMounter(picker, time.Local)
if err != nil {
panic(err.Error())
}
Expand All @@ -86,7 +78,7 @@ func (s *CDCSuite) RunAndCheckSync(c *C, execute func(func(string, ...interface{
rawKVs = append(rawKVs, kvs...)
}
execute(executeSql)
txn, err := s.mounter.Mount(RawTxn{ts: rawKVs[len(rawKVs)-1].Ts, entries: rawKVs})
txn, err := s.mounter.Mount(txn.RawTxn{TS: rawKVs[len(rawKVs)-1].Ts, Entries: rawKVs})
c.Assert(err, IsNil)
err = s.sink.Emit(context.Background(), *txn)
c.Assert(err, IsNil)
Expand Down
71 changes: 12 additions & 59 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,15 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/kv"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/cdc/schema"
"github.com/pingcap/tidb-cdc/cdc/sink"
"github.com/pingcap/tidb-cdc/cdc/txn"
"github.com/pingcap/tidb-cdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type formatType string

const (
optFormat = "format"

optFormatJSON formatType = "json"
)

type emitEntry struct {
row *encodeRow

resolved *ResolvedSpan
}

// ChangeFeedDetail describe the detail of a ChangeFeed
type ChangeFeedDetail struct {
SinkURI string `json:"sink-uri"`
Expand Down Expand Up @@ -87,12 +76,12 @@ type SubChangeFeed struct {
detail ChangeFeedDetail
watchs []util.Span

schema *Schema
mounter *TxnMounter
schema *schema.Picker
mounter *txn.Mounter

// sink is the Sink to write rows to.
// Resolved timestamps are never written by Capture
sink Sink
sink sink.Sink
}

func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChangeFeed, error) {
Expand All @@ -110,18 +99,18 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange
if err != nil {
return nil, err
}
schema, err := NewSchema(jobs, false)
schema, err := schema.NewSchemaPicker(jobs, false)
if err != nil {
return nil, errors.Trace(err)
}

sink, err := getSink(detail.SinkURI, schema, detail.Opts)
sink, err := sink.NewMySQLSink(detail.SinkURI, schema, detail.Opts)
if err != nil {
return nil, errors.Trace(err)
}

// TODO: get time zone from config
mounter, err := NewTxnMounter(schema, time.UTC)
mounter, err := txn.NewTxnMounter(schema, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -217,8 +206,8 @@ func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh c
return puller
}

func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error {
log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.entries))
func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn txn.RawTxn) error {
log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.Entries))
txn, err := c.mounter.Mount(rawTxn)
if err != nil {
return errors.Trace(err)
Expand All @@ -230,39 +219,3 @@ func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) erro
log.Info("Output Txn", zap.Reflect("Txn", txn))
return nil
}

// kvsToRows gets changed kvs from a closure and converts them into sql rows.
// The returned closure is not threadsafe.
func kvsToRows(
detail ChangeFeedDetail,
inputFn func(context.Context) (BufferEntry, error),
) func(context.Context) (*emitEntry, error) {
panic("todo")
}

// emitEntries connects to a sink, receives rows from a closure, and repeatedly
// emits them to the sink. It returns a closure that may be repeatedly called to
// advance the changefeed and which returns span-level resolved timestamp
// updates. The returned closure is not threadsafe.
func emitEntries(
detail ChangeFeedDetail,
watchedSpans []util.Span,
encoder Encoder,
sink Sink,
inputFn func(context.Context) (*emitEntry, error),
) func(context.Context) ([]ResolvedSpan, error) {
panic("todo")
}

// emitResolvedTimestamp emits a changefeed-level resolved timestamp
func emitResolvedTimestamp(
ctx context.Context, encoder Encoder, sink Sink, resolved uint64,
) error {
if err := sink.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil {
return err
}

log.Info("resolved", zap.Uint64("timestamp", resolved))

return nil
}
85 changes: 0 additions & 85 deletions cdc/encoder.go

This file was deleted.

22 changes: 21 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb-cdc/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand All @@ -35,6 +35,26 @@ const (

type RawKVEntry = RegionFeedValue

type KvOrResolved struct {
KV *RawKVEntry
Resolved *ResolvedSpan
}

type ResolvedSpan struct {
Span util.Span
Timestamp uint64
}

func (e *KvOrResolved) GetValue() interface{} {
if e.KV != nil {
return e.KV
} else if e.Resolved != nil {
return e.Resolved
} else {
return nil
}
}

// RegionFeedEvent from the kv layer.
// Only one of the event will be setted.
type RegionFeedEvent struct {
Expand Down
Loading