From 0ac0643f860017eac865d242fa6a21bda8413992 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 14:48:07 +0800 Subject: [PATCH 01/10] remove encoder --- cdc/changefeed.go | 50 ---------------------------- cdc/encoder.go | 85 ----------------------------------------------- cdc/mysql.go | 2 +- cdc/sink.go | 3 +- 4 files changed, 2 insertions(+), 138 deletions(-) delete mode 100644 cdc/encoder.go diff --git a/cdc/changefeed.go b/cdc/changefeed.go index f583935a828..d2eb62e5302 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -30,20 +30,6 @@ import ( "golang.org/x/sync/errgroup" ) -type formatType string - -const ( - optFormat = "format" - - optFormatJSON formatType = "json" -) - -type emitEntry struct { - row *encodeRow - - resolved *ResolvedSpan -} - // ChangeFeedDetail describe the detail of a ChangeFeed type ChangeFeedDetail struct { SinkURI string `json:"sink-uri"` @@ -230,39 +216,3 @@ func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) erro log.Info("Output Txn", zap.Reflect("Txn", txn)) return nil } - -// kvsToRows gets changed kvs from a closure and converts them into sql rows. -// The returned closure is not threadsafe. -func kvsToRows( - detail ChangeFeedDetail, - inputFn func(context.Context) (BufferEntry, error), -) func(context.Context) (*emitEntry, error) { - panic("todo") -} - -// emitEntries connects to a sink, receives rows from a closure, and repeatedly -// emits them to the sink. It returns a closure that may be repeatedly called to -// advance the changefeed and which returns span-level resolved timestamp -// updates. The returned closure is not threadsafe. -func emitEntries( - detail ChangeFeedDetail, - watchedSpans []util.Span, - encoder Encoder, - sink Sink, - inputFn func(context.Context) (*emitEntry, error), -) func(context.Context) ([]ResolvedSpan, error) { - panic("todo") -} - -// emitResolvedTimestamp emits a changefeed-level resolved timestamp -func emitResolvedTimestamp( - ctx context.Context, encoder Encoder, sink Sink, resolved uint64, -) error { - if err := sink.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return err - } - - log.Info("resolved", zap.Uint64("timestamp", resolved)) - - return nil -} diff --git a/cdc/encoder.go b/cdc/encoder.go deleted file mode 100644 index 46b61612bf3..00000000000 --- a/cdc/encoder.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cdc - -import ( - "fmt" - - "github.com/pingcap/errors" - "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/types" -) - -// encodeRow holds all the pieces necessary to encode a row change into a key or -// value. -type encodeRow struct { - // datums is the new value of a changed table row. - datums []types.Datum - - commitTS uint64 - - // deleted is true if row is a deletion. In this case, only the primary - // key columns are guaranteed to be set in `datums`. - deleted bool - - tableInfo *model.TableInfo -} - -type Encoder interface { - // EncodeKey encodes the primary key of the given row. - EncodeKey(encodeRow) ([]byte, error) - - // EncodeValue encodes the primary key of the given row. - EncodeValue(encodeRow) ([]byte, error) - - // EncodeResolvedTimestamp encodes a resolved timestamp payload - EncodeResolvedTimestamp(uint64) ([]byte, error) -} - -func getEncoder(opts map[string]string) (Encoder, error) { - switch formatType(opts[optFormat]) { - case "", optFormatJSON: - return newJSONEncoder(opts) - default: - return nil, errors.Errorf("unknow format: %s", opts[optFormat]) - } -} - -type jsonEncoder struct { -} - -var _ Encoder = &jsonEncoder{} - -func newJSONEncoder(opts map[string]string) (*jsonEncoder, error) { - // TODO - return &jsonEncoder{}, nil -} - -func (e *jsonEncoder) EncodeKey(row encodeRow) ([]byte, error) { - // TODO - str := fmt.Sprintf("%+v", row) - return []byte(str), nil -} - -func (e *jsonEncoder) EncodeValue(row encodeRow) ([]byte, error) { - // TODO - str := fmt.Sprintf("%+v", row) - return []byte(str), nil -} - -func (e *jsonEncoder) EncodeResolvedTimestamp(ts uint64) ([]byte, error) { - // TODO - str := fmt.Sprintf("ts: %v", ts) - return []byte(str), nil -} diff --git a/cdc/mysql.go b/cdc/mysql.go index 1964fb13f32..cd0f4397029 100644 --- a/cdc/mysql.go +++ b/cdc/mysql.go @@ -95,7 +95,7 @@ func filterBySchemaAndTable(txn *Txn) { } } -func (s *mysqlSink) EmitResolvedTimestamp(ctx context.Context, encoder Encoder, resolved uint64) error { +func (s *mysqlSink) EmitResolvedTimestamp(ctx context.Context, resolved uint64) error { return nil } diff --git a/cdc/sink.go b/cdc/sink.go index 99803fe721e..92d468b9f9d 100644 --- a/cdc/sink.go +++ b/cdc/sink.go @@ -27,7 +27,6 @@ type Sink interface { Emit(ctx context.Context, txn Txn) error EmitResolvedTimestamp( ctx context.Context, - encoder Encoder, resolved uint64, ) error // TODO: Add GetLastSuccessTs() uint64 @@ -74,7 +73,7 @@ func (s *writerSink) Emit(ctx context.Context, txn Txn) error { return nil } -func (s *writerSink) EmitResolvedTimestamp(ctx context.Context, encoder Encoder, resolved uint64) error { +func (s *writerSink) EmitResolvedTimestamp(ctx context.Context, resolved uint64) error { fmt.Fprintf(s, "resolved: %d", resolved) return nil } From f7b74c9daf382e73fa4d7b57c553979c417527e6 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 15:06:49 +0800 Subject: [PATCH 02/10] move schema to pkg/schema --- cdc/cdc_test.go | 3 +- cdc/changefeed.go | 5 +- cdc/txn.go | 20 +++---- cdc/txn_test.go | 7 +-- cdc/util.go | 4 +- cdc/schema.go => pkg/schema/schema_picker.go | 54 +++++++++---------- .../schema/schema_picker_test.go | 44 ++++++++------- 7 files changed, 70 insertions(+), 67 deletions(-) rename cdc/schema.go => pkg/schema/schema_picker.go (89%) rename cdc/schema_test.go => pkg/schema/schema_picker_test.go (92%) diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index 73b0579aa91..31e971240e9 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -13,6 +13,7 @@ import ( "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/schema" ) type CDCSuite struct { @@ -40,7 +41,7 @@ func NewCDCSuite() *CDCSuite { panic(err.Error()) } // create a schema - schema, err := NewSchema(jobs, false) + schema, err := schema.NewSchemaPicker(jobs, false) if err != nil { panic(err.Error()) } diff --git a/cdc/changefeed.go b/cdc/changefeed.go index d2eb62e5302..fd62df6ba3f 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -25,6 +25,7 @@ import ( pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -73,7 +74,7 @@ type SubChangeFeed struct { detail ChangeFeedDetail watchs []util.Span - schema *Schema + schema *schema.Picker mounter *TxnMounter // sink is the Sink to write rows to. @@ -96,7 +97,7 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange if err != nil { return nil, err } - schema, err := NewSchema(jobs, false) + schema, err := schema.NewSchemaPicker(jobs, false) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/txn.go b/cdc/txn.go index ed4dabaa462..572af0a061a 100644 --- a/cdc/txn.go +++ b/cdc/txn.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -30,13 +31,6 @@ import ( "go.uber.org/zap" ) -type sqlType int - -const ( - sqlDML sqlType = iota - sqlDDL sqlType = iota -) - // RawTxn represents a complete collection of entries that belong to the same transaction type RawTxn struct { ts uint64 @@ -141,11 +135,11 @@ func collectRawTxns( } type TxnMounter struct { - schema *Schema + schema *schema.Picker loc *time.Location } -func NewTxnMounter(schema *Schema, loc *time.Location) (*TxnMounter, error) { +func NewTxnMounter(schema *schema.Picker, loc *time.Location) (*TxnMounter, error) { m := &TxnMounter{schema: schema, loc: loc} return m, nil } @@ -155,7 +149,7 @@ func (m *TxnMounter) Mount(rawTxn RawTxn) (*Txn, error) { Ts: rawTxn.ts, } var replaceDMLs, deleteDMLs []*DML - err := m.schema.handlePreviousDDLJobIfNeed(rawTxn.ts) + err := m.schema.HandlePreviousDDLJobIfNeed(rawTxn.ts) if err != nil { return nil, errors.Trace(err) } @@ -268,7 +262,7 @@ func (m *TxnMounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { }, nil } -func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *TableName, handleColName string, err error) { +func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *schema.TableName, handleColName string, err error) { tableInfo, exist := m.schema.TableByID(tableId) if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) @@ -278,7 +272,7 @@ func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) } - tableName = &TableName{database, table} + tableName = &schema.TableName{database, table} pkColOffset := -1 for i, col := range tableInfo.Columns { @@ -308,7 +302,7 @@ func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, err getTableName = true } - _, _, _, err = m.schema.handleDDL(jobHistory.Job) + _, _, _, err = m.schema.HandleDDL(jobHistory.Job) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/txn_test.go b/cdc/txn_test.go index 4ee54b1b5b8..35faf6c5dd6 100644 --- a/cdc/txn_test.go +++ b/cdc/txn_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -181,7 +182,7 @@ type mountTxnsSuite struct{} var _ = check.Suite(&mountTxnsSuite{}) -func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *Schema) { +func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Picker) { puller, err := mock.NewMockPuller() c.Assert(err, check.IsNil) var jobs []*model.Job @@ -198,9 +199,9 @@ func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *Schema) } } c.Assert(len(jobs), check.Equals, len(sqls)) - schema, err := NewSchema(jobs, false) + schema, err := schema.NewSchemaPicker(jobs, false) c.Assert(err, check.IsNil) - err = schema.handlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) + err = schema.HandlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) c.Assert(err, check.IsNil) return puller, schema } diff --git a/cdc/util.go b/cdc/util.go index a23e7dc5357..7ff631013d8 100644 --- a/cdc/util.go +++ b/cdc/util.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/errors" ) @@ -81,7 +82,7 @@ func getTableInfo(db *gosql.DB, schema string, table string) (info *tableInfo, e return } -func getTableInfoFromSchema(schema *Schema, schemaName, tableName string) (info *tableInfo, err error) { +func getTableInfoFromSchema(schema *schema.Picker, schemaName, tableName string) (info *tableInfo, err error) { info = new(tableInfo) tableId, exist := schema.GetTableIDByName(schemaName, tableName) if !exist { @@ -141,6 +142,7 @@ func quoteSchema(schema string, table string) string { } func quoteName(name string) string { + return "`" + escapeName(name) + "`" } diff --git a/cdc/schema.go b/pkg/schema/schema_picker.go similarity index 89% rename from cdc/schema.go rename to pkg/schema/schema_picker.go index 2590451fbb2..2cd3ec63d10 100644 --- a/cdc/schema.go +++ b/pkg/schema/schema_picker.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package schema import ( "encoding/json" @@ -26,9 +26,9 @@ import ( const implicitColName = "_tidb_rowid" const implicitColID = -1 -// Schema stores the source TiDB all schema infomations +// Picker stores the source TiDB all schema infomations // schema infomations could be changed by drainer init and ddls appear -type Schema struct { +type Picker struct { tableIDToName map[int64]TableName tableNameToId map[TableName]int64 schemaNameToID map[string]int64 @@ -48,15 +48,15 @@ type Schema struct { currentVersion int64 } -// TableName specify a Schema name and Table name +// TableName specify a Picker name and Table name type TableName struct { Schema string `toml:"db-name" json:"db-name"` Table string `toml:"tbl-name" json:"tbl-name"` } -// NewSchema returns the Schema object -func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) { - s := &Schema{ +// NewSchemaPicker returns the Picker object +func NewSchemaPicker(jobs []*model.Job, hasImplicitCol bool) (*Picker, error) { + s := &Picker{ hasImplicitCol: hasImplicitCol, version2SchemaTable: make(map[int64]TableName), truncateTableID: make(map[int64]struct{}), @@ -72,7 +72,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) { return s, nil } -func (s *Schema) String() string { +func (s *Picker) String() string { mp := map[string]interface{}{ "tableIDToName": s.tableIDToName, "tableNameToId": s.tableNameToId, @@ -89,12 +89,12 @@ func (s *Schema) String() string { } // SchemaMetaVersion returns the current schemaversion in drainer -func (s *Schema) SchemaMetaVersion() int64 { +func (s *Picker) SchemaMetaVersion() int64 { return s.schemaMetaVersion } // SchemaAndTableName returns the tableName by table id -func (s *Schema) SchemaAndTableName(id int64) (string, string, bool) { +func (s *Picker) SchemaAndTableName(id int64) (string, string, bool) { tn, ok := s.tableIDToName[id] if !ok { return "", "", false @@ -104,7 +104,7 @@ func (s *Schema) SchemaAndTableName(id int64) (string, string, bool) { } // GetTableIDByName returns the tableId by table schemaName and tableName -func (s *Schema) GetTableIDByName(schemaName string, tableName string) (int64, bool) { +func (s *Picker) GetTableIDByName(schemaName string, tableName string) (int64, bool) { id, ok := s.tableNameToId[TableName{ Schema: schemaName, Table: tableName, @@ -113,13 +113,13 @@ func (s *Schema) GetTableIDByName(schemaName string, tableName string) (int64, b } // SchemaByID returns the DBInfo by schema id -func (s *Schema) SchemaByID(id int64) (val *model.DBInfo, ok bool) { +func (s *Picker) SchemaByID(id int64) (val *model.DBInfo, ok bool) { val, ok = s.schemas[id] return } // SchemaByTableID returns the schema ID by table ID -func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { +func (s *Picker) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { tn, ok := s.tableIDToName[tableID] if !ok { return nil, false @@ -132,13 +132,13 @@ func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { } // TableByID returns the TableInfo by table id -func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool) { +func (s *Picker) TableByID(id int64) (val *model.TableInfo, ok bool) { val, ok = s.tables[id] return } // DropSchema deletes the given DBInfo -func (s *Schema) DropSchema(id int64) (string, error) { +func (s *Picker) DropSchema(id int64) (string, error) { schema, ok := s.schemas[id] if !ok { return "", errors.NotFoundf("schema %d", id) @@ -158,7 +158,7 @@ func (s *Schema) DropSchema(id int64) (string, error) { } // CreateSchema adds new DBInfo -func (s *Schema) CreateSchema(db *model.DBInfo) error { +func (s *Picker) CreateSchema(db *model.DBInfo) error { if _, ok := s.schemas[db.ID]; ok { return errors.AlreadyExistsf("schema %s(%d)", db.Name, db.ID) } @@ -171,7 +171,7 @@ func (s *Schema) CreateSchema(db *model.DBInfo) error { } // DropTable deletes the given TableInfo -func (s *Schema) DropTable(id int64) (string, error) { +func (s *Picker) DropTable(id int64) (string, error) { table, ok := s.tables[id] if !ok { return "", errors.NotFoundf("table %d", id) @@ -191,7 +191,7 @@ func (s *Schema) DropTable(id int64) (string, error) { } // CreateTable creates new TableInfo -func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { +func (s *Picker) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { _, ok := s.tables[table.ID] if ok { return errors.AlreadyExistsf("table %s.%s", schema.Name, table.Name) @@ -211,7 +211,7 @@ func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error } // ReplaceTable replace the table by new tableInfo -func (s *Schema) ReplaceTable(table *model.TableInfo) error { +func (s *Picker) ReplaceTable(table *model.TableInfo) error { _, ok := s.tables[table.ID] if !ok { return errors.NotFoundf("table %s(%d)", table.Name, table.ID) @@ -226,7 +226,7 @@ func (s *Schema) ReplaceTable(table *model.TableInfo) error { return nil } -func (s *Schema) removeTable(tableID int64) error { +func (s *Picker) removeTable(tableID int64) error { schema, ok := s.SchemaByTableID(tableID) if !ok { return errors.NotFoundf("table(%d)'s schema", tableID) @@ -242,13 +242,13 @@ func (s *Schema) removeTable(tableID int64) error { return nil } -func (s *Schema) addJob(job *model.Job) { +func (s *Picker) addJob(job *model.Job) { if len(s.jobs) == 0 || s.jobs[len(s.jobs)-1].BinlogInfo.SchemaVersion < job.BinlogInfo.SchemaVersion { s.jobs = append(s.jobs, job) } } -func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { +func (s *Picker) HandlePreviousDDLJobIfNeed(commitTs uint64) error { var i int var job *model.Job // TODO: Make sure jobs are sorted by BinlogInfo.FinishedTS @@ -265,7 +265,7 @@ func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { continue } - _, _, _, err := s.handleDDL(job) + _, _, _, err := s.HandleDDL(job) if err != nil { return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", job, s) } @@ -276,12 +276,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(commitTs uint64) error { return nil } -// handleDDL has four return values, +// HandleDDL has four return values, // the first value[string]: the schema name // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { +func (s *Picker) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job)) if skipJob(job) { @@ -450,12 +450,12 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, } // IsTruncateTableID returns true if the table id have been truncated by truncate table DDL -func (s *Schema) IsTruncateTableID(id int64) bool { +func (s *Picker) IsTruncateTableID(id int64) bool { _, ok := s.truncateTableID[id] return ok } -func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) { +func (s *Picker) getSchemaTableAndDelete(version int64) (string, string, error) { schemaTable, ok := s.version2SchemaTable[version] if !ok { return "", "", errors.NotFoundf("version: %d", version) diff --git a/cdc/schema_test.go b/pkg/schema/schema_picker_test.go similarity index 92% rename from cdc/schema_test.go rename to pkg/schema/schema_picker_test.go index b46460d6a98..d2568ce613a 100644 --- a/cdc/schema_test.go +++ b/pkg/schema/schema_picker_test.go @@ -11,10 +11,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package schema import ( "fmt" + "testing" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -23,8 +24,11 @@ import ( "github.com/pingcap/tidb/types" ) +// TODO run the test type schemaSuite struct{} +func Test(t *testing.T) { TestingT(t) } + var _ = Suite(&schemaSuite{}) func (t *schemaSuite) TestSchema(c *C) { @@ -59,9 +63,9 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone}) // reconstruct the local schema - schema, err := NewSchema(jobs, false) + schema, err := NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(123) + err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(err, IsNil) // test drop schema @@ -76,18 +80,18 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchema(jobs, false) + schema, err = NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(124) + err = schema.HandlePreviousDDLJobIfNeed(124) c.Assert(err, IsNil) // test create schema already exist error jobs = jobs[:0] jobs = append(jobs, job) jobs = append(jobs, jobDup) - schema, err = NewSchema(jobs, false) + schema, err = NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(125) + err = schema.HandlePreviousDDLJobIfNeed(125) c.Log(err) c.Assert(errors.IsAlreadyExists(err), IsTrue) @@ -104,9 +108,9 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchema(jobs, false) + schema, err = NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(123) + err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(errors.IsNotFound(err), IsTrue) } @@ -202,9 +206,9 @@ func (*schemaSuite) TestTable(c *C) { jobs = append(jobs, job) // reconstruct the local schema - schema, err := NewSchema(jobs, false) + schema, err := NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema.handlePreviousDDLJobIfNeed(126) + err = schema.HandlePreviousDDLJobIfNeed(126) c.Assert(err, IsNil) // check the historical db that constructed above whether in the schema list of local schema @@ -233,9 +237,9 @@ func (*schemaSuite) TestTable(c *C) { Query: "truncate table " + tbName.O, }, ) - schema1, err := NewSchema(jobs, false) + schema1, err := NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema1.handlePreviousDDLJobIfNeed(127) + err = schema1.HandlePreviousDDLJobIfNeed(127) c.Assert(err, IsNil) _, ok = schema1.TableByID(tblInfo1.ID) c.Assert(ok, IsTrue) @@ -255,9 +259,9 @@ func (*schemaSuite) TestTable(c *C) { Query: "drop table " + tbName.O, }, ) - schema2, err := NewSchema(jobs, false) + schema2, err := NewSchemaPicker(jobs, false) c.Assert(err, IsNil) - err = schema2.handlePreviousDDLJobIfNeed(128) + err = schema2.HandlePreviousDDLJobIfNeed(128) c.Assert(err, IsNil) _, ok = schema2.TableByID(tblInfo.ID) @@ -273,7 +277,7 @@ func (*schemaSuite) TestTable(c *C) { } func (t *schemaSuite) TestHandleDDL(c *C) { - schema, err := NewSchema(nil, false) + schema, err := NewSchemaPicker(nil, false) c.Assert(err, IsNil) dbName := model.NewCIStr("Test") colName := model.NewCIStr("A") @@ -282,13 +286,13 @@ func (t *schemaSuite) TestHandleDDL(c *C) { // check rollback done job job := &model.Job{ID: 1, State: model.JobStateRollbackDone} - _, _, sql, err := schema.handleDDL(job) + _, _, sql, err := schema.HandleDDL(job) c.Assert(err, IsNil) c.Assert(sql, Equals, "") // check job.Query is empty job = &model.Job{ID: 1, State: model.JobStateDone} - _, _, sql, err = schema.handleDDL(job) + _, _, sql, err = schema.HandleDDL(job) c.Assert(sql, Equals, "") c.Assert(err, NotNil, Commentf("should return not found job.Query")) @@ -396,8 +400,8 @@ func (t *schemaSuite) TestAddImplicitColumn(c *C) { c.Assert(tbl.Indices[0].Primary, IsTrue) } -func testDoDDLAndCheck(c *C, schema *Schema, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { - schemaName, tableName, resSQL, err := schema.handleDDL(job) +func testDoDDLAndCheck(c *C, schema *Picker, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { + schemaName, tableName, resSQL, err := schema.HandleDDL(job) c.Logf("handle: %s", job.Query) c.Logf("result: %s, %s, %s, %v", schemaName, tableName, resSQL, err) c.Assert(err != nil, Equals, isErr) From a56e8e9eef591e4c59e81cf6d7418fe90ae20423 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 16:08:56 +0800 Subject: [PATCH 03/10] move txn to cdc/txn/ --- cdc/buffer.go | 19 +--- cdc/buffer_test.go | 2 +- cdc/cached_inspector.go | 10 ++- cdc/capture.go | 6 -- cdc/cdc_test.go | 12 ++- cdc/changefeed.go | 11 +-- cdc/kv/client.go | 22 ++++- cdc/mysql.go | 50 ++++++----- cdc/mysql_test.go | 61 ++++++------- cdc/puller.go | 9 +- cdc/sink.go | 7 +- cdc/span_frontier.go | 2 +- cdc/span_frontier_test.go | 2 +- cdc/{ => txn}/txn.go | 50 +++++------ cdc/{ => txn}/txn_test.go | 90 +++++++++---------- cdc/util.go | 42 +-------- cmd/debug.go | 2 +- {cdc => pkg}/util/overlap_merge.go | 0 {cdc => pkg}/util/overlap_merge_test.go | 0 cdc/util/util.go => pkg/util/span.go | 2 +- .../util_test.go => pkg/util/span_test.go | 4 - pkg/util/string.go | 43 +++++++++ 22 files changed, 231 insertions(+), 215 deletions(-) rename cdc/{ => txn}/txn.go (85%) rename cdc/{ => txn}/txn_test.go (87%) rename {cdc => pkg}/util/overlap_merge.go (100%) rename {cdc => pkg}/util/overlap_merge_test.go (100%) rename cdc/util/util.go => pkg/util/span.go (97%) rename cdc/util/util_test.go => pkg/util/span_test.go (97%) create mode 100644 pkg/util/string.go diff --git a/cdc/buffer.go b/cdc/buffer.go index 8009a038a81..3fa6b91e0ec 100644 --- a/cdc/buffer.go +++ b/cdc/buffer.go @@ -4,24 +4,11 @@ import ( "context" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) // buffer entry from kv layer -type BufferEntry struct { - KV *kv.RawKVEntry - Resolved *ResolvedSpan -} - -func (e *BufferEntry) GetValue() interface{} { - if e.KV != nil { - return e.KV - } else if e.Resolved != nil { - return e.Resolved - } else { - return nil - } -} +type BufferEntry = kv.KvOrResolved // Buffer buffers kv entries type Buffer chan BufferEntry @@ -44,7 +31,7 @@ func (b Buffer) AddKVEntry(ctx context.Context, kv *kv.RawKVEntry) error { } func (b Buffer) AddResolved(ctx context.Context, span util.Span, ts uint64) error { - return b.AddEntry(ctx, BufferEntry{Resolved: &ResolvedSpan{Span: span, Timestamp: ts}}) + return b.AddEntry(ctx, BufferEntry{Resolved: &kv.ResolvedSpan{Span: span, Timestamp: ts}}) } func (b Buffer) Get(ctx context.Context) (BufferEntry, error) { diff --git a/cdc/buffer_test.go b/cdc/buffer_test.go index 2e61f15dcd0..d9419a91c98 100644 --- a/cdc/buffer_test.go +++ b/cdc/buffer_test.go @@ -18,7 +18,7 @@ import ( "sync" "time" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" diff --git a/cdc/cached_inspector.go b/cdc/cached_inspector.go index c937933c381..04eb600e880 100644 --- a/cdc/cached_inspector.go +++ b/cdc/cached_inspector.go @@ -13,7 +13,11 @@ package cdc -import "database/sql" +import ( + "database/sql" + + "github.com/pingcap/tidb-cdc/pkg/util" +) type cachedInspector struct { db *sql.DB @@ -32,7 +36,7 @@ func newCachedInspector(db *sql.DB) *cachedInspector { var _ tableInspector = &cachedInspector{} func (i *cachedInspector) Get(schema, table string) (*tableInfo, error) { - key := quoteSchema(schema, table) + key := util.QuoteSchema(schema, table) t, ok := i.cache[key] if !ok { var err error @@ -46,6 +50,6 @@ func (i *cachedInspector) Get(schema, table string) (*tableInfo, error) { } func (i *cachedInspector) Refresh(schema, table string) { - key := quoteSchema(schema, table) + key := util.QuoteSchema(schema, table) delete(i.cache, key) } diff --git a/cdc/capture.go b/cdc/capture.go index a4be4761182..075fa39b1ee 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/roles" - "github.com/pingcap/tidb-cdc/cdc/util" "github.com/pingcap/tidb-cdc/pkg/flags" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store" @@ -36,11 +35,6 @@ const ( CaptureOwnerKey = kv.EtcdKeyBase + "/capture/owner" ) -type ResolvedSpan struct { - Span util.Span - Timestamp uint64 -} - // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules SubChangeFeed on it. type Capture struct { id string diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index 31e971240e9..e40692c4191 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -6,21 +6,25 @@ import ( "fmt" "math" "strconv" + "testing" "time" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb-cdc/pkg/schema" + "github.com/pingcap/tidb-cdc/pkg/util" ) +func TestSuite(t *testing.T) { TestingT(t) } + type CDCSuite struct { database string puller *mock.MockTiDB mock sqlmock.Sqlmock - mounter *TxnMounter + mounter *txn.Mounter sink Sink } @@ -67,7 +71,7 @@ func NewCDCSuite() *CDCSuite { } cdcSuite.sink = sink - mounter, err := NewTxnMounter(schema, time.Local) + mounter, err := txn.NewTxnMounter(schema, time.Local) if err != nil { panic(err.Error()) } @@ -87,7 +91,7 @@ func (s *CDCSuite) RunAndCheckSync(c *C, execute func(func(string, ...interface{ rawKVs = append(rawKVs, kvs...) } execute(executeSql) - txn, err := s.mounter.Mount(RawTxn{ts: rawKVs[len(rawKVs)-1].Ts, entries: rawKVs}) + txn, err := s.mounter.Mount(txn.RawTxn{TS: rawKVs[len(rawKVs)-1].Ts, Entries: rawKVs}) c.Assert(err, IsNil) err = s.sink.Emit(context.Background(), *txn) c.Assert(err, IsNil) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index fd62df6ba3f..c15c7ac58e7 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -24,8 +24,9 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb-cdc/pkg/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -75,7 +76,7 @@ type SubChangeFeed struct { watchs []util.Span schema *schema.Picker - mounter *TxnMounter + mounter *txn.Mounter // sink is the Sink to write rows to. // Resolved timestamps are never written by Capture @@ -108,7 +109,7 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange } // TODO: get time zone from config - mounter, err := NewTxnMounter(schema, time.UTC) + mounter, err := txn.NewTxnMounter(schema, time.UTC) if err != nil { return nil, errors.Trace(err) } @@ -204,8 +205,8 @@ func (c *SubChangeFeed) startOnSpan(ctx context.Context, span util.Span, errCh c return puller } -func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn RawTxn) error { - log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.entries)) +func (c *SubChangeFeed) writeToSink(context context.Context, rawTxn txn.RawTxn) error { + log.Info("RawTxn", zap.Reflect("RawTxn", rawTxn.Entries)) txn, err := c.mounter.Mount(rawTxn) if err != nil { return errors.Trace(err) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 53323cd060d..34cf580b4ae 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -12,7 +12,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" pd "github.com/pingcap/pd/client" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -35,6 +35,26 @@ const ( type RawKVEntry = RegionFeedValue +type KvOrResolved struct { + KV *RawKVEntry + Resolved *ResolvedSpan +} + +type ResolvedSpan struct { + Span util.Span + Timestamp uint64 +} + +func (e *KvOrResolved) GetValue() interface{} { + if e.KV != nil { + return e.KV + } else if e.Resolved != nil { + return e.Resolved + } else { + return nil + } +} + // RegionFeedEvent from the kv layer. // Only one of the event will be setted. type RegionFeedEvent struct { diff --git a/cdc/mysql.go b/cdc/mysql.go index cd0f4397029..81303356582 100644 --- a/cdc/mysql.go +++ b/cdc/mysql.go @@ -22,6 +22,8 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" @@ -57,7 +59,7 @@ type mysqlSink struct { var _ Sink = &mysqlSink{} -func (s *mysqlSink) Emit(ctx context.Context, txn Txn) error { +func (s *mysqlSink) Emit(ctx context.Context, txn txn.Txn) error { filterBySchemaAndTable(&txn) if len(txn.DMLs) == 0 && txn.DDL == nil { log.Info("Whole txn ignored", zap.Uint64("ts", txn.Ts)) @@ -78,20 +80,20 @@ func (s *mysqlSink) Emit(ctx context.Context, txn Txn) error { return errors.Trace(s.execDMLs(ctx, dmls)) } -func filterBySchemaAndTable(txn *Txn) { +func filterBySchemaAndTable(t *txn.Txn) { toIgnore := regexp.MustCompile("(?i)^(INFORMATION_SCHEMA|PERFORMANCE_SCHEMA|MYSQL)$") - if txn.IsDDL() { - if toIgnore.MatchString(txn.DDL.Database) { - txn.DDL = nil + if t.IsDDL() { + if toIgnore.MatchString(t.DDL.Database) { + t.DDL = nil } } else { - filteredDMLs := make([]*DML, 0, len(txn.DMLs)) - for _, dml := range txn.DMLs { + filteredDMLs := make([]*txn.DML, 0, len(t.DMLs)) + for _, dml := range t.DMLs { if !toIgnore.MatchString(dml.Database) { filteredDMLs = append(filteredDMLs, dml) } } - txn.DMLs = filteredDMLs + t.DMLs = filteredDMLs } } @@ -107,7 +109,7 @@ func (s *mysqlSink) Close() error { return nil } -func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *DDL, maxRetries uint64) error { +func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *txn.DDL, maxRetries uint64) error { retryCfg := backoff.WithMaxRetries( backoff.WithContext( backoff.NewExponentialBackOff(), ctx), @@ -125,7 +127,7 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *DDL, maxRetr }, retryCfg) } -func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { +func (s *mysqlSink) execDDL(ctx context.Context, ddl *txn.DDL) error { shouldSwitchDB := len(ddl.Database) > 0 && ddl.Type != model.ActionCreateSchema tx, err := s.db.BeginTx(ctx, nil) @@ -134,7 +136,7 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { } if shouldSwitchDB { - _, err = tx.ExecContext(ctx, "USE "+quoteName(ddl.Database)+";") + _, err = tx.ExecContext(ctx, "USE "+util.QuoteName(ddl.Database)+";") if err != nil { tx.Rollback() return err @@ -154,18 +156,18 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *DDL) error { return nil } -func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*DML) error { +func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*txn.DML) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } for _, dml := range dmls { - var fPrepare func(*DML) (string, []interface{}, error) + var fPrepare func(*txn.DML) (string, []interface{}, error) switch dml.Tp { - case InsertDMLType, UpdateDMLType: + case txn.InsertDMLType, txn.UpdateDMLType: fPrepare = s.prepareReplace - case DeleteDMLType: + case txn.DeleteDMLType: fPrepare = s.prepareDelete default: return fmt.Errorf("invalid dml type: %v", dml.Tp) @@ -189,8 +191,8 @@ func (s *mysqlSink) execDMLs(ctx context.Context, dmls []*DML) error { return nil } -func (s *mysqlSink) formatDMLs(dmls []*DML) ([]*DML, error) { - result := make([]*DML, 0, len(dmls)) +func (s *mysqlSink) formatDMLs(dmls []*txn.DML) ([]*txn.DML, error) { + result := make([]*txn.DML, 0, len(dmls)) for _, dml := range dmls { tableInfo, ok := s.getTableDefinition(dml.Database, dml.Table) if !ok { @@ -215,16 +217,16 @@ func (s *mysqlSink) getTableDefinition(schema, table string) (*model.TableInfo, return tableInfo, ok } -func (s *mysqlSink) prepareReplace(dml *DML) (string, []interface{}, error) { +func (s *mysqlSink) prepareReplace(dml *txn.DML) (string, []interface{}, error) { info, err := s.tblInspector.Get(dml.Database, dml.Table) if err != nil { return "", nil, err } var builder strings.Builder - cols := "(" + buildColumnList(info.columns) + ")" - tblName := quoteSchema(dml.Database, dml.Table) + cols := "(" + util.BuildColumnList(info.columns) + ")" + tblName := util.QuoteSchema(dml.Database, dml.Table) builder.WriteString("REPLACE INTO " + tblName + cols + " VALUES ") - builder.WriteString("(" + holderString(len(info.columns)) + ");") + builder.WriteString("(" + util.HolderString(len(info.columns)) + ");") args := make([]interface{}, 0, len(info.columns)) for _, name := range info.columns { @@ -238,7 +240,7 @@ func (s *mysqlSink) prepareReplace(dml *DML) (string, []interface{}, error) { return builder.String(), args, nil } -func (s *mysqlSink) prepareDelete(dml *DML) (string, []interface{}, error) { +func (s *mysqlSink) prepareDelete(dml *txn.DML) (string, []interface{}, error) { info, err := s.tblInspector.Get(dml.Database, dml.Table) if err != nil { return "", nil, err @@ -254,9 +256,9 @@ func (s *mysqlSink) prepareDelete(dml *DML) (string, []interface{}, error) { builder.WriteString(" AND ") } if wargs[i].IsNull() { - builder.WriteString(quoteName(colNames[i]) + " IS NULL") + builder.WriteString(util.QuoteName(colNames[i]) + " IS NULL") } else { - builder.WriteString(quoteName(colNames[i]) + " = ?") + builder.WriteString(util.QuoteName(colNames[i]) + " = ?") args = append(args, wargs[i].GetValue()) } } diff --git a/cdc/mysql_test.go b/cdc/mysql_test.go index fa34e21c548..4768a66bd4e 100644 --- a/cdc/mysql_test.go +++ b/cdc/mysql_test.go @@ -18,6 +18,7 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/parser/model" @@ -50,8 +51,8 @@ func (s EmitSuite) TestShouldExecDDL(c *check.C) { tblInspector: dummyInspector{}, } - txn := Txn{ - DDL: &DDL{ + t := txn.Txn{ + DDL: &txn.DDL{ Database: "test", Table: "user", SQL: "CREATE TABLE user (id INT PRIMARY KEY);", @@ -60,11 +61,11 @@ func (s EmitSuite) TestShouldExecDDL(c *check.C) { mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec(txn.DDL.SQL).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(t.DDL.SQL).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -82,8 +83,8 @@ func (s EmitSuite) TestShouldIgnoreCertainDDLError(c *check.C) { tblInspector: dummyInspector{}, } - txn := Txn{ - DDL: &DDL{ + t := txn.Txn{ + DDL: &txn.DDL{ Database: "test", Table: "user", SQL: "CREATE TABLE user (id INT PRIMARY KEY);", @@ -95,10 +96,10 @@ func (s EmitSuite) TestShouldIgnoreCertainDDLError(c *check.C) { ignorable := dmysql.MySQLError{ Number: uint16(infoschema.ErrTableExists.Code()), } - mock.ExpectExec(txn.DDL.SQL).WillReturnError(&ignorable) + mock.ExpectExec(t.DDL.SQL).WillReturnError(&ignorable) // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -161,12 +162,12 @@ func (s EmitSuite) TestShouldExecReplaceInto(c *check.C) { infoGetter: &helper, } - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ { Database: "test", Table: "user", - Tp: InsertDMLType, + Tp: txn.InsertDMLType, Values: map[string]dbtypes.Datum{ "id": dbtypes.NewDatum(42), "name": dbtypes.NewDatum("tester1"), @@ -182,7 +183,7 @@ func (s EmitSuite) TestShouldExecReplaceInto(c *check.C) { mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -202,12 +203,12 @@ func (s EmitSuite) TestShouldExecDelete(c *check.C) { infoGetter: &helper, } - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ { Database: "test", Table: "user", - Tp: DeleteDMLType, + Tp: txn.DeleteDMLType, Values: map[string]dbtypes.Datum{ "id": dbtypes.NewDatum(123), "name": dbtypes.NewDatum("tester1"), @@ -223,7 +224,7 @@ func (s EmitSuite) TestShouldExecDelete(c *check.C) { mock.ExpectCommit() // Execute - err = sink.Emit(context.Background(), txn) + err = sink.Emit(context.Background(), t) // Validate c.Assert(err, check.IsNil) @@ -235,8 +236,8 @@ type FilterSuite struct{} var _ = check.Suite(&FilterSuite{}) func (s *FilterSuite) TestFilterDMLs(c *check.C) { - txn := Txn{ - DMLs: []*DML{ + t := txn.Txn{ + DMLs: []*txn.DML{ {Database: "INFORMATIOn_SCHEmA"}, {Database: "test"}, {Database: "test_mysql"}, @@ -244,21 +245,21 @@ func (s *FilterSuite) TestFilterDMLs(c *check.C) { }, Ts: 213, } - filterBySchemaAndTable(&txn) - c.Assert(txn.Ts, check.Equals, uint64(213)) - c.Assert(txn.DDL, check.IsNil) - c.Assert(txn.DMLs, check.HasLen, 2) - c.Assert(txn.DMLs[0].Database, check.Equals, "test") - c.Assert(txn.DMLs[1].Database, check.Equals, "test_mysql") + filterBySchemaAndTable(&t) + c.Assert(t.Ts, check.Equals, uint64(213)) + c.Assert(t.DDL, check.IsNil) + c.Assert(t.DMLs, check.HasLen, 2) + c.Assert(t.DMLs[0].Database, check.Equals, "test") + c.Assert(t.DMLs[1].Database, check.Equals, "test_mysql") } func (s *FilterSuite) TestFilterDDL(c *check.C) { - txn := Txn{ - DDL: &DDL{Database: "performance_schema"}, + t := txn.Txn{ + DDL: &txn.DDL{Database: "performance_schema"}, Ts: 10234, } - filterBySchemaAndTable(&txn) - c.Assert(txn.Ts, check.Equals, uint64((10234))) - c.Assert(txn.DMLs, check.HasLen, 0) - c.Assert(txn.DDL, check.IsNil) + filterBySchemaAndTable(&t) + c.Assert(t.Ts, check.Equals, uint64((10234))) + c.Assert(t.DMLs, check.HasLen, 0) + c.Assert(t.DDL, check.IsNil) } diff --git a/cdc/puller.go b/cdc/puller.go index 3543ec02516..d140bcb589e 100644 --- a/cdc/puller.go +++ b/cdc/puller.go @@ -20,7 +20,8 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/util" "golang.org/x/sync/errgroup" ) @@ -31,7 +32,7 @@ type Puller struct { spans []util.Span detail ChangeFeedDetail buf Buffer - tsTracker resolveTsTracker + tsTracker txn.ResolveTsTracker } // NewPuller create a new Puller fetch event start from checkpointTS @@ -125,6 +126,6 @@ func (p *Puller) GetResolvedTs() uint64 { return p.tsTracker.Frontier() } -func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, RawTxn) error) error { - return collectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker) +func (p *Puller) CollectRawTxns(ctx context.Context, outputFn func(context.Context, txn.RawTxn) error) error { + return txn.CollectRawTxns(ctx, p.buf.Get, outputFn, p.tsTracker) } diff --git a/cdc/sink.go b/cdc/sink.go index 92d468b9f9d..3fd1097c31a 100644 --- a/cdc/sink.go +++ b/cdc/sink.go @@ -20,11 +20,12 @@ import ( "io" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-cdc/cdc/txn" ) // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { - Emit(ctx context.Context, txn Txn) error + Emit(ctx context.Context, t txn.Txn) error EmitResolvedTimestamp( ctx context.Context, resolved uint64, @@ -68,8 +69,8 @@ type writerSink struct { var _ Sink = &writerSink{} -func (s *writerSink) Emit(ctx context.Context, txn Txn) error { - fmt.Fprintf(s, "commit ts: %d", txn.Ts) +func (s *writerSink) Emit(ctx context.Context, t txn.Txn) error { + fmt.Fprintf(s, "commit ts: %d", t.Ts) return nil } diff --git a/cdc/span_frontier.go b/cdc/span_frontier.go index 677fe01e011..b7b6cef14ad 100644 --- a/cdc/span_frontier.go +++ b/cdc/span_frontier.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/biogo/store/interval" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) // Generic intervals diff --git a/cdc/span_frontier_test.go b/cdc/span_frontier_test.go index 71fdda33815..ff172d44e09 100644 --- a/cdc/span_frontier_test.go +++ b/cdc/span_frontier_test.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/pingcap/check" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) type spanFrontierSuite struct{} diff --git a/cdc/txn.go b/cdc/txn/txn.go similarity index 85% rename from cdc/txn.go rename to cdc/txn/txn.go index 572af0a061a..af4889d1da6 100644 --- a/cdc/txn.go +++ b/cdc/txn/txn.go @@ -11,15 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package txn import ( "context" "sort" "time" - "github.com/pingcap/tidb-cdc/cdc/util" "github.com/pingcap/tidb-cdc/pkg/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -31,10 +31,10 @@ import ( "go.uber.org/zap" ) -// RawTxn represents a complete collection of entries that belong to the same transaction +// RawTxn represents a complete collection of Entries that belong to the same transaction type RawTxn struct { - ts uint64 - entries []*kv.RawKVEntry + TS uint64 + Entries []*kv.RawKVEntry } // DMLType represents the dml type @@ -60,7 +60,7 @@ type DML struct { // TableName returns the fully qualified name of the DML's table func (dml *DML) TableName() string { - return quoteSchema(dml.Database, dml.Table) + return util.QuoteSchema(dml.Database, dml.Table) } // DDL holds the ddl info @@ -84,16 +84,16 @@ func (t Txn) IsDDL() bool { return t.DDL != nil } -type resolveTsTracker interface { +type ResolveTsTracker interface { Forward(span util.Span, ts uint64) bool Frontier() uint64 } -func collectRawTxns( +func CollectRawTxns( ctx context.Context, - inputFn func(context.Context) (BufferEntry, error), + inputFn func(context.Context) (kv.KvOrResolved, error), outputFn func(context.Context, RawTxn) error, - tracker resolveTsTracker, + tracker ResolveTsTracker, ) error { entryGroups := make(map[uint64][]*kv.RawKVEntry) for { @@ -106,9 +106,9 @@ func collectRawTxns( } else if be.Resolved != nil { resolvedTs := be.Resolved.Timestamp // 1. Forward is called in a single thread - // 2. The only way the global minimum resolved ts can be forwarded is that + // 2. The only way the global minimum resolved TS can be forwarded is that // the resolveTs we pass in replaces the original one - // Thus, we can just use resolvedTs here as the new global minimum resolved ts. + // Thus, we can just use resolvedTs here as the new global minimum resolved TS. forwarded := tracker.Forward(be.Resolved.Span, resolvedTs) if !forwarded { continue @@ -122,7 +122,7 @@ func collectRawTxns( } // TODO: Handle the case when readyTsList is empty sort.Slice(readyTxns, func(i, j int) bool { - return readyTxns[i].ts < readyTxns[j].ts + return readyTxns[i].TS < readyTxns[j].TS }) for _, t := range readyTxns { err := outputFn(ctx, t) @@ -134,26 +134,26 @@ func collectRawTxns( } } -type TxnMounter struct { +type Mounter struct { schema *schema.Picker loc *time.Location } -func NewTxnMounter(schema *schema.Picker, loc *time.Location) (*TxnMounter, error) { - m := &TxnMounter{schema: schema, loc: loc} +func NewTxnMounter(schema *schema.Picker, loc *time.Location) (*Mounter, error) { + m := &Mounter{schema: schema, loc: loc} return m, nil } -func (m *TxnMounter) Mount(rawTxn RawTxn) (*Txn, error) { +func (m *Mounter) Mount(rawTxn RawTxn) (*Txn, error) { txn := &Txn{ - Ts: rawTxn.ts, + Ts: rawTxn.TS, } var replaceDMLs, deleteDMLs []*DML - err := m.schema.HandlePreviousDDLJobIfNeed(rawTxn.ts) + err := m.schema.HandlePreviousDDLJobIfNeed(rawTxn.TS) if err != nil { return nil, errors.Trace(err) } - for _, raw := range rawTxn.entries { + for _, raw := range rawTxn.Entries { kvEntry, err := entry.Unmarshal(raw) if err != nil { return nil, errors.Trace(err) @@ -194,7 +194,7 @@ func (m *TxnMounter) Mount(rawTxn RawTxn) (*Txn, error) { return txn, nil } -func (m *TxnMounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { +func (m *Mounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { tableInfo, tableName, handleColName, err := m.fetchTableInfo(row.TableId) if err != nil { return nil, errors.Trace(err) @@ -234,7 +234,7 @@ func (m *TxnMounter) mountRowKVEntry(row *entry.RowKVEntry) (*DML, error) { }, nil } -func (m *TxnMounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { +func (m *Mounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { tableInfo, tableName, _, err := m.fetchTableInfo(idx.TableId) if err != nil { return nil, errors.Trace(err) @@ -262,7 +262,7 @@ func (m *TxnMounter) mountIndexKVEntry(idx *entry.IndexKVEntry) (*DML, error) { }, nil } -func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *schema.TableName, handleColName string, err error) { +func (m *Mounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tableName *schema.TableName, handleColName string, err error) { tableInfo, exist := m.schema.TableByID(tableId) if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) @@ -289,7 +289,7 @@ func (m *TxnMounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, return } -func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, error) { +func (m *Mounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, error) { var databaseName, tableName string var err error getTableName := false @@ -322,7 +322,7 @@ func (m *TxnMounter) mountDDL(jobHistory *entry.DDLJobHistoryKVEntry) (*DDL, err }, nil } -func (m *TxnMounter) tryGetTableName(jobHistory *entry.DDLJobHistoryKVEntry) (databaseName string, tableName string, err error) { +func (m *Mounter) tryGetTableName(jobHistory *entry.DDLJobHistoryKVEntry) (databaseName string, tableName string, err error) { if tableId := jobHistory.Job.TableID; tableId > 0 { var exist bool databaseName, tableName, exist = m.schema.SchemaAndTableName(tableId) diff --git a/cdc/txn_test.go b/cdc/txn/txn_test.go similarity index 87% rename from cdc/txn_test.go rename to cdc/txn/txn_test.go index 35faf6c5dd6..9e2edfe0456 100644 --- a/cdc/txn_test.go +++ b/cdc/txn/txn_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package txn import ( "context" @@ -21,8 +21,8 @@ import ( "testing" "time" - "github.com/pingcap/tidb-cdc/cdc/util" "github.com/pingcap/tidb-cdc/pkg/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -58,12 +58,12 @@ func (t *mockTracker) Frontier() uint64 { var _ = check.Suite(&CollectRawTxnsSuite{}) func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { - var entries []BufferEntry + var entries []kv.KvOrResolved var startTs uint64 = 1024 var i uint64 for i = 0; i < 3; i++ { for j := 0; j < 3; j++ { - e := BufferEntry{ + e := kv.KvOrResolved{ KV: &kv.RawKVEntry{ OpType: kv.OpTypePut, Key: []byte(fmt.Sprintf("key-%d-%d", i, j)), @@ -75,16 +75,16 @@ func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { } // Only add resolved entry for the first 2 transaction for i = 0; i < 2; i++ { - e := BufferEntry{ - Resolved: &ResolvedSpan{Timestamp: startTs + i}, + e := kv.KvOrResolved{ + Resolved: &kv.ResolvedSpan{Timestamp: startTs + i}, } entries = append(entries, e) } nRead := 0 - input := func(ctx context.Context) (BufferEntry, error) { + input := func(ctx context.Context) (kv.KvOrResolved, error) { if nRead >= len(entries) { - return BufferEntry{}, errors.New("End") + return kv.KvOrResolved{}, errors.New("End") } e := entries[nRead] nRead++ @@ -98,24 +98,24 @@ func (cs *CollectRawTxnsSuite) TestShouldOutputTxnsInOrder(c *check.C) { } ctx := context.Background() - err := collectRawTxns(ctx, input, output, &mockTracker{}) + err := CollectRawTxns(ctx, input, output, &mockTracker{}) c.Assert(err, check.ErrorMatches, "End") c.Assert(rawTxns, check.HasLen, 2) - c.Assert(rawTxns[0].ts, check.Equals, startTs) - for i, e := range rawTxns[0].entries { + c.Assert(rawTxns[0].TS, check.Equals, startTs) + for i, e := range rawTxns[0].Entries { c.Assert(e.Ts, check.Equals, startTs) c.Assert(string(e.Key), check.Equals, fmt.Sprintf("key-0-%d", i)) } - c.Assert(rawTxns[1].ts, check.Equals, startTs+1) - for i, e := range rawTxns[1].entries { + c.Assert(rawTxns[1].TS, check.Equals, startTs+1) + for i, e := range rawTxns[1].Entries { c.Assert(e.Ts, check.Equals, startTs+1) c.Assert(string(e.Key), check.Equals, fmt.Sprintf("key-1-%d", i)) } } func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { - var entries []BufferEntry + var entries []kv.KvOrResolved for _, v := range []struct { key []byte ts uint64 @@ -130,13 +130,13 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { {key: []byte("key2-1"), ts: 2}, {ts: 1, isResolvedTs: true}, } { - var e BufferEntry + var e kv.KvOrResolved if v.isResolvedTs { - e = BufferEntry{ - Resolved: &ResolvedSpan{Timestamp: v.ts}, + e = kv.KvOrResolved{ + Resolved: &kv.ResolvedSpan{Timestamp: v.ts}, } } else { - e = BufferEntry{ + e = kv.KvOrResolved{ KV: &kv.RawKVEntry{ OpType: kv.OpTypePut, Key: v.key, @@ -148,9 +148,9 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { } cursor := 0 - input := func(ctx context.Context) (BufferEntry, error) { + input := func(ctx context.Context) (kv.KvOrResolved, error) { if cursor >= len(entries) { - return BufferEntry{}, errors.New("End") + return kv.KvOrResolved{}, errors.New("End") } e := entries[cursor] cursor++ @@ -164,18 +164,18 @@ func (cs *CollectRawTxnsSuite) TestShouldConsiderSpanResolvedTs(c *check.C) { } ctx := context.Background() - // Set up the tracker so that only the last resolve event forwards the global minimum ts + // Set up the tracker so that only the last resolve event forwards the global minimum TS tracker := mockTracker{forwarded: []bool{false, false, true}} - err := collectRawTxns(ctx, input, output, &tracker) + err := CollectRawTxns(ctx, input, output, &tracker) c.Assert(err, check.ErrorMatches, "End") c.Assert(rawTxns, check.HasLen, 1) txn := rawTxns[0] - c.Assert(txn.ts, check.Equals, uint64(1)) - c.Assert(txn.entries, check.HasLen, 3) - c.Assert(string(txn.entries[0].Key), check.Equals, "key1-1") - c.Assert(string(txn.entries[1].Key), check.Equals, "key1-2") - c.Assert(string(txn.entries[2].Key), check.Equals, "key1-3") + c.Assert(txn.TS, check.Equals, uint64(1)) + c.Assert(txn.Entries, check.HasLen, 3) + c.Assert(string(txn.Entries[0].Key), check.Equals, "key1-1") + c.Assert(string(txn.Entries[1].Key), check.Equals, "key1-2") + c.Assert(string(txn.Entries[2].Key), check.Equals, "key1-3") } type mountTxnsSuite struct{} @@ -213,8 +213,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV := puller.MustExec(c, "insert into testDB.test1 values('ttt',6)") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -242,8 +242,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV = puller.MustExec(c, "update testDB.test1 set id = 'vvv' where a = 6") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -279,8 +279,8 @@ func (cs *mountTxnsSuite) TestInsertPkNotHandle(c *check.C) { rawKV = puller.MustExec(c, "delete from testDB.test1 where a = 6") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -305,8 +305,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV := puller.MustExec(c, "insert into testDB.test1 values(777,888)") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -334,8 +334,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV = puller.MustExec(c, "update testDB.test1 set id = 999 where a = 888") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -371,8 +371,8 @@ func (cs *mountTxnsSuite) TestInsertPkIsHandle(c *check.C) { rawKV = puller.MustExec(c, "delete from testDB.test1 where id = 999") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -404,8 +404,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { c.Assert(err, check.IsNil) rawKV := puller.MustExec(c, "alter table testDB.test1 add b int null") txn, err := mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) c.Assert(txn, check.DeepEquals, &Txn{ @@ -421,8 +421,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { // test insert null value rawKV = puller.MustExec(c, "insert into testDB.test1(id,a) values('ttt',6)") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ @@ -450,8 +450,8 @@ func (cs *mountTxnsSuite) TestDDL(c *check.C) { rawKV = puller.MustExec(c, "insert into testDB.test1(id,a,b) values('kkk',6,7)") txn, err = mounter.Mount(RawTxn{ - ts: rawKV[0].Ts, - entries: rawKV, + TS: rawKV[0].Ts, + Entries: rawKV, }) c.Assert(err, check.IsNil) cs.assertTableTxnEquals(c, txn, &Txn{ diff --git a/cdc/util.go b/cdc/util.go index 7ff631013d8..7c192fbfffb 100644 --- a/cdc/util.go +++ b/cdc/util.go @@ -15,11 +15,11 @@ package cdc import ( gosql "database/sql" - "fmt" "strings" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/errors" @@ -137,44 +137,6 @@ func getTableInfoFromSchema(schema *schema.Picker, schemaName, tableName string) return } -func quoteSchema(schema string, table string) string { - return fmt.Sprintf("`%s`.`%s`", escapeName(schema), escapeName(table)) -} - -func quoteName(name string) string { - - return "`" + escapeName(name) + "`" -} - -func escapeName(name string) string { - return strings.Replace(name, "`", "``", -1) -} - -func holderString(n int) string { - var builder strings.Builder - builder.Grow((n-1)*2 + 1) - for i := 0; i < n; i++ { - if i > 0 { - builder.WriteString(",") - } - builder.WriteString("?") - } - return builder.String() -} - -func buildColumnList(names []string) string { - var b strings.Builder - for i, name := range names { - if i > 0 { - b.WriteString(",") - } - b.WriteString(quoteName(name)) - - } - - return b.String() -} - // getColsOfTbl returns a slice of the names of all columns, // generated columns are excluded. // https://dev.mysql.com/doc/mysql-infoschema-excerpt/5.7/en/columns-table.html @@ -259,7 +221,7 @@ func getUniqKeys(db *gosql.DB, schema, table string) (uniqueKeys []indexInfo, er return } -func isTableChanged(ddl *DDL) bool { +func isTableChanged(ddl *txn.DDL) bool { switch ddl.Type { case model.ActionDropTable, model.ActionDropSchema, model.ActionTruncateTable, model.ActionCreateSchema: return false diff --git a/cmd/debug.go b/cmd/debug.go index 61e923e224a..b633c0d97f5 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -12,7 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/pingcap/tidb-cdc/cdc" - "github.com/pingcap/tidb-cdc/cdc/util" + "github.com/pingcap/tidb-cdc/pkg/util" ) func init() { diff --git a/cdc/util/overlap_merge.go b/pkg/util/overlap_merge.go similarity index 100% rename from cdc/util/overlap_merge.go rename to pkg/util/overlap_merge.go diff --git a/cdc/util/overlap_merge_test.go b/pkg/util/overlap_merge_test.go similarity index 100% rename from cdc/util/overlap_merge_test.go rename to pkg/util/overlap_merge_test.go diff --git a/cdc/util/util.go b/pkg/util/span.go similarity index 97% rename from cdc/util/util.go rename to pkg/util/span.go index 0837c158fae..83fb8c3d6ef 100644 --- a/cdc/util/util.go +++ b/pkg/util/span.go @@ -14,7 +14,7 @@ type Span struct { } // UpperBoundKey represents the maximum value. -var UpperBoundKey []byte = []byte{255, 255, 255, 255, 255} +var UpperBoundKey = []byte{255, 255, 255, 255, 255} // Hack will set End as UpperBoundKey if End is Nil. func (s Span) Hack() Span { diff --git a/cdc/util/util_test.go b/pkg/util/span_test.go similarity index 97% rename from cdc/util/util_test.go rename to pkg/util/span_test.go index cbc05df803e..2b11ae95ec6 100644 --- a/cdc/util/util_test.go +++ b/pkg/util/span_test.go @@ -1,15 +1,11 @@ package util import ( - "testing" - "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/check" ) -func Test(t *testing.T) { check.TestingT(t) } - type spanSuite struct{} var _ = check.Suite(&spanSuite{}) diff --git a/pkg/util/string.go b/pkg/util/string.go new file mode 100644 index 00000000000..31801fb3d1b --- /dev/null +++ b/pkg/util/string.go @@ -0,0 +1,43 @@ +package util + +import ( + "fmt" + "strings" +) + +func QuoteSchema(schema string, table string) string { + return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table)) +} + +func QuoteName(name string) string { + return "`" + EscapeName(name) + "`" +} + +func EscapeName(name string) string { + return strings.Replace(name, "`", "``", -1) +} + +func HolderString(n int) string { + var builder strings.Builder + builder.Grow((n-1)*2 + 1) + for i := 0; i < n; i++ { + if i > 0 { + builder.WriteString(",") + } + builder.WriteString("?") + } + return builder.String() +} + +func BuildColumnList(names []string) string { + var b strings.Builder + for i, name := range names { + if i > 0 { + b.WriteString(",") + } + b.WriteString(QuoteName(name)) + + } + + return b.String() +} From 481fa7cb34ab6197b6d0f56bf77fee48ffdd3f70 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 16:22:44 +0800 Subject: [PATCH 04/10] move sink to cdc/sink/ --- cdc/cdc_test.go | 25 ++++------------ cdc/changefeed.go | 5 ++-- cdc/{ => sink}/cached_inspector.go | 2 +- cdc/{ => sink}/cached_inspector_test.go | 2 +- cdc/{ => sink}/mysql.go | 38 ++++++++++++++++++++++++- cdc/{ => sink}/mysql_test.go | 2 +- cdc/{ => sink}/sink.go | 22 +------------- cdc/{ => sink}/util.go | 2 +- cdc/{ => sink}/util_test.go | 5 +++- 9 files changed, 55 insertions(+), 48 deletions(-) rename cdc/{ => sink}/cached_inspector.go (99%) rename cdc/{ => sink}/cached_inspector_test.go (99%) rename cdc/{ => sink}/mysql.go (92%) rename cdc/{ => sink}/mysql_test.go (99%) rename cdc/{ => sink}/sink.go (82%) rename cdc/{ => sink}/util.go (99%) rename cdc/{ => sink}/util_test.go (96%) diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index e40692c4191..de00a90d94a 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -2,7 +2,6 @@ package cdc import ( "context" - "database/sql" "fmt" "math" "strconv" @@ -13,6 +12,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" + "github.com/pingcap/tidb-cdc/cdc/sink" "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" @@ -25,7 +25,7 @@ type CDCSuite struct { puller *mock.MockTiDB mock sqlmock.Sqlmock mounter *txn.Mounter - sink Sink + sink sink.Sink } var _ = Suite(NewCDCSuite()) @@ -44,8 +44,8 @@ func NewCDCSuite() *CDCSuite { if err != nil { panic(err.Error()) } - // create a schema - schema, err := schema.NewSchemaPicker(jobs, false) + // create a picker + picker, err := schema.NewSchemaPicker(jobs, false) if err != nil { panic(err.Error()) } @@ -56,22 +56,9 @@ func NewCDCSuite() *CDCSuite { } cdcSuite.mock = mock - inspector := &cachedInspector{ - db: db, - cache: make(map[string]*tableInfo), - tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) { - info, err := getTableInfoFromSchema(schema, schemaName, tableName) - return info, err - }, - } - sink := &mysqlSink{ - db: db, - infoGetter: schema, - tblInspector: inspector, - } - cdcSuite.sink = sink + cdcSuite.sink = sink.NewMySQLSinkUsingSchema(db, picker) - mounter, err := txn.NewTxnMounter(schema, time.Local) + mounter, err := txn.NewTxnMounter(picker, time.Local) if err != nil { panic(err.Error()) } diff --git a/cdc/changefeed.go b/cdc/changefeed.go index c15c7ac58e7..97e094e204e 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" + "github.com/pingcap/tidb-cdc/cdc/sink" "github.com/pingcap/tidb-cdc/cdc/txn" "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" @@ -80,7 +81,7 @@ type SubChangeFeed struct { // sink is the Sink to write rows to. // Resolved timestamps are never written by Capture - sink Sink + sink sink.Sink } func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChangeFeed, error) { @@ -103,7 +104,7 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange return nil, errors.Trace(err) } - sink, err := getSink(detail.SinkURI, schema, detail.Opts) + sink, err := sink.NewMySQLSink(detail.SinkURI, schema, detail.Opts) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/cached_inspector.go b/cdc/sink/cached_inspector.go similarity index 99% rename from cdc/cached_inspector.go rename to cdc/sink/cached_inspector.go index 04eb600e880..a7231414d9f 100644 --- a/cdc/cached_inspector.go +++ b/cdc/sink/cached_inspector.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "database/sql" diff --git a/cdc/cached_inspector_test.go b/cdc/sink/cached_inspector_test.go similarity index 99% rename from cdc/cached_inspector_test.go rename to cdc/sink/cached_inspector_test.go index bde0634f246..76ad2daeeaf 100644 --- a/cdc/cached_inspector_test.go +++ b/cdc/sink/cached_inspector_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "database/sql" diff --git a/cdc/mysql.go b/cdc/sink/mysql.go similarity index 92% rename from cdc/mysql.go rename to cdc/sink/mysql.go index 81303356582..bd70e825847 100644 --- a/cdc/mysql.go +++ b/cdc/sink/mysql.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" @@ -23,6 +23,7 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb-cdc/cdc/txn" + "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" @@ -59,6 +60,41 @@ type mysqlSink struct { var _ Sink = &mysqlSink{} +func NewMySQLSink( + sinkURI string, + infoGetter TableInfoGetter, + opts map[string]string, +) (Sink, error) { + // TODO + db, err := sql.Open("mysql", sinkURI) + if err != nil { + return nil, err + } + cachedInspector := newCachedInspector(db) + sink := mysqlSink{ + db: db, + infoGetter: infoGetter, + tblInspector: cachedInspector, + } + return &sink, nil +} + +func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Picker) Sink { + inspector := &cachedInspector{ + db: db, + cache: make(map[string]*tableInfo), + tableGetter: func(_ *sql.DB, schemaName string, tableName string) (*tableInfo, error) { + info, err := getTableInfoFromSchema(picker, schemaName, tableName) + return info, err + }, + } + return &mysqlSink{ + db: db, + infoGetter: picker, + tblInspector: inspector, + } +} + func (s *mysqlSink) Emit(ctx context.Context, txn txn.Txn) error { filterBySchemaAndTable(&txn) if len(txn.DMLs) == 0 && txn.DDL == nil { diff --git a/cdc/mysql_test.go b/cdc/sink/mysql_test.go similarity index 99% rename from cdc/mysql_test.go rename to cdc/sink/mysql_test.go index 4768a66bd4e..b2855869474 100644 --- a/cdc/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" diff --git a/cdc/sink.go b/cdc/sink/sink.go similarity index 82% rename from cdc/sink.go rename to cdc/sink/sink.go index 3fd1097c31a..46b968ac703 100644 --- a/cdc/sink.go +++ b/cdc/sink/sink.go @@ -11,11 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "context" - "database/sql" "fmt" "io" @@ -44,25 +43,6 @@ type TableInfoGetter interface { GetTableIDByName(schema, table string) (int64, bool) } -func getSink( - sinkURI string, - infoGetter TableInfoGetter, - opts map[string]string, -) (Sink, error) { - // TODO - db, err := sql.Open("mysql", sinkURI) - if err != nil { - return nil, err - } - cachedInspector := newCachedInspector(db) - sink := mysqlSink{ - db: db, - infoGetter: infoGetter, - tblInspector: cachedInspector, - } - return &sink, nil -} - type writerSink struct { io.Writer } diff --git a/cdc/util.go b/cdc/sink/util.go similarity index 99% rename from cdc/util.go rename to cdc/sink/util.go index 7c192fbfffb..8dd84362454 100644 --- a/cdc/util.go +++ b/cdc/sink/util.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( gosql "database/sql" diff --git a/cdc/util_test.go b/cdc/sink/util_test.go similarity index 96% rename from cdc/util_test.go rename to cdc/sink/util_test.go index b617bca3de4..61cdf5484af 100644 --- a/cdc/util_test.go +++ b/cdc/sink/util_test.go @@ -11,15 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cdc +package sink import ( "regexp" + "testing" "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" ) +func TestSuite(t *testing.T) { check.TestingT(t) } + type UtilSuite struct{} var _ = check.Suite(&UtilSuite{}) From f1844882acb9266223a0c0592e8e64535d33b6ff Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 16:36:44 +0800 Subject: [PATCH 05/10] move schema to cdc/schema/ --- cdc/cdc_test.go | 2 +- cdc/changefeed.go | 2 +- {pkg => cdc}/schema/schema_picker.go | 0 {pkg => cdc}/schema/schema_picker_test.go | 0 cdc/sink/mysql.go | 22 ++++++++-------------- cdc/sink/util.go | 2 +- cdc/txn/txn.go | 5 ++--- cdc/txn/txn_test.go | 5 ++--- 8 files changed, 15 insertions(+), 23 deletions(-) rename {pkg => cdc}/schema/schema_picker.go (100%) rename {pkg => cdc}/schema/schema_picker_test.go (100%) diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index de00a90d94a..f7d8cbb81af 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -12,9 +12,9 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" + "github.com/pingcap/tidb-cdc/cdc/schema" "github.com/pingcap/tidb-cdc/cdc/sink" "github.com/pingcap/tidb-cdc/cdc/txn" - "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" ) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 97e094e204e..1ec9f3d63c7 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -24,9 +24,9 @@ import ( "github.com/pingcap/log" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb-cdc/cdc/kv" + "github.com/pingcap/tidb-cdc/cdc/schema" "github.com/pingcap/tidb-cdc/cdc/sink" "github.com/pingcap/tidb-cdc/cdc/txn" - "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/store/tikv/oracle" "go.uber.org/zap" diff --git a/pkg/schema/schema_picker.go b/cdc/schema/schema_picker.go similarity index 100% rename from pkg/schema/schema_picker.go rename to cdc/schema/schema_picker.go diff --git a/pkg/schema/schema_picker_test.go b/cdc/schema/schema_picker_test.go similarity index 100% rename from pkg/schema/schema_picker_test.go rename to cdc/schema/schema_picker_test.go diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index bd70e825847..212a4770420 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -20,29 +20,23 @@ import ( "regexp" "strings" + "github.com/cenkalti/backoff" dmysql "github.com/go-sql-driver/mysql" + "go.uber.org/zap" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb-cdc/cdc/schema" "github.com/pingcap/tidb-cdc/cdc/txn" - "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/tidb-cdc/pkg/util" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/table" - - "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/types" - - "github.com/pingcap/errors" - - "github.com/pingcap/parser/model" - - "github.com/cenkalti/backoff" - _ "github.com/pingcap/tidb/types/parser_driver" - - "github.com/pingcap/log" - "go.uber.org/zap" ) type tableInspector interface { diff --git a/cdc/sink/util.go b/cdc/sink/util.go index 8dd84362454..5f002a083df 100644 --- a/cdc/sink/util.go +++ b/cdc/sink/util.go @@ -19,8 +19,8 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-cdc/cdc/schema" "github.com/pingcap/tidb-cdc/cdc/txn" - "github.com/pingcap/tidb-cdc/pkg/schema" "github.com/pingcap/errors" ) diff --git a/cdc/txn/txn.go b/cdc/txn/txn.go index af4889d1da6..4b5317b70c7 100644 --- a/cdc/txn/txn.go +++ b/cdc/txn/txn.go @@ -18,15 +18,14 @@ import ( "sort" "time" - "github.com/pingcap/tidb-cdc/pkg/schema" - "github.com/pingcap/tidb-cdc/pkg/util" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-cdc/cdc/entry" "github.com/pingcap/tidb-cdc/cdc/kv" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/types" "go.uber.org/zap" ) diff --git a/cdc/txn/txn_test.go b/cdc/txn/txn_test.go index 9e2edfe0456..7486704d802 100644 --- a/cdc/txn/txn_test.go +++ b/cdc/txn/txn_test.go @@ -21,14 +21,13 @@ import ( "testing" "time" - "github.com/pingcap/tidb-cdc/pkg/schema" - "github.com/pingcap/tidb-cdc/pkg/util" - "github.com/pingcap/check" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-cdc/cdc/entry" "github.com/pingcap/tidb-cdc/cdc/kv" "github.com/pingcap/tidb-cdc/cdc/mock" + "github.com/pingcap/tidb-cdc/cdc/schema" + "github.com/pingcap/tidb-cdc/pkg/util" "github.com/pingcap/tidb/types" ) From d02ead9052c4550fe538243dbf03d0e314ede6e2 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 17:50:20 +0800 Subject: [PATCH 06/10] package --- cdc/schema/schema_picker.go | 2 +- cdc/sink/mysql.go | 15 ++++++++++++++- pkg/util/string.go | 13 ------------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cdc/schema/schema_picker.go b/cdc/schema/schema_picker.go index 2cd3ec63d10..841928a13d0 100644 --- a/cdc/schema/schema_picker.go +++ b/cdc/schema/schema_picker.go @@ -48,7 +48,7 @@ type Picker struct { currentVersion int64 } -// TableName specify a Picker name and Table name +// TableName specify a Schema name and Table name type TableName struct { Schema string `toml:"db-name" json:"db-name"` Table string `toml:"tbl-name" json:"tbl-name"` diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 212a4770420..e66dcff22ab 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -253,7 +253,7 @@ func (s *mysqlSink) prepareReplace(dml *txn.DML) (string, []interface{}, error) return "", nil, err } var builder strings.Builder - cols := "(" + util.BuildColumnList(info.columns) + ")" + cols := "(" + buildColumnList(info.columns) + ")" tblName := util.QuoteSchema(dml.Database, dml.Table) builder.WriteString("REPLACE INTO " + tblName + cols + " VALUES ") builder.WriteString("(" + util.HolderString(len(info.columns)) + ");") @@ -429,3 +429,16 @@ func getSQLErrCode(err error) (terror.ErrCode, bool) { return terror.ErrCode(mysqlErr.Number), true } + +func buildColumnList(names []string) string { + var b strings.Builder + for i, name := range names { + if i > 0 { + b.WriteString(",") + } + b.WriteString(util.QuoteName(name)) + + } + + return b.String() +} diff --git a/pkg/util/string.go b/pkg/util/string.go index 31801fb3d1b..88d462d7e6a 100644 --- a/pkg/util/string.go +++ b/pkg/util/string.go @@ -28,16 +28,3 @@ func HolderString(n int) string { } return builder.String() } - -func BuildColumnList(names []string) string { - var b strings.Builder - for i, name := range names { - if i > 0 { - b.WriteString(",") - } - b.WriteString(QuoteName(name)) - - } - - return b.String() -} From 0ae2de740eeec8128c9fa784c1b4c38c45470fc7 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 17:56:42 +0800 Subject: [PATCH 07/10] schema holder --- cdc/cdc_test.go | 2 +- cdc/changefeed.go | 4 +-- cdc/schema/schema_picker.go | 46 ++++++++++++++++---------------- cdc/schema/schema_picker_test.go | 18 ++++++------- cdc/sink/mysql.go | 2 +- cdc/sink/util.go | 2 +- cdc/txn/txn.go | 4 +-- cdc/txn/txn_test.go | 4 +-- 8 files changed, 41 insertions(+), 41 deletions(-) diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index f7d8cbb81af..7f1d95c6f14 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -45,7 +45,7 @@ func NewCDCSuite() *CDCSuite { panic(err.Error()) } // create a picker - picker, err := schema.NewSchemaPicker(jobs, false) + picker, err := schema.NewSchemaHolder(jobs, false) if err != nil { panic(err.Error()) } diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 1ec9f3d63c7..5999ab944c5 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -76,7 +76,7 @@ type SubChangeFeed struct { detail ChangeFeedDetail watchs []util.Span - schema *schema.Picker + schema *schema.Holder mounter *txn.Mounter // sink is the Sink to write rows to. @@ -99,7 +99,7 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange if err != nil { return nil, err } - schema, err := schema.NewSchemaPicker(jobs, false) + schema, err := schema.NewSchemaHolder(jobs, false) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/schema/schema_picker.go b/cdc/schema/schema_picker.go index 841928a13d0..85f0c19058d 100644 --- a/cdc/schema/schema_picker.go +++ b/cdc/schema/schema_picker.go @@ -26,9 +26,9 @@ import ( const implicitColName = "_tidb_rowid" const implicitColID = -1 -// Picker stores the source TiDB all schema infomations +// Holder stores the source TiDB all schema infomations // schema infomations could be changed by drainer init and ddls appear -type Picker struct { +type Holder struct { tableIDToName map[int64]TableName tableNameToId map[TableName]int64 schemaNameToID map[string]int64 @@ -54,9 +54,9 @@ type TableName struct { Table string `toml:"tbl-name" json:"tbl-name"` } -// NewSchemaPicker returns the Picker object -func NewSchemaPicker(jobs []*model.Job, hasImplicitCol bool) (*Picker, error) { - s := &Picker{ +// NewSchemaHolder returns the Holder object +func NewSchemaHolder(jobs []*model.Job, hasImplicitCol bool) (*Holder, error) { + s := &Holder{ hasImplicitCol: hasImplicitCol, version2SchemaTable: make(map[int64]TableName), truncateTableID: make(map[int64]struct{}), @@ -72,7 +72,7 @@ func NewSchemaPicker(jobs []*model.Job, hasImplicitCol bool) (*Picker, error) { return s, nil } -func (s *Picker) String() string { +func (s *Holder) String() string { mp := map[string]interface{}{ "tableIDToName": s.tableIDToName, "tableNameToId": s.tableNameToId, @@ -89,12 +89,12 @@ func (s *Picker) String() string { } // SchemaMetaVersion returns the current schemaversion in drainer -func (s *Picker) SchemaMetaVersion() int64 { +func (s *Holder) SchemaMetaVersion() int64 { return s.schemaMetaVersion } // SchemaAndTableName returns the tableName by table id -func (s *Picker) SchemaAndTableName(id int64) (string, string, bool) { +func (s *Holder) SchemaAndTableName(id int64) (string, string, bool) { tn, ok := s.tableIDToName[id] if !ok { return "", "", false @@ -104,7 +104,7 @@ func (s *Picker) SchemaAndTableName(id int64) (string, string, bool) { } // GetTableIDByName returns the tableId by table schemaName and tableName -func (s *Picker) GetTableIDByName(schemaName string, tableName string) (int64, bool) { +func (s *Holder) GetTableIDByName(schemaName string, tableName string) (int64, bool) { id, ok := s.tableNameToId[TableName{ Schema: schemaName, Table: tableName, @@ -113,13 +113,13 @@ func (s *Picker) GetTableIDByName(schemaName string, tableName string) (int64, b } // SchemaByID returns the DBInfo by schema id -func (s *Picker) SchemaByID(id int64) (val *model.DBInfo, ok bool) { +func (s *Holder) SchemaByID(id int64) (val *model.DBInfo, ok bool) { val, ok = s.schemas[id] return } // SchemaByTableID returns the schema ID by table ID -func (s *Picker) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { +func (s *Holder) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { tn, ok := s.tableIDToName[tableID] if !ok { return nil, false @@ -132,13 +132,13 @@ func (s *Picker) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { } // TableByID returns the TableInfo by table id -func (s *Picker) TableByID(id int64) (val *model.TableInfo, ok bool) { +func (s *Holder) TableByID(id int64) (val *model.TableInfo, ok bool) { val, ok = s.tables[id] return } // DropSchema deletes the given DBInfo -func (s *Picker) DropSchema(id int64) (string, error) { +func (s *Holder) DropSchema(id int64) (string, error) { schema, ok := s.schemas[id] if !ok { return "", errors.NotFoundf("schema %d", id) @@ -158,7 +158,7 @@ func (s *Picker) DropSchema(id int64) (string, error) { } // CreateSchema adds new DBInfo -func (s *Picker) CreateSchema(db *model.DBInfo) error { +func (s *Holder) CreateSchema(db *model.DBInfo) error { if _, ok := s.schemas[db.ID]; ok { return errors.AlreadyExistsf("schema %s(%d)", db.Name, db.ID) } @@ -171,7 +171,7 @@ func (s *Picker) CreateSchema(db *model.DBInfo) error { } // DropTable deletes the given TableInfo -func (s *Picker) DropTable(id int64) (string, error) { +func (s *Holder) DropTable(id int64) (string, error) { table, ok := s.tables[id] if !ok { return "", errors.NotFoundf("table %d", id) @@ -191,7 +191,7 @@ func (s *Picker) DropTable(id int64) (string, error) { } // CreateTable creates new TableInfo -func (s *Picker) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { +func (s *Holder) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { _, ok := s.tables[table.ID] if ok { return errors.AlreadyExistsf("table %s.%s", schema.Name, table.Name) @@ -211,7 +211,7 @@ func (s *Picker) CreateTable(schema *model.DBInfo, table *model.TableInfo) error } // ReplaceTable replace the table by new tableInfo -func (s *Picker) ReplaceTable(table *model.TableInfo) error { +func (s *Holder) ReplaceTable(table *model.TableInfo) error { _, ok := s.tables[table.ID] if !ok { return errors.NotFoundf("table %s(%d)", table.Name, table.ID) @@ -226,7 +226,7 @@ func (s *Picker) ReplaceTable(table *model.TableInfo) error { return nil } -func (s *Picker) removeTable(tableID int64) error { +func (s *Holder) removeTable(tableID int64) error { schema, ok := s.SchemaByTableID(tableID) if !ok { return errors.NotFoundf("table(%d)'s schema", tableID) @@ -242,13 +242,13 @@ func (s *Picker) removeTable(tableID int64) error { return nil } -func (s *Picker) addJob(job *model.Job) { +func (s *Holder) addJob(job *model.Job) { if len(s.jobs) == 0 || s.jobs[len(s.jobs)-1].BinlogInfo.SchemaVersion < job.BinlogInfo.SchemaVersion { s.jobs = append(s.jobs, job) } } -func (s *Picker) HandlePreviousDDLJobIfNeed(commitTs uint64) error { +func (s *Holder) HandlePreviousDDLJobIfNeed(commitTs uint64) error { var i int var job *model.Job // TODO: Make sure jobs are sorted by BinlogInfo.FinishedTS @@ -281,7 +281,7 @@ func (s *Picker) HandlePreviousDDLJobIfNeed(commitTs uint64) error { // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (s *Picker) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { +func (s *Holder) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job)) if skipJob(job) { @@ -450,12 +450,12 @@ func (s *Picker) HandleDDL(job *model.Job) (schemaName string, tableName string, } // IsTruncateTableID returns true if the table id have been truncated by truncate table DDL -func (s *Picker) IsTruncateTableID(id int64) bool { +func (s *Holder) IsTruncateTableID(id int64) bool { _, ok := s.truncateTableID[id] return ok } -func (s *Picker) getSchemaTableAndDelete(version int64) (string, string, error) { +func (s *Holder) getSchemaTableAndDelete(version int64) (string, string, error) { schemaTable, ok := s.version2SchemaTable[version] if !ok { return "", "", errors.NotFoundf("version: %d", version) diff --git a/cdc/schema/schema_picker_test.go b/cdc/schema/schema_picker_test.go index d2568ce613a..695cc9abed3 100644 --- a/cdc/schema/schema_picker_test.go +++ b/cdc/schema/schema_picker_test.go @@ -63,7 +63,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone}) // reconstruct the local schema - schema, err := NewSchemaPicker(jobs, false) + schema, err := NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(err, IsNil) @@ -80,7 +80,7 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchemaPicker(jobs, false) + schema, err = NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(124) c.Assert(err, IsNil) @@ -89,7 +89,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = jobs[:0] jobs = append(jobs, job) jobs = append(jobs, jobDup) - schema, err = NewSchemaPicker(jobs, false) + schema, err = NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(125) c.Log(err) @@ -108,7 +108,7 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchemaPicker(jobs, false) + schema, err = NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(errors.IsNotFound(err), IsTrue) @@ -206,7 +206,7 @@ func (*schemaSuite) TestTable(c *C) { jobs = append(jobs, job) // reconstruct the local schema - schema, err := NewSchemaPicker(jobs, false) + schema, err := NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(126) c.Assert(err, IsNil) @@ -237,7 +237,7 @@ func (*schemaSuite) TestTable(c *C) { Query: "truncate table " + tbName.O, }, ) - schema1, err := NewSchemaPicker(jobs, false) + schema1, err := NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema1.HandlePreviousDDLJobIfNeed(127) c.Assert(err, IsNil) @@ -259,7 +259,7 @@ func (*schemaSuite) TestTable(c *C) { Query: "drop table " + tbName.O, }, ) - schema2, err := NewSchemaPicker(jobs, false) + schema2, err := NewSchemaHolder(jobs, false) c.Assert(err, IsNil) err = schema2.HandlePreviousDDLJobIfNeed(128) c.Assert(err, IsNil) @@ -277,7 +277,7 @@ func (*schemaSuite) TestTable(c *C) { } func (t *schemaSuite) TestHandleDDL(c *C) { - schema, err := NewSchemaPicker(nil, false) + schema, err := NewSchemaHolder(nil, false) c.Assert(err, IsNil) dbName := model.NewCIStr("Test") colName := model.NewCIStr("A") @@ -400,7 +400,7 @@ func (t *schemaSuite) TestAddImplicitColumn(c *C) { c.Assert(tbl.Indices[0].Primary, IsTrue) } -func testDoDDLAndCheck(c *C, schema *Picker, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { +func testDoDDLAndCheck(c *C, schema *Holder, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { schemaName, tableName, resSQL, err := schema.HandleDDL(job) c.Logf("handle: %s", job.Query) c.Logf("result: %s, %s, %s, %v", schemaName, tableName, resSQL, err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index e66dcff22ab..198f8efdf89 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -73,7 +73,7 @@ func NewMySQLSink( return &sink, nil } -func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Picker) Sink { +func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Holder) Sink { inspector := &cachedInspector{ db: db, cache: make(map[string]*tableInfo), diff --git a/cdc/sink/util.go b/cdc/sink/util.go index 5f002a083df..707f37f0b06 100644 --- a/cdc/sink/util.go +++ b/cdc/sink/util.go @@ -82,7 +82,7 @@ func getTableInfo(db *gosql.DB, schema string, table string) (info *tableInfo, e return } -func getTableInfoFromSchema(schema *schema.Picker, schemaName, tableName string) (info *tableInfo, err error) { +func getTableInfoFromSchema(schema *schema.Holder, schemaName, tableName string) (info *tableInfo, err error) { info = new(tableInfo) tableId, exist := schema.GetTableIDByName(schemaName, tableName) if !exist { diff --git a/cdc/txn/txn.go b/cdc/txn/txn.go index 4b5317b70c7..166a29434b9 100644 --- a/cdc/txn/txn.go +++ b/cdc/txn/txn.go @@ -134,11 +134,11 @@ func CollectRawTxns( } type Mounter struct { - schema *schema.Picker + schema *schema.Holder loc *time.Location } -func NewTxnMounter(schema *schema.Picker, loc *time.Location) (*Mounter, error) { +func NewTxnMounter(schema *schema.Holder, loc *time.Location) (*Mounter, error) { m := &Mounter{schema: schema, loc: loc} return m, nil } diff --git a/cdc/txn/txn_test.go b/cdc/txn/txn_test.go index 7486704d802..688960594d9 100644 --- a/cdc/txn/txn_test.go +++ b/cdc/txn/txn_test.go @@ -181,7 +181,7 @@ type mountTxnsSuite struct{} var _ = check.Suite(&mountTxnsSuite{}) -func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Picker) { +func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Holder) { puller, err := mock.NewMockPuller() c.Assert(err, check.IsNil) var jobs []*model.Job @@ -198,7 +198,7 @@ func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.P } } c.Assert(len(jobs), check.Equals, len(sqls)) - schema, err := schema.NewSchemaPicker(jobs, false) + schema, err := schema.NewSchemaHolder(jobs, false) c.Assert(err, check.IsNil) err = schema.HandlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) c.Assert(err, check.IsNil) From 1dce222f1eb07ef7ba935ab3a7f72d2abf0e959a Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 18:06:17 +0800 Subject: [PATCH 08/10] schema --- cdc/cdc_test.go | 2 +- cdc/changefeed.go | 4 +- cdc/schema/{schema_picker.go => schema.go} | 46 +++++++++---------- .../{schema_picker_test.go => schema_test.go} | 18 ++++---- cdc/sink/mysql.go | 2 +- cdc/sink/util.go | 2 +- cdc/txn/txn.go | 4 +- cdc/txn/txn_test.go | 4 +- 8 files changed, 41 insertions(+), 41 deletions(-) rename cdc/schema/{schema_picker.go => schema.go} (90%) rename cdc/schema/{schema_picker_test.go => schema_test.go} (96%) diff --git a/cdc/cdc_test.go b/cdc/cdc_test.go index 7f1d95c6f14..99f75795aec 100644 --- a/cdc/cdc_test.go +++ b/cdc/cdc_test.go @@ -45,7 +45,7 @@ func NewCDCSuite() *CDCSuite { panic(err.Error()) } // create a picker - picker, err := schema.NewSchemaHolder(jobs, false) + picker, err := schema.NewSchema(jobs, false) if err != nil { panic(err.Error()) } diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 5999ab944c5..f8cf9396c45 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -76,7 +76,7 @@ type SubChangeFeed struct { detail ChangeFeedDetail watchs []util.Span - schema *schema.Holder + schema *schema.Schema mounter *txn.Mounter // sink is the Sink to write rows to. @@ -99,7 +99,7 @@ func NewSubChangeFeed(pdEndpoints []string, detail ChangeFeedDetail) (*SubChange if err != nil { return nil, err } - schema, err := schema.NewSchemaHolder(jobs, false) + schema, err := schema.NewSchema(jobs, false) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/schema/schema_picker.go b/cdc/schema/schema.go similarity index 90% rename from cdc/schema/schema_picker.go rename to cdc/schema/schema.go index 85f0c19058d..e0ce436fce7 100644 --- a/cdc/schema/schema_picker.go +++ b/cdc/schema/schema.go @@ -26,9 +26,9 @@ import ( const implicitColName = "_tidb_rowid" const implicitColID = -1 -// Holder stores the source TiDB all schema infomations +// Schema stores the source TiDB all schema infomations // schema infomations could be changed by drainer init and ddls appear -type Holder struct { +type Schema struct { tableIDToName map[int64]TableName tableNameToId map[TableName]int64 schemaNameToID map[string]int64 @@ -54,9 +54,9 @@ type TableName struct { Table string `toml:"tbl-name" json:"tbl-name"` } -// NewSchemaHolder returns the Holder object -func NewSchemaHolder(jobs []*model.Job, hasImplicitCol bool) (*Holder, error) { - s := &Holder{ +// NewSchema returns the Schema object +func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) { + s := &Schema{ hasImplicitCol: hasImplicitCol, version2SchemaTable: make(map[int64]TableName), truncateTableID: make(map[int64]struct{}), @@ -72,7 +72,7 @@ func NewSchemaHolder(jobs []*model.Job, hasImplicitCol bool) (*Holder, error) { return s, nil } -func (s *Holder) String() string { +func (s *Schema) String() string { mp := map[string]interface{}{ "tableIDToName": s.tableIDToName, "tableNameToId": s.tableNameToId, @@ -89,12 +89,12 @@ func (s *Holder) String() string { } // SchemaMetaVersion returns the current schemaversion in drainer -func (s *Holder) SchemaMetaVersion() int64 { +func (s *Schema) SchemaMetaVersion() int64 { return s.schemaMetaVersion } // SchemaAndTableName returns the tableName by table id -func (s *Holder) SchemaAndTableName(id int64) (string, string, bool) { +func (s *Schema) SchemaAndTableName(id int64) (string, string, bool) { tn, ok := s.tableIDToName[id] if !ok { return "", "", false @@ -104,7 +104,7 @@ func (s *Holder) SchemaAndTableName(id int64) (string, string, bool) { } // GetTableIDByName returns the tableId by table schemaName and tableName -func (s *Holder) GetTableIDByName(schemaName string, tableName string) (int64, bool) { +func (s *Schema) GetTableIDByName(schemaName string, tableName string) (int64, bool) { id, ok := s.tableNameToId[TableName{ Schema: schemaName, Table: tableName, @@ -113,13 +113,13 @@ func (s *Holder) GetTableIDByName(schemaName string, tableName string) (int64, b } // SchemaByID returns the DBInfo by schema id -func (s *Holder) SchemaByID(id int64) (val *model.DBInfo, ok bool) { +func (s *Schema) SchemaByID(id int64) (val *model.DBInfo, ok bool) { val, ok = s.schemas[id] return } // SchemaByTableID returns the schema ID by table ID -func (s *Holder) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { +func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { tn, ok := s.tableIDToName[tableID] if !ok { return nil, false @@ -132,13 +132,13 @@ func (s *Holder) SchemaByTableID(tableID int64) (*model.DBInfo, bool) { } // TableByID returns the TableInfo by table id -func (s *Holder) TableByID(id int64) (val *model.TableInfo, ok bool) { +func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool) { val, ok = s.tables[id] return } // DropSchema deletes the given DBInfo -func (s *Holder) DropSchema(id int64) (string, error) { +func (s *Schema) DropSchema(id int64) (string, error) { schema, ok := s.schemas[id] if !ok { return "", errors.NotFoundf("schema %d", id) @@ -158,7 +158,7 @@ func (s *Holder) DropSchema(id int64) (string, error) { } // CreateSchema adds new DBInfo -func (s *Holder) CreateSchema(db *model.DBInfo) error { +func (s *Schema) CreateSchema(db *model.DBInfo) error { if _, ok := s.schemas[db.ID]; ok { return errors.AlreadyExistsf("schema %s(%d)", db.Name, db.ID) } @@ -171,7 +171,7 @@ func (s *Holder) CreateSchema(db *model.DBInfo) error { } // DropTable deletes the given TableInfo -func (s *Holder) DropTable(id int64) (string, error) { +func (s *Schema) DropTable(id int64) (string, error) { table, ok := s.tables[id] if !ok { return "", errors.NotFoundf("table %d", id) @@ -191,7 +191,7 @@ func (s *Holder) DropTable(id int64) (string, error) { } // CreateTable creates new TableInfo -func (s *Holder) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { +func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error { _, ok := s.tables[table.ID] if ok { return errors.AlreadyExistsf("table %s.%s", schema.Name, table.Name) @@ -211,7 +211,7 @@ func (s *Holder) CreateTable(schema *model.DBInfo, table *model.TableInfo) error } // ReplaceTable replace the table by new tableInfo -func (s *Holder) ReplaceTable(table *model.TableInfo) error { +func (s *Schema) ReplaceTable(table *model.TableInfo) error { _, ok := s.tables[table.ID] if !ok { return errors.NotFoundf("table %s(%d)", table.Name, table.ID) @@ -226,7 +226,7 @@ func (s *Holder) ReplaceTable(table *model.TableInfo) error { return nil } -func (s *Holder) removeTable(tableID int64) error { +func (s *Schema) removeTable(tableID int64) error { schema, ok := s.SchemaByTableID(tableID) if !ok { return errors.NotFoundf("table(%d)'s schema", tableID) @@ -242,13 +242,13 @@ func (s *Holder) removeTable(tableID int64) error { return nil } -func (s *Holder) addJob(job *model.Job) { +func (s *Schema) addJob(job *model.Job) { if len(s.jobs) == 0 || s.jobs[len(s.jobs)-1].BinlogInfo.SchemaVersion < job.BinlogInfo.SchemaVersion { s.jobs = append(s.jobs, job) } } -func (s *Holder) HandlePreviousDDLJobIfNeed(commitTs uint64) error { +func (s *Schema) HandlePreviousDDLJobIfNeed(commitTs uint64) error { var i int var job *model.Job // TODO: Make sure jobs are sorted by BinlogInfo.FinishedTS @@ -281,7 +281,7 @@ func (s *Holder) HandlePreviousDDLJobIfNeed(commitTs uint64) error { // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (s *Holder) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { +func (s *Schema) HandleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job)) if skipJob(job) { @@ -450,12 +450,12 @@ func (s *Holder) HandleDDL(job *model.Job) (schemaName string, tableName string, } // IsTruncateTableID returns true if the table id have been truncated by truncate table DDL -func (s *Holder) IsTruncateTableID(id int64) bool { +func (s *Schema) IsTruncateTableID(id int64) bool { _, ok := s.truncateTableID[id] return ok } -func (s *Holder) getSchemaTableAndDelete(version int64) (string, string, error) { +func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) { schemaTable, ok := s.version2SchemaTable[version] if !ok { return "", "", errors.NotFoundf("version: %d", version) diff --git a/cdc/schema/schema_picker_test.go b/cdc/schema/schema_test.go similarity index 96% rename from cdc/schema/schema_picker_test.go rename to cdc/schema/schema_test.go index 695cc9abed3..6102ee01358 100644 --- a/cdc/schema/schema_picker_test.go +++ b/cdc/schema/schema_test.go @@ -63,7 +63,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone}) // reconstruct the local schema - schema, err := NewSchemaHolder(jobs, false) + schema, err := NewSchema(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(err, IsNil) @@ -80,7 +80,7 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchemaHolder(jobs, false) + schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(124) c.Assert(err, IsNil) @@ -89,7 +89,7 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = jobs[:0] jobs = append(jobs, job) jobs = append(jobs, jobDup) - schema, err = NewSchemaHolder(jobs, false) + schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(125) c.Log(err) @@ -108,7 +108,7 @@ func (t *schemaSuite) TestSchema(c *C) { Query: "drop database test", }, ) - schema, err = NewSchemaHolder(jobs, false) + schema, err = NewSchema(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(123) c.Assert(errors.IsNotFound(err), IsTrue) @@ -206,7 +206,7 @@ func (*schemaSuite) TestTable(c *C) { jobs = append(jobs, job) // reconstruct the local schema - schema, err := NewSchemaHolder(jobs, false) + schema, err := NewSchema(jobs, false) c.Assert(err, IsNil) err = schema.HandlePreviousDDLJobIfNeed(126) c.Assert(err, IsNil) @@ -237,7 +237,7 @@ func (*schemaSuite) TestTable(c *C) { Query: "truncate table " + tbName.O, }, ) - schema1, err := NewSchemaHolder(jobs, false) + schema1, err := NewSchema(jobs, false) c.Assert(err, IsNil) err = schema1.HandlePreviousDDLJobIfNeed(127) c.Assert(err, IsNil) @@ -259,7 +259,7 @@ func (*schemaSuite) TestTable(c *C) { Query: "drop table " + tbName.O, }, ) - schema2, err := NewSchemaHolder(jobs, false) + schema2, err := NewSchema(jobs, false) c.Assert(err, IsNil) err = schema2.HandlePreviousDDLJobIfNeed(128) c.Assert(err, IsNil) @@ -277,7 +277,7 @@ func (*schemaSuite) TestTable(c *C) { } func (t *schemaSuite) TestHandleDDL(c *C) { - schema, err := NewSchemaHolder(nil, false) + schema, err := NewSchema(nil, false) c.Assert(err, IsNil) dbName := model.NewCIStr("Test") colName := model.NewCIStr("A") @@ -400,7 +400,7 @@ func (t *schemaSuite) TestAddImplicitColumn(c *C) { c.Assert(tbl.Indices[0].Primary, IsTrue) } -func testDoDDLAndCheck(c *C, schema *Holder, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { +func testDoDDLAndCheck(c *C, schema *Schema, job *model.Job, isErr bool, sql string, expectedSchema string, expectedTable string) { schemaName, tableName, resSQL, err := schema.HandleDDL(job) c.Logf("handle: %s", job.Query) c.Logf("result: %s, %s, %s, %v", schemaName, tableName, resSQL, err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 198f8efdf89..685f6717a65 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -73,7 +73,7 @@ func NewMySQLSink( return &sink, nil } -func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Holder) Sink { +func NewMySQLSinkUsingSchema(db *sql.DB, picker *schema.Schema) Sink { inspector := &cachedInspector{ db: db, cache: make(map[string]*tableInfo), diff --git a/cdc/sink/util.go b/cdc/sink/util.go index 707f37f0b06..d22e4b2e192 100644 --- a/cdc/sink/util.go +++ b/cdc/sink/util.go @@ -82,7 +82,7 @@ func getTableInfo(db *gosql.DB, schema string, table string) (info *tableInfo, e return } -func getTableInfoFromSchema(schema *schema.Holder, schemaName, tableName string) (info *tableInfo, err error) { +func getTableInfoFromSchema(schema *schema.Schema, schemaName, tableName string) (info *tableInfo, err error) { info = new(tableInfo) tableId, exist := schema.GetTableIDByName(schemaName, tableName) if !exist { diff --git a/cdc/txn/txn.go b/cdc/txn/txn.go index 166a29434b9..992b62d8d6c 100644 --- a/cdc/txn/txn.go +++ b/cdc/txn/txn.go @@ -134,11 +134,11 @@ func CollectRawTxns( } type Mounter struct { - schema *schema.Holder + schema *schema.Schema loc *time.Location } -func NewTxnMounter(schema *schema.Holder, loc *time.Location) (*Mounter, error) { +func NewTxnMounter(schema *schema.Schema, loc *time.Location) (*Mounter, error) { m := &Mounter{schema: schema, loc: loc} return m, nil } diff --git a/cdc/txn/txn_test.go b/cdc/txn/txn_test.go index 688960594d9..f6b5cb7b9c4 100644 --- a/cdc/txn/txn_test.go +++ b/cdc/txn/txn_test.go @@ -181,7 +181,7 @@ type mountTxnsSuite struct{} var _ = check.Suite(&mountTxnsSuite{}) -func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Holder) { +func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.Schema) { puller, err := mock.NewMockPuller() c.Assert(err, check.IsNil) var jobs []*model.Job @@ -198,7 +198,7 @@ func setUpPullerAndSchema(c *check.C, sqls ...string) (*mock.MockTiDB, *schema.H } } c.Assert(len(jobs), check.Equals, len(sqls)) - schema, err := schema.NewSchemaHolder(jobs, false) + schema, err := schema.NewSchema(jobs, false) c.Assert(err, check.IsNil) err = schema.HandlePreviousDDLJobIfNeed(jobs[len(jobs)-1].BinlogInfo.FinishedTS) c.Assert(err, check.IsNil) From 87897d77196c68abeae42bd2040fcad43af18f91 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 18:07:29 +0800 Subject: [PATCH 09/10] schema --- cdc/schema/schema_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/schema/schema_test.go b/cdc/schema/schema_test.go index 6102ee01358..c4893ad0a18 100644 --- a/cdc/schema/schema_test.go +++ b/cdc/schema/schema_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/types" ) -// TODO run the test type schemaSuite struct{} func Test(t *testing.T) { TestingT(t) } From bac774846b0012ea66fa5d6f6dce0defa737b9b9 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 22 Oct 2019 22:22:09 +0800 Subject: [PATCH 10/10] Update cdc/txn/txn.go Co-Authored-By: amyangfei --- cdc/txn/txn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/txn/txn.go b/cdc/txn/txn.go index 992b62d8d6c..9ccc84c7886 100644 --- a/cdc/txn/txn.go +++ b/cdc/txn/txn.go @@ -271,7 +271,7 @@ func (m *Mounter) fetchTableInfo(tableId int64) (tableInfo *model.TableInfo, tab if !exist { return nil, nil, "", errors.Errorf("can not find table, id: %d", tableId) } - tableName = &schema.TableName{database, table} + tableName = &schema.TableName{Schema: database, Table: table} pkColOffset := -1 for i, col := range tableInfo.Columns {