diff --git a/cdc/buffer.go b/cdc/buffer.go index 8009a038a81..3fa6b91e0ec 100644 --- a/cdc/buffer.go +++ b/cdc/buffer.go @@ -4,24 +4,11 @@ import ( "context" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) // buffer entry from kv layer -type BufferEntry struct { - KV *kv.RawKVEntry - Resolved *ResolvedSpan -} - -func (e *BufferEntry) GetValue() interface{} { - if e.KV != nil { - return e.KV - } else if e.Resolved != nil { - return e.Resolved - } else { - return nil - } -} +type BufferEntry = kv.KvOrResolved // Buffer buffers kv entries type Buffer chan BufferEntry @@ -44,7 +31,7 @@ func (b Buffer) AddKVEntry(ctx context.Context, kv *kv.RawKVEntry) error { } func (b Buffer) AddResolved(ctx context.Context, span util.Span, ts uint64) error { - return b.AddEntry(ctx, BufferEntry{Resolved: &ResolvedSpan{Span: span, Timestamp: ts}}) + return b.AddEntry(ctx, BufferEntry{Resolved: &kv.ResolvedSpan{Span: span, Timestamp: ts}}) } func (b Buffer) Get(ctx context.Context) (BufferEntry, error) { diff --git a/cdc/buffer_test.go b/cdc/buffer_test.go index 2e61f15dcd0..d9419a91c98 100644 --- a/cdc/buffer_test.go +++ b/cdc/buffer_test.go @@ -18,7 +18,7 @@ import ( "sync" "time" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" diff --git a/cdc/capture.go b/cdc/capture.go index a4be4761182..075fa39b1ee 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/roles" - "github.com/pingcap/tidb-cdc/cdc/util" "github.com/pingcap/tidb-cdc/pkg/flags" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store" @@ -36,11 +35,6 @@ const ( CaptureOwnerKey = kv.EtcdKeyBase + "/capture/owner" ) -type ResolvedSpan struct { - Span util.Span - Timestamp uint64 -} - // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules SubChangeFeed on it. type Capture struct { id string diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index 73b0579aa91..99f75795aec 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -2,25 +2,30 @@ package cdc import ( "context" - "database/sql" "fmt" "math" "strconv" + "testing" "time" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/cdc/sink" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" ) +func TestSuite(t *testing.T) { TestingT(t) } + type CDCSuite struct { database string puller *mock.MockTiDB mock sqlmock.Sqlmock - mounter *TxnMounter - sink Sink + mounter *txn.Mounter + sink sink.Sink } var _ = Suite(NewCDCSuite()) @@ -39,8 +44,8 @@ func NewCDCSuite() *CDCSuite { if err != nil { panic(err.Error()) } - // create a schema - schema, err := NewSchema(jobs, false) + // create a picker + picker, err := schema.NewSchema(jobs, false) if err != nil { panic(err.Error()) } @@ -51,22 +56,9 @@ func NewCDCSuite() *CDCSuite { } cdcSuite.mock = mock - inspector := &cachedInspector{ - db: db, - cache: make(map[string]*tableInfo), - tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) { - info, err := getTableInfoFromSchema(schema, schemaName, tableName) - return info, err - }, - } - sink := &mysqlSink{ - db: db, - infoGetter: schema, - tblInspector: inspector, - } - cdcSuite.sink = sink + cdcSuite.sink = sink.NewMySQLSinkUsingSchema(db, picker) - mounter, err := NewTxnMounter(schema, time.Local) + mounter, err := txn.NewTxnMounter(picker, time.Local) if err != nil { panic(err.Error()) } @@ -86,7 +78,7 @@ func (s *CDCSuite) RunAndCheckSync(c *C, execute func(func(string, ...interface{ rawKVs = append(rawKVs, kvs...) } execute(executeSql) - txn, err := s.mounter.Mount(RawTxn{ts: rawKVs[len(rawKVs)-1].Ts, entries: rawKVs}) + txn, err := s.mounter.Mount(txn.RawTxn{TS: rawKVs[len(rawKVs)-1].Ts, Entries: rawKVs}) c.Assert(err, IsNil) err = s.sink.Emit(context.Background(), *txn) c.Assert(err, IsNil) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index f583935a828..f8cf9396c45 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -24,26 +24,15 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/cdc/sink" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) -type formatType string - -const ( - optFormat = "format" - - optFormatJSON formatType = "json" -) - -type emitEntry struct { - row *encodeRow - - resolved *ResolvedSpan -} - // ChangeFeedDetail describe the detail of a ChangeFeed type ChangeFeedDetail struct { SinkURI string `json:"sink-uri"` @@ -87,12 +76,12 @@ type SubChangeFeed struct { detail ChangeFeedDetail watchs []util.Span - schema *Schema - mounter *TxnMounter + schema *schema.Schema + mounter *txn.Mounter // sink is the Sink to write rows to. // Resolved timestamps are never written by Capture - sink Sink + sink sink.Sink } func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChangeFeed, error) { @@ -110,18 +99,18 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange if err != nil { return nil, err } - schema, err := NewSchema(jobs, false) + schema, err := schema.NewSchema(jobs, false) if err != nil { return nil, errors.Trace(err) } - sink, err := getSink(detail.SinkURI, schema, detail.Opts) + sink, err := sink.NewMySQLSink(detail.SinkURI, schema, detail.Opts) if err != nil { return nil, errors.Trace(err) } // TODO: get time zone from config - mounter, err := NewTxnMounter(schema, time.UTC) + mounter, err := txn.NewTxnMounter(schema, time.UTC) if err != nil { return nil, errors.Trace(err) } @@ -217,8 +206,8 @@ func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh c return puller } -func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error { - log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.entries)) +func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn txn.RawTxn) error { + log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.Entries)) txn, err := c.mounter.Mount(rawTxn) if err != nil { return errors.Trace(err) @@ -230,39 +219,3 @@ func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) erro log.Info("Output Txn", zap.Reflect("Txn", txn)) return nil } - -// kvsToRows gets changed kvs from a closure and converts them into sql rows. -// The returned closure is not threadsafe. -func kvsToRows( - detail ChangeFeedDetail, - inputFn func(context.Context) (BufferEntry, error), -) func(context.Context) (*emitEntry, error) { - panic("todo") -} - -// emitEntries connects to a sink, receives rows from a closure, and repeatedly -// emits them to the sink. It returns a closure that may be repeatedly called to -// advance the changefeed and which returns span-level resolved timestamp -// updates. The returned closure is not threadsafe. -func emitEntries( - detail ChangeFeedDetail, - watchedSpans []util.Span, - encoder Encoder, - sink Sink, - inputFn func(context.Context) (*emitEntry, error), -) func(context.Context) ([]ResolvedSpan, error) { - panic("todo") -} - -// emitResolvedTimestamp emits a changefeed-level resolved timestamp -func emitResolvedTimestamp( - ctx context.Context, encoder Encoder, sink Sink, resolved uint64, -) error { - if err := sink.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return err - } - - log.Info("resolved", zap.Uint64("timestamp", resolved)) - - return nil -} diff --git a/cdc/encoder.go b/cdc/encoder.go deleted file mode 100644 index 46b61612bf3..00000000000 --- a/cdc/encoder.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdc - -import ( - "fmt" - - "github.com/pingcap/errors" - "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/types" -) - -// encodeRow holds all the pieces necessary to encode a row change into a key or -// value. -type encodeRow struct { - // datums is the new value of a changed table row. - datums []types.Datum - - commitTS uint64 - - // deleted is true if row is a deletion. In this case, only the primary - // key columns are guaranteed to be set in `datums`. - deleted bool - - tableInfo *model.TableInfo -} - -type Encoder interface { - // EncodeKey encodes the primary key of the given row. - EncodeKey(encodeRow) ([]byte, error) - - // EncodeValue encodes the primary key of the given row. - EncodeValue(encodeRow) ([]byte, error) - - // EncodeResolvedTimestamp encodes a resolved timestamp payload - EncodeResolvedTimestamp(uint64) ([]byte, error) -} - -func getEncoder(opts map[string]string) (Encoder, error) { - switch formatType(opts[optFormat]) { - case "", optFormatJSON: - return newJSONEncoder(opts) - default: - return nil, errors.Errorf("unknow format: %s", opts[optFormat]) - } -} - -type jsonEncoder struct { -} - -var _ Encoder = &jsonEncoder{} - -func newJSONEncoder(opts map[string]string) (*jsonEncoder, error) { - // TODO - return &jsonEncoder{}, nil -} - -func (e *jsonEncoder) EncodeKey(row encodeRow) ([]byte, error) { - // TODO - str := fmt.Sprintf("%+v", row) - return []byte(str), nil -} - -func (e *jsonEncoder) EncodeValue(row encodeRow) ([]byte, error) { - // TODO - str := fmt.Sprintf("%+v", row) - return []byte(str), nil -} - -func (e *jsonEncoder) EncodeResolvedTimestamp(ts uint64) ([]byte, error) { - // TODO - str := fmt.Sprintf("ts: %v", ts) - return []byte(str), nil -} diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 53323cd060d..34cf580b4ae 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -12,7 +12,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" pd "github.com/pingcap/pd/client" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -35,6 +35,26 @@ const ( type RawKVEntry = RegionFeedValue +type KvOrResolved struct { + KV *RawKVEntry + Resolved *ResolvedSpan +} + +type ResolvedSpan struct { + Span util.Span + Timestamp uint64 +} + +func (e *KvOrResolved) GetValue() interface{} { + if e.KV != nil { + return e.KV + } else if e.Resolved != nil { + return e.Resolved + } else { + return nil + } +} + // RegionFeedEvent from the kv layer. // Only one of the event will be setted. type RegionFeedEvent struct { diff --git a/cdc/puller.go b/cdc/puller.go index 3543ec02516..d140bcb589e 100644 --- a/cdc/puller.go +++ b/cdc/puller.go @@ -20,7 +20,8 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" "golang.org/x/sync/errgroup" ) @@ -31,7 +32,7 @@ type Puller struct { spans []util.Span detail ChangeFeedDetail buf Buffer - tsTracker resolveTsTracker + tsTracker txn.ResolveTsTracker } // NewPuller create a new Puller fetch event start from checkpointTS @@ -125,6 +126,6 @@ func (p *Puller) GetResolvedTs() uint64 { return p.tsTracker.Frontier() } -func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, RawTxn) error) error { - return collectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker) +func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, txn.RawTxn) error) error { + return txn.CollectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker) } diff --git a/cdc/schema.go b/cdc/schema/schema.go similarity index 98% rename from cdc/schema.go rename to cdc/schema/schema.go index 2590451fbb2..e0ce436fce7 100644 --- a/cdc/schema.go +++ b/cdc/schema/schema.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package schema import ( "encoding/json" @@ -248,7 +248,7 @@ func (s *Schema) addJob(job *model.Job) { } } -func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { +func (s *Schema) HandlePreviousDDLJobIfNeed(commitTs uint64) error { var i int var job *model.Job // TODO: Make sure jobs are sorted by BinlogInfo.FinishedTS @@ -265,7 +265,7 @@ func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { continue } - _, _, _, err := s.handleDDL(job) + _, _, _, err := s.HandleDDL(job) if err != nil { return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", job, s) } @@ -276,12 +276,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { return nil } -// handleDDL has four return values, +// HandleDDL has four return values, // the first value[string]: the schema name // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { +func (s *Schema) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job)) if skipJob(job) { diff --git a/cdc/schema_test.go b/cdc/schema/schema_test.go similarity index 95% rename from cdc/schema_test.go rename to cdc/schema/schema_test.go index b46460d6a98..c4893ad0a18 100644 --- a/cdc/schema_test.go +++ b/cdc/schema/schema_test.go @@ -11,10 +11,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package schema import ( "fmt" + "testing" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -25,6 +26,8 @@ import ( type schemaSuite struct{} +func Test(t *testing.T) { TestingT(t) } + var _ = Suite(&schemaSuite{}) func (t *schemaSuite) TestSchema(c *C) { @@ -61,7 +64,7 @@ func (t *schemaSuite) TestSchema(c *C) { // reconstruct the local schema schema, err := NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(123) + err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(err, IsNil) // test drop schema @@ -78,7 +81,7 @@ func (t *schemaSuite) TestSchema(c *C) { ) schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(124) + err = schema.HandlePreviousDDLJobIfNeed(124) c.Assert(err, IsNil) // test create schema already exist error @@ -87,7 +90,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append(jobs, jobDup) schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(125) + err = schema.HandlePreviousDDLJobIfNeed(125) c.Log(err) c.Assert(errors.IsAlreadyExists(err), IsTrue) @@ -106,7 +109,7 @@ func (t *schemaSuite) TestSchema(c *C) { ) schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(123) + err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(errors.IsNotFound(err), IsTrue) } @@ -204,7 +207,7 @@ func (*schemaSuite) TestTable(c *C) { // reconstruct the local schema schema, err := NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(126) + err = schema.HandlePreviousDDLJobIfNeed(126) c.Assert(err, IsNil) // check the historical db that constructed above whether in the schema list of local schema @@ -235,7 +238,7 @@ func (*schemaSuite) TestTable(c *C) { ) schema1, err := NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema1.handlePreviousDDLJobIfNeed(127) + err = schema1.HandlePreviousDDLJobIfNeed(127) c.Assert(err, IsNil) _, ok = schema1.TableByID(tblInfo1.ID) c.Assert(ok, IsTrue) @@ -257,7 +260,7 @@ func (*schemaSuite) TestTable(c *C) { ) schema2, err := NewSchema(jobs, false) c.Assert(err, IsNil) - err = schema2.handlePreviousDDLJobIfNeed(128) + err = schema2.HandlePreviousDDLJobIfNeed(128) c.Assert(err, IsNil) _, ok = schema2.TableByID(tblInfo.ID) @@ -282,13 +285,13 @@ func (t *schemaSuite) TestHandleDDL(c *C) { // check rollback done job job := &model.Job{ID: 1, State: model.JobStateRollbackDone} - _, _, sql, err := schema.handleDDL(job) + _, _, sql, err := schema.HandleDDL(job) c.Assert(err, IsNil) c.Assert(sql, Equals, "") // check job.Query is empty job = &model.Job{ID: 1, State: model.JobStateDone} - _, _, sql, err = schema.handleDDL(job) + _, _, sql, err = schema.HandleDDL(job) c.Assert(sql, Equals, "") c.Assert(err, NotNil, Commentf("should return not found job.Query")) @@ -397,7 +400,7 @@ func (t *schemaSuite) TestAddImplicitColumn(c *C) { } func testDoDDLAndCheck(c *C, schema *Schema, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { - schemaName, tableName, resSQL, err := schema.handleDDL(job) + schemaName, tableName, resSQL, err := schema.HandleDDL(job) c.Logf("handle: %s", job.Query) c.Logf("result: %s, %s, %s, %v", schemaName, tableName, resSQL, err) c.Assert(err != nil, Equals, isErr) diff --git a/cdc/cached_inspector.go b/cdc/sink/cached_inspector.go similarity index 88% rename from cdc/cached_inspector.go rename to cdc/sink/cached_inspector.go index c937933c381..a7231414d9f 100644 --- a/cdc/cached_inspector.go +++ b/cdc/sink/cached_inspector.go @@ -11,9 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink -import "database/sql" +import ( + "database/sql" + + "github.com/pingcap/tidb-cdc/pkg/util" +) type cachedInspector struct { db *sql.DB @@ -32,7 +36,7 @@ func newCachedInspector(db *sql.DB) *cachedInspector { var _ tableInspector = &cachedInspector{} func (i *cachedInspector) Get(schema, table string) (*tableInfo, error) { - key := quoteSchema(schema, table) + key := util.QuoteSchema(schema, table) t, ok := i.cache[key] if !ok { var err error @@ -46,6 +50,6 @@ func (i *cachedInspector) Get(schema, table string) (*tableInfo, error) { } func (i *cachedInspector) Refresh(schema, table string) { - key := quoteSchema(schema, table) + key := util.QuoteSchema(schema, table) delete(i.cache, key) } diff --git a/cdc/cached_inspector_test.go b/cdc/sink/cached_inspector_test.go similarity index 99% rename from cdc/cached_inspector_test.go rename to cdc/sink/cached_inspector_test.go index bde0634f246..76ad2daeeaf 100644 --- a/cdc/cached_inspector_test.go +++ b/cdc/sink/cached_inspector_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "database/sql" diff --git a/cdc/mysql.go b/cdc/sink/mysql.go similarity index 79% rename from cdc/mysql.go rename to cdc/sink/mysql.go index 1964fb13f32..685f6717a65 100644 --- a/cdc/mysql.go +++ b/cdc/sink/mysql.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" @@ -20,26 +20,23 @@ import ( "regexp" "strings" + "github.com/cenkalti/backoff" dmysql "github.com/go-sql-driver/mysql" + "go.uber.org/zap" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/table" - - "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" - - "github.com/pingcap/errors" - - "github.com/pingcap/parser/model" - - "github.com/cenkalti/backoff" - _ "github.com/pingcap/tidb/types/parser_driver" - - "github.com/pingcap/log" - "go.uber.org/zap" ) type tableInspector interface { @@ -57,7 +54,42 @@ type mysqlSink struct { var _ Sink = &mysqlSink{} -func (s *mysqlSink) Emit(ctx context.Context, txn Txn) error { +func NewMySQLSink( + sinkURI string, + infoGetter TableInfoGetter, + opts map[string]string, +) (Sink, error) { + // TODO + db, err := sql.Open("mysql", sinkURI) + if err != nil { + return nil, err + } + cachedInspector := newCachedInspector(db) + sink := mysqlSink{ + db: db, + infoGetter: infoGetter, + tblInspector: cachedInspector, + } + return &sink, nil +} + +func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Schema) Sink { + inspector := &cachedInspector{ + db: db, + cache: make(map[string]*tableInfo), + tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) { + info, err := getTableInfoFromSchema(picker, schemaName, tableName) + return info, err + }, + } + return &mysqlSink{ + db: db, + infoGetter: picker, + tblInspector: inspector, + } +} + +func (s *mysqlSink) Emit(ctx context.Context, txn txn.Txn) error { filterBySchemaAndTable(&txn) if len(txn.DMLs) == 0 && txn.DDL == nil { log.Info("Whole txn ignored", zap.Uint64("ts", txn.Ts)) @@ -78,24 +110,24 @@ func (s *mysqlSink) Emit(ctx context.Context, txn Txn) error { return errors.Trace(s.execDMLs(ctx, dmls)) } -func filterBySchemaAndTable(txn *Txn) { +func filterBySchemaAndTable(t *txn.Txn) { toIgnore := regexp.MustCompile("(?i)^(INFORMATION_SCHEMA|PERFORMANCE_SCHEMA|MYSQL)$") - if txn.IsDDL() { - if toIgnore.MatchString(txn.DDL.Database) { - txn.DDL = nil + if t.IsDDL() { + if toIgnore.MatchString(t.DDL.Database) { + t.DDL = nil } } else { - filteredDMLs := make([]*DML, 0, len(txn.DMLs)) - for _, dml := range txn.DMLs { + filteredDMLs := make([]*txn.DML, 0, len(t.DMLs)) + for _, dml := range t.DMLs { if !toIgnore.MatchString(dml.Database) { filteredDMLs = append(filteredDMLs, dml) } } - txn.DMLs = filteredDMLs + t.DMLs = filteredDMLs } } -func (s *mysqlSink) EmitResolvedTimestamp(ctx context.Context, encoder Encoder, resolved uint64) error { +func (s *mysqlSink) EmitResolvedTimestamp(ctx context.Context, resolved uint64) error { return nil } @@ -107,7 +139,7 @@ func (s *mysqlSink) Close() error { return nil } -func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *DDL, maxRetries uint64) error { +func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *txn.DDL, maxRetries uint64) error { retryCfg := backoff.WithMaxRetries( backoff.WithContext( backoff.NewExponentialBackOff(), ctx), @@ -125,7 +157,7 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *DDL, maxRetr }, retryCfg) } -func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { +func (s *mysqlSink) execDDL(ctx context.Context, ddl *txn.DDL) error { shouldSwitchDB := len(ddl.Database) > 0 && ddl.Type != model.ActionCreateSchema tx, err := s.db.BeginTx(ctx, nil) @@ -134,7 +166,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { } if shouldSwitchDB { - _, err = tx.ExecContext(ctx, "USE "+quoteName(ddl.Database)+";") + _, err = tx.ExecContext(ctx, "USE "+util.QuoteName(ddl.Database)+";") if err != nil { tx.Rollback() return err @@ -154,18 +186,18 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { return nil } -func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*DML) error { +func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*txn.DML) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } for _, dml := range dmls { - var fPrepare func(*DML) (string, []interface{}, error) + var fPrepare func(*txn.DML) (string, []interface{}, error) switch dml.Tp { - case InsertDMLType, UpdateDMLType: + case txn.InsertDMLType, txn.UpdateDMLType: fPrepare = s.prepareReplace - case DeleteDMLType: + case txn.DeleteDMLType: fPrepare = s.prepareDelete default: return fmt.Errorf("invalid dml type: %v", dml.Tp) @@ -189,8 +221,8 @@ func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*DML) error { return nil } -func (s *mysqlSink) formatDMLs(dmls []*DML) ([]*DML, error) { - result := make([]*DML, 0, len(dmls)) +func (s *mysqlSink) formatDMLs(dmls []*txn.DML) ([]*txn.DML, error) { + result := make([]*txn.DML, 0, len(dmls)) for _, dml := range dmls { tableInfo, ok := s.getTableDefinition(dml.Database, dml.Table) if !ok { @@ -215,16 +247,16 @@ func (s *mysqlSink) getTableDefinition(schema, table string) (*model.TableInfo, return tableInfo, ok } -func (s *mysqlSink) prepareReplace(dml *DML) (string, []interface{}, error) { +func (s *mysqlSink) prepareReplace(dml *txn.DML) (string, []interface{}, error) { info, err := s.tblInspector.Get(dml.Database, dml.Table) if err != nil { return "", nil, err } var builder strings.Builder cols := "(" + buildColumnList(info.columns) + ")" - tblName := quoteSchema(dml.Database, dml.Table) + tblName := util.QuoteSchema(dml.Database, dml.Table) builder.WriteString("REPLACE INTO " + tblName + cols + " VALUES ") - builder.WriteString("(" + holderString(len(info.columns)) + ");") + builder.WriteString("(" + util.HolderString(len(info.columns)) + ");") args := make([]interface{}, 0, len(info.columns)) for _, name := range info.columns { @@ -238,7 +270,7 @@ func (s *mysqlSink) prepareReplace(dml *DML) (string, []interface{}, error) { return builder.String(), args, nil } -func (s *mysqlSink) prepareDelete(dml *DML) (string, []interface{}, error) { +func (s *mysqlSink) prepareDelete(dml *txn.DML) (string, []interface{}, error) { info, err := s.tblInspector.Get(dml.Database, dml.Table) if err != nil { return "", nil, err @@ -254,9 +286,9 @@ func (s *mysqlSink) prepareDelete(dml *DML) (string, []interface{}, error) { builder.WriteString(" AND ") } if wargs[i].IsNull() { - builder.WriteString(quoteName(colNames[i]) + " IS NULL") + builder.WriteString(util.QuoteName(colNames[i]) + " IS NULL") } else { - builder.WriteString(quoteName(colNames[i]) + " = ?") + builder.WriteString(util.QuoteName(colNames[i]) + " = ?") args = append(args, wargs[i].GetValue()) } } @@ -397,3 +429,16 @@ func getSQLErrCode(err error) (terror.ErrCode, bool) { return terror.ErrCode(mysqlErr.Number), true } + +func buildColumnList(names []string) string { + var b strings.Builder + for i, name := range names { + if i > 0 { + b.WriteString(",") + } + b.WriteString(util.QuoteName(name)) + + } + + return b.String() +} diff --git a/cdc/mysql_test.go b/cdc/sink/mysql_test.go similarity index 83% rename from cdc/mysql_test.go rename to cdc/sink/mysql_test.go index fa34e21c548..b2855869474 100644 --- a/cdc/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -11,13 +11,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/parser/model" @@ -50,8 +51,8 @@ func (s EmitSuite) TestShouldExecDDL(c *check.C) { tblInspector: dummyInspector{}, } - txn := Txn{ - DDL: &DDL{ + t := txn.Txn{ + DDL: &txn.DDL{ Database: "test", Table: "user", SQL: "CREATE TABLE user (id INT PRIMARY KEY);", @@ -60,11 +61,11 @@ func (s EmitSuite) TestShouldExecDDL(c *check.C) { mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec(txn.DDL.SQL).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(t.DDL.SQL).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -82,8 +83,8 @@ func (s EmitSuite) TestShouldIgnoreCertainDDLError(c *check.C) { tblInspector: dummyInspector{}, } - txn := Txn{ - DDL: &DDL{ + t := txn.Txn{ + DDL: &txn.DDL{ Database: "test", Table: "user", SQL: "CREATE TABLE user (id INT PRIMARY KEY);", @@ -95,10 +96,10 @@ func (s EmitSuite) TestShouldIgnoreCertainDDLError(c *check.C) { ignorable := dmysql.MySQLError{ Number: uint16(infoschema.ErrTableExists.Code()), } - mock.ExpectExec(txn.DDL.SQL).WillReturnError(&ignorable) + mock.ExpectExec(t.DDL.SQL).WillReturnError(&ignorable) // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -161,12 +162,12 @@ func (s EmitSuite) TestShouldExecReplaceInto(c *check.C) { infoGetter: &helper, } - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ { Database: "test", Table: "user", - Tp: InsertDMLType, + Tp: txn.InsertDMLType, Values: map[string]dbtypes.Datum{ "id": dbtypes.NewDatum(42), "name": dbtypes.NewDatum("tester1"), @@ -182,7 +183,7 @@ func (s EmitSuite) TestShouldExecReplaceInto(c *check.C) { mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -202,12 +203,12 @@ func (s EmitSuite) TestShouldExecDelete(c *check.C) { infoGetter: &helper, } - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ { Database: "test", Table: "user", - Tp: DeleteDMLType, + Tp: txn.DeleteDMLType, Values: map[string]dbtypes.Datum{ "id": dbtypes.NewDatum(123), "name": dbtypes.NewDatum("tester1"), @@ -223,7 +224,7 @@ func (s EmitSuite) TestShouldExecDelete(c *check.C) { mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -235,8 +236,8 @@ type FilterSuite struct{} var _ = check.Suite(&FilterSuite{}) func (s *FilterSuite) TestFilterDMLs(c *check.C) { - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ {Database: "INFORMATIOn_SCHEmA"}, {Database: "test"}, {Database: "test_mysql"}, @@ -244,21 +245,21 @@ func (s *FilterSuite) TestFilterDMLs(c *check.C) { }, Ts: 213, } - filterBySchemaAndTable(&txn) - c.Assert(txn.Ts, check.Equals, uint64(213)) - c.Assert(txn.DDL, check.IsNil) - c.Assert(txn.DMLs, check.HasLen, 2) - c.Assert(txn.DMLs[0].Database, check.Equals, "test") - c.Assert(txn.DMLs[1].Database, check.Equals, "test_mysql") + filterBySchemaAndTable(&t) + c.Assert(t.Ts, check.Equals, uint64(213)) + c.Assert(t.DDL, check.IsNil) + c.Assert(t.DMLs, check.HasLen, 2) + c.Assert(t.DMLs[0].Database, check.Equals, "test") + c.Assert(t.DMLs[1].Database, check.Equals, "test_mysql") } func (s *FilterSuite) TestFilterDDL(c *check.C) { - txn := Txn{ - DDL: &DDL{Database: "performance_schema"}, + t := txn.Txn{ + DDL: &txn.DDL{Database: "performance_schema"}, Ts: 10234, } - filterBySchemaAndTable(&txn) - c.Assert(txn.Ts, check.Equals, uint64((10234))) - c.Assert(txn.DMLs, check.HasLen, 0) - c.Assert(txn.DDL, check.IsNil) + filterBySchemaAndTable(&t) + c.Assert(t.Ts, check.Equals, uint64((10234))) + c.Assert(t.DMLs, check.HasLen, 0) + c.Assert(t.DDL, check.IsNil) } diff --git a/cdc/sink.go b/cdc/sink/sink.go similarity index 72% rename from cdc/sink.go rename to cdc/sink/sink.go index 99803fe721e..46b968ac703 100644 --- a/cdc/sink.go +++ b/cdc/sink/sink.go @@ -11,23 +11,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" - "database/sql" "fmt" "io" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-cdc/cdc/txn" ) // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { - Emit(ctx context.Context, txn Txn) error + Emit(ctx context.Context, t txn.Txn) error EmitResolvedTimestamp( ctx context.Context, - encoder Encoder, resolved uint64, ) error // TODO: Add GetLastSuccessTs() uint64 @@ -44,37 +43,18 @@ type TableInfoGetter interface { GetTableIDByName(schema, table string) (int64, bool) } -func getSink( - sinkURI string, - infoGetter TableInfoGetter, - opts map[string]string, -) (Sink, error) { - // TODO - db, err := sql.Open("mysql", sinkURI) - if err != nil { - return nil, err - } - cachedInspector := newCachedInspector(db) - sink := mysqlSink{ - db: db, - infoGetter: infoGetter, - tblInspector: cachedInspector, - } - return &sink, nil -} - type writerSink struct { io.Writer } var _ Sink = &writerSink{} -func (s *writerSink) Emit(ctx context.Context, txn Txn) error { - fmt.Fprintf(s, "commit ts: %d", txn.Ts) +func (s *writerSink) Emit(ctx context.Context, t txn.Txn) error { + fmt.Fprintf(s, "commit ts: %d", t.Ts) return nil } -func (s *writerSink) EmitResolvedTimestamp(ctx context.Context, encoder Encoder, resolved uint64) error { +func (s *writerSink) EmitResolvedTimestamp(ctx context.Context, resolved uint64) error { fmt.Fprintf(s, "resolved: %d", resolved) return nil } diff --git a/cdc/util.go b/cdc/sink/util.go similarity index 86% rename from cdc/util.go rename to cdc/sink/util.go index a23e7dc5357..d22e4b2e192 100644 --- a/cdc/util.go +++ b/cdc/sink/util.go @@ -11,15 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( gosql "database/sql" - "fmt" "strings" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/errors" ) @@ -81,7 +82,7 @@ func getTableInfo(db *gosql.DB, schema string, table string) (info *tableInfo, e return } -func getTableInfoFromSchema(schema *Schema, schemaName, tableName string) (info *tableInfo, err error) { +func getTableInfoFromSchema(schema *schema.Schema, schemaName, tableName string) (info *tableInfo, err error) { info = new(tableInfo) tableId, exist := schema.GetTableIDByName(schemaName, tableName) if !exist { @@ -136,43 +137,6 @@ func getTableInfoFromSchema(schema *Schema, schemaName, tableName string) (info return } -func quoteSchema(schema string, table string) string { - return fmt.Sprintf("`%s`.`%s`", escapeName(schema), escapeName(table)) -} - -func quoteName(name string) string { - return "`" + escapeName(name) + "`" -} - -func escapeName(name string) string { - return strings.Replace(name, "`", "``", -1) -} - -func holderString(n int) string { - var builder strings.Builder - builder.Grow((n-1)*2 + 1) - for i := 0; i < n; i++ { - if i > 0 { - builder.WriteString(",") - } - builder.WriteString("?") - } - return builder.String() -} - -func buildColumnList(names []string) string { - var b strings.Builder - for i, name := range names { - if i > 0 { - b.WriteString(",") - } - b.WriteString(quoteName(name)) - - } - - return b.String() -} - // getColsOfTbl returns a slice of the names of all columns, // generated columns are excluded. // https://dev.mysql.com/doc/mysql-infoschema-excerpt/5.7/en/columns-table.html @@ -257,7 +221,7 @@ func getUniqKeys(db *gosql.DB, schema, table string) (uniqueKeys []indexInfo, er return } -func isTableChanged(ddl *DDL) bool { +func isTableChanged(ddl *txn.DDL) bool { switch ddl.Type { case model.ActionDropTable, model.ActionDropSchema, model.ActionTruncateTable, model.ActionCreateSchema: return false diff --git a/cdc/util_test.go b/cdc/sink/util_test.go similarity index 96% rename from cdc/util_test.go rename to cdc/sink/util_test.go index b617bca3de4..61cdf5484af 100644 --- a/cdc/util_test.go +++ b/cdc/sink/util_test.go @@ -11,15 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "regexp" + "testing" "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" ) +func TestSuite(t *testing.T) { check.TestingT(t) } + type UtilSuite struct{} var _ = check.Suite(&UtilSuite{}) diff --git a/cdc/span_frontier.go b/cdc/span_frontier.go index 677fe01e011..b7b6cef14ad 100644 --- a/cdc/span_frontier.go +++ b/cdc/span_frontier.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/biogo/store/interval" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) // Generic intervals diff --git a/cdc/span_frontier_test.go b/cdc/span_frontier_test.go index 71fdda33815..ff172d44e09 100644 --- a/cdc/span_frontier_test.go +++ b/cdc/span_frontier_test.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/pingcap/check" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) type spanFrontierSuite struct{} diff --git a/cdc/txn.go b/cdc/txn/txn.go similarity index 83% rename from cdc/txn.go rename to cdc/txn/txn.go index ed4dabaa462..9ccc84c7886 100644 --- a/cdc/txn.go +++ b/cdc/txn/txn.go @@ -11,36 +11,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package txn import ( "context" "sort" "time" - "github.com/pingcap/tidb-cdc/cdc/util" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-cdc/cdc/entry" "github.com/pingcap/tidb-cdc/cdc/kv" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/types" "go.uber.org/zap" ) -type sqlType int - -const ( - sqlDML sqlType = iota - sqlDDL sqlType = iota -) - -// RawTxn represents a complete collection of entries that belong to the same transaction +// RawTxn represents a complete collection of Entries that belong to the same transaction type RawTxn struct { - ts uint64 - entries []*kv.RawKVEntry + TS uint64 + Entries []*kv.RawKVEntry } // DMLType represents the dml type @@ -66,7 +59,7 @@ type DML struct { // TableName returns the fully qualified name of the DML's table func (dml *DML) TableName() string { - return quoteSchema(dml.Database, dml.Table) + return util.QuoteSchema(dml.Database, dml.Table) } // DDL holds the ddl info @@ -90,16 +83,16 @@ func (t Txn) IsDDL() bool { return t.DDL != nil } -type resolveTsTracker interface { +type ResolveTsTracker interface { Forward(span util.Span, ts uint64) bool Frontier() uint64 } -func collectRawTxns( +func CollectRawTxns( ctx context.Context, - inputFn func(context.Context) (BufferEntry, error), + inputFn func(context.Context) (kv.KvOrResolved, error), outputFn func(context.Context, RawTxn) error, - tracker resolveTsTracker, + tracker ResolveTsTracker, ) error { entryGroups := make(map[uint64][]*kv.RawKVEntry) for { @@ -112,9 +105,9 @@ func collectRawTxns( } else if be.Resolved != nil { resolvedTs := be.Resolved.Timestamp // 1. Forward is called in a single thread - // 2. The only way the global minimum resolved ts can be forwarded is that + // 2. The only way the global minimum resolved TS can be forwarded is that // the resolveTs we pass in replaces the original one - // Thus, we can just use resolvedTs here as the new global minimum resolved ts. + // Thus, we can just use resolvedTs here as the new global minimum resolved TS. forwarded := tracker.Forward(be.Resolved.Span, resolvedTs) if !forwarded { continue @@ -128,7 +121,7 @@ func collectRawTxns( } // TODO: Handle the case when readyTsList is empty sort.Slice(readyTxns, func(i, j int) bool { - return readyTxns[i].ts < readyTxns[j].ts + return readyTxns[i].TS < readyTxns[j].TS }) for _, t := range readyTxns { err := outputFn(ctx, t) @@ -140,26 +133,26 @@ func collectRawTxns( } } -type TxnMounter struct { - schema *Schema +type Mounter struct { + schema *schema.Schema loc *time.Location } -func NewTxnMounter(schema *Schema, loc *time.Location) (*TxnMounter, error) { - m := &TxnMounter{schema: schema, loc: loc} +func NewTxnMounter(schema *schema.Schema, loc *time.Location) (*Mounter, error) { + m := &Mounter{schema: schema, loc: loc} return m, nil } -func (m *TxnMounter) Mount(rawTxn RawTxn) (*Txn, error) { +func (m *Mounter) Mount(rawTxn RawTxn) (*Txn, error) { txn := &Txn{ - Ts: rawTxn.ts, + Ts: rawTxn.TS, } var replaceDMLs, deleteDMLs []*DML - err := m.schema.handlePreviousDDLJobIfNeed(rawTxn.ts) + err := m.schema.HandlePreviousDDLJobIfNeed(rawTxn.TS) if err != nil { return nil, errors.Trace(err) } - for _, raw := range rawTxn.entries { + for _, raw := range rawTxn.Entries { kvEntry, err := entry.Unmarshal(raw) if err != nil { return nil, errors.Trace(err) @@ -200,7 +193,7 @@ func (m *TxnMounter) Mount(rawTxn RawTxn) (*Txn, error) { return txn, nil } -func (m *TxnMounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { +func (m *Mounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { tableInfo, tableName, handleColName, err := m.fetchTableInfo(row.TableId) if err != nil { return nil, errors.Trace(err) @@ -240,7 +233,7 @@ func (m *TxnMounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { }, nil } -func (m *TxnMounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { +func (m *Mounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { tableInfo, tableName, _, err := m.fetchTableInfo(idx.TableId) if err != nil { return nil, errors.Trace(err) @@ -268,7 +261,7 @@ func (m *TxnMounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { }, nil } -func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *TableName, handleColName string, err error) { +func (m *Mounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *schema.TableName, handleColName string, err error) { tableInfo, exist := m.schema.TableByID(tableId) if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) @@ -278,7 +271,7 @@ func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) } - tableName = &TableName{database, table} + tableName = &schema.TableName{Schema: database, Table: table} pkColOffset := -1 for i, col := range tableInfo.Columns { @@ -295,7 +288,7 @@ func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, return } -func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, error) { +func (m *Mounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, error) { var databaseName, tableName string var err error getTableName := false @@ -308,7 +301,7 @@ func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, err getTableName = true } - _, _, _, err = m.schema.handleDDL(jobHistory.Job) + _, _, _, err = m.schema.HandleDDL(jobHistory.Job) if err != nil { return nil, errors.Trace(err) } @@ -328,7 +321,7 @@ func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, err }, nil } -func (m *TxnMounter) tryGetTableName(jobHistory *entry.DDLJobHistoryKVEntry) (databaseName string, tableName string, err error) { +func (m *Mounter) tryGetTableName(jobHistory *entry.DDLJobHistoryKVEntry) (databaseName string, tableName string, err error) { if tableId := jobHistory.Job.TableID; tableId > 0 { var exist bool databaseName, tableName, exist = m.schema.SchemaAndTableName(tableId) diff --git a/cdc/txn_test.go b/cdc/txn/txn_test.go similarity index 86% rename from cdc/txn_test.go rename to cdc/txn/txn_test.go index 4ee54b1b5b8..f6b5cb7b9c4 100644 --- a/cdc/txn_test.go +++ b/cdc/txn/txn_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package txn import ( "context" @@ -21,13 +21,13 @@ import ( "testing" "time" - "github.com/pingcap/tidb-cdc/cdc/util" - "github.com/pingcap/check" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-cdc/cdc/entry" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/types" ) @@ -57,12 +57,12 @@ func (t *mockTracker) Frontier() uint64 { var _ = check.Suite(&CollectRawTxnsSuite{}) func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { - var entries []BufferEntry + var entries []kv.KvOrResolved var startTs uint64 = 1024 var i uint64 for i = 0; i < 3; i++ { for j := 0; j < 3; j++ { - e := BufferEntry{ + e := kv.KvOrResolved{ KV: &kv.RawKVEntry{ OpType: kv.OpTypePut, Key: []byte(fmt.Sprintf("key-%d-%d", i, j)), @@ -74,16 +74,16 @@ func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { } // Only add resolved entry for the first 2 transaction for i = 0; i < 2; i++ { - e := BufferEntry{ - Resolved: &ResolvedSpan{Timestamp: startTs + i}, + e := kv.KvOrResolved{ + Resolved: &kv.ResolvedSpan{Timestamp: startTs + i}, } entries = append(entries, e) } nRead := 0 - input := func(ctx context.Context) (BufferEntry, error) { + input := func(ctx context.Context) (kv.KvOrResolved, error) { if nRead >= len(entries) { - return BufferEntry{}, errors.New("End") + return kv.KvOrResolved{}, errors.New("End") } e := entries[nRead] nRead++ @@ -97,24 +97,24 @@ func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { } ctx := context.Background() - err := collectRawTxns(ctx, input, output, &mockTracker{}) + err := CollectRawTxns(ctx, input, output, &mockTracker{}) c.Assert(err, check.ErrorMatches, "End") c.Assert(rawTxns, check.HasLen, 2) - c.Assert(rawTxns[0].ts, check.Equals, startTs) - for i, e := range rawTxns[0].entries { + c.Assert(rawTxns[0].TS, check.Equals, startTs) + for i, e := range rawTxns[0].Entries { c.Assert(e.Ts, check.Equals, startTs) c.Assert(string(e.Key), check.Equals, fmt.Sprintf("key-0-%d", i)) } - c.Assert(rawTxns[1].ts, check.Equals, startTs+1) - for i, e := range rawTxns[1].entries { + c.Assert(rawTxns[1].TS, check.Equals, startTs+1) + for i, e := range rawTxns[1].Entries { c.Assert(e.Ts, check.Equals, startTs+1) c.Assert(string(e.Key), check.Equals, fmt.Sprintf("key-1-%d", i)) } } func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { - var entries []BufferEntry + var entries []kv.KvOrResolved for _, v := range []struct { key []byte ts uint64 @@ -129,13 +129,13 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { {key: []byte("key2-1"), ts: 2}, {ts: 1, isResolvedTs: true}, } { - var e BufferEntry + var e kv.KvOrResolved if v.isResolvedTs { - e = BufferEntry{ - Resolved: &ResolvedSpan{Timestamp: v.ts}, + e = kv.KvOrResolved{ + Resolved: &kv.ResolvedSpan{Timestamp: v.ts}, } } else { - e = BufferEntry{ + e = kv.KvOrResolved{ KV: &kv.RawKVEntry{ OpType: kv.OpTypePut, Key: v.key, @@ -147,9 +147,9 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { } cursor := 0 - input := func(ctx context.Context) (BufferEntry, error) { + input := func(ctx context.Context) (kv.KvOrResolved, error) { if cursor >= len(entries) { - return BufferEntry{}, errors.New("End") + return kv.KvOrResolved{}, errors.New("End") } e := entries[cursor] cursor++ @@ -163,25 +163,25 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { } ctx := context.Background() - // Set up the tracker so that only the last resolve event forwards the global minimum ts + // Set up the tracker so that only the last resolve event forwards the global minimum TS tracker := mockTracker{forwarded: []bool{false, false, true}} - err := collectRawTxns(ctx, input, output, &tracker) + err := CollectRawTxns(ctx, input, output, &tracker) c.Assert(err, check.ErrorMatches, "End") c.Assert(rawTxns, check.HasLen, 1) txn := rawTxns[0] - c.Assert(txn.ts, check.Equals, uint64(1)) - c.Assert(txn.entries, check.HasLen, 3) - c.Assert(string(txn.entries[0].Key), check.Equals, "key1-1") - c.Assert(string(txn.entries[1].Key), check.Equals, "key1-2") - c.Assert(string(txn.entries[2].Key), check.Equals, "key1-3") + c.Assert(txn.TS, check.Equals, uint64(1)) + c.Assert(txn.Entries, check.HasLen, 3) + c.Assert(string(txn.Entries[0].Key), check.Equals, "key1-1") + c.Assert(string(txn.Entries[1].Key), check.Equals, "key1-2") + c.Assert(string(txn.Entries[2].Key), check.Equals, "key1-3") } type mountTxnsSuite struct{} var _ = check.Suite(&mountTxnsSuite{}) -func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *Schema) { +func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Schema) { puller, err := mock.NewMockPuller() c.Assert(err, check.IsNil) var jobs []*model.Job @@ -198,9 +198,9 @@ func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *Schema) } } c.Assert(len(jobs), check.Equals, len(sqls)) - schema, err := NewSchema(jobs, false) + schema, err := schema.NewSchema(jobs, false) c.Assert(err, check.IsNil) - err = schema.handlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) + err = schema.HandlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) c.Assert(err, check.IsNil) return puller, schema } @@ -212,8 +212,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV := puller.MustExec(c, "insert into testDB.test1 values('ttt',6)") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -241,8 +241,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV = puller.MustExec(c, "update testDB.test1 set id = 'vvv' where a = 6") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -278,8 +278,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV = puller.MustExec(c, "delete from testDB.test1 where a = 6") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -304,8 +304,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV := puller.MustExec(c, "insert into testDB.test1 values(777,888)") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -333,8 +333,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV = puller.MustExec(c, "update testDB.test1 set id = 999 where a = 888") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -370,8 +370,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV = puller.MustExec(c, "delete from testDB.test1 where id = 999") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -403,8 +403,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { c.Assert(err, check.IsNil) rawKV := puller.MustExec(c, "alter table testDB.test1 add b int null") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) c.Assert(txn, check.DeepEquals, &Txn{ @@ -420,8 +420,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { // test insert null value rawKV = puller.MustExec(c, "insert into testDB.test1(id,a) values('ttt',6)") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -449,8 +449,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { rawKV = puller.MustExec(c, "insert into testDB.test1(id,a,b) values('kkk',6,7)") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ diff --git a/cmd/debug.go b/cmd/debug.go index 61e923e224a..b633c0d97f5 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -12,7 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/pingcap/tidb-cdc/cdc" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) func init() { diff --git a/cdc/util/overlap_merge.go b/pkg/util/overlap_merge.go similarity index 100% rename from cdc/util/overlap_merge.go rename to pkg/util/overlap_merge.go diff --git a/cdc/util/overlap_merge_test.go b/pkg/util/overlap_merge_test.go similarity index 100% rename from cdc/util/overlap_merge_test.go rename to pkg/util/overlap_merge_test.go diff --git a/cdc/util/util.go b/pkg/util/span.go similarity index 97% rename from cdc/util/util.go rename to pkg/util/span.go index 0837c158fae..83fb8c3d6ef 100644 --- a/cdc/util/util.go +++ b/pkg/util/span.go @@ -14,7 +14,7 @@ type Span struct { } // UpperBoundKey represents the maximum value. -var UpperBoundKey []byte = []byte{255, 255, 255, 255, 255} +var UpperBoundKey = []byte{255, 255, 255, 255, 255} // Hack will set End as UpperBoundKey if End is Nil. func (s Span) Hack() Span { diff --git a/cdc/util/util_test.go b/pkg/util/span_test.go similarity index 97% rename from cdc/util/util_test.go rename to pkg/util/span_test.go index cbc05df803e..2b11ae95ec6 100644 --- a/cdc/util/util_test.go +++ b/pkg/util/span_test.go @@ -1,15 +1,11 @@ package util import ( - "testing" - "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/check" ) -func Test(t *testing.T) { check.TestingT(t) } - type spanSuite struct{} var _ = check.Suite(&spanSuite{}) diff --git a/pkg/util/string.go b/pkg/util/string.go new file mode 100644 index 00000000000..88d462d7e6a --- /dev/null +++ b/pkg/util/string.go @@ -0,0 +1,30 @@ +package util + +import ( + "fmt" + "strings" +) + +func QuoteSchema(schema string, table string) string { + return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table)) +} + +func QuoteName(name string) string { + return "`" + EscapeName(name) + "`" +} + +func EscapeName(name string) string { + return strings.Replace(name, "`", "``", -1) +} + +func HolderString(n int) string { + var builder strings.Builder + builder.Grow((n-1)*2 + 1) + for i := 0; i < n; i++ { + if i > 0 { + builder.WriteString(",") + } + builder.WriteString("?") + } + return builder.String() +}