diff --git a/cmd/tidb-lightning-ctl/main.go b/cmd/tidb-lightning-ctl/main.go index c874d200e..627a219b3 100644 --- a/cmd/tidb-lightning-ctl/main.go +++ b/cmd/tidb-lightning-ctl/main.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb-lightning/lightning/config" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" "github.com/pingcap/tidb-lightning/lightning/restore" "github.com/satori/go.uuid" ) diff --git a/lightning/kv/allocator.go b/lightning/backend/allocator.go similarity index 98% rename from lightning/kv/allocator.go rename to lightning/backend/allocator.go index 4207d0542..a5cfd8acb 100644 --- a/lightning/kv/allocator.go +++ b/lightning/backend/allocator.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "sync/atomic" diff --git a/lightning/kv/backend.go b/lightning/backend/backend.go similarity index 96% rename from lightning/kv/backend.go rename to lightning/backend/backend.go index 14cabd622..14c09d871 100644 --- a/lightning/kv/backend.go +++ b/lightning/backend/backend.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "context" @@ -92,6 +92,11 @@ type AbstractBackend interface { // value will be used in `Rows.SplitIntoChunks`. MaxChunkSize() int + // ShouldPostProcess returns whether KV-specific post-processing should be + // performed for this backend. Post-processing includes checksum, adjusting + // auto-increment ID, and analyze. + ShouldPostProcess() bool + // NewEncoder creates an encoder of a TiDB table. NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder @@ -162,6 +167,10 @@ func (be Backend) NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder { return be.abstract.NewEncoder(tbl, sqlMode) } +func (be Backend) ShouldPostProcess() bool { + return be.abstract.ShouldPostProcess() +} + // OpenEngine opens an engine with the given table name and engine ID. func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) { tag := makeTag(tableName, engineID) diff --git a/lightning/kv/backend_test.go b/lightning/backend/backend_test.go similarity index 99% rename from lightning/kv/backend_test.go rename to lightning/backend/backend_test.go index e92dae677..20823bbc7 100644 --- a/lightning/kv/backend_test.go +++ b/lightning/backend/backend_test.go @@ -1,4 +1,4 @@ -package kv_test +package backend_test import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/pingcap/parser/mysql" uuid "github.com/satori/go.uuid" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" "github.com/pingcap/tidb-lightning/mock" ) diff --git a/lightning/kv/importer.go b/lightning/backend/importer.go similarity index 98% rename from lightning/kv/importer.go rename to lightning/backend/importer.go index bcebd2c34..912f416ca 100644 --- a/lightning/kv/importer.go +++ b/lightning/backend/importer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "context" @@ -85,6 +85,10 @@ func (*importer) MaxChunkSize() int { return 31 << 10 } +func (*importer) ShouldPostProcess() bool { + return true +} + // isIgnorableOpenCloseEngineError checks if the error from // OpenEngine/CloseEngine can be safely ignored. func isIgnorableOpenCloseEngineError(err error) bool { diff --git a/lightning/kv/importer_test.go b/lightning/backend/importer_test.go similarity index 98% rename from lightning/kv/importer_test.go rename to lightning/backend/importer_test.go index cb0915a49..56c291ff6 100644 --- a/lightning/kv/importer_test.go +++ b/lightning/backend/importer_test.go @@ -1,4 +1,4 @@ -package kv_test +package backend_test import ( "context" @@ -10,7 +10,7 @@ import ( kvenc "github.com/pingcap/tidb/util/kvencoder" uuid "github.com/satori/go.uuid" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" "github.com/pingcap/tidb-lightning/mock" ) diff --git a/lightning/backend/mysql.go b/lightning/backend/mysql.go new file mode 100644 index 000000000..776dd6821 --- /dev/null +++ b/lightning/backend/mysql.go @@ -0,0 +1,293 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "context" + "database/sql" + "encoding/hex" + "strconv" + "strings" + "time" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" + uuid "github.com/satori/go.uuid" + "go.uber.org/zap" + + "github.com/pingcap/tidb-lightning/lightning/common" + "github.com/pingcap/tidb-lightning/lightning/log" + "github.com/pingcap/tidb-lightning/lightning/verification" +) + +type mysqlRow string + +type mysqlRows []mysqlRow + +type mysqlEncoder struct { + mode mysql.SQLMode +} + +type mysqlBackend struct { + db *sql.DB +} + +// NewMySQLBackend creates a new MySQL backend using the given database. +// +// The backend does not take ownership of `db`. Caller should close `db` +// manually after the backend expired. +func NewMySQLBackend(db *sql.DB) Backend { + return MakeBackend(&mysqlBackend{db: db}) +} + +func (row mysqlRow) ClassifyAndAppend(data *Rows, checksum *verification.KVChecksum, _ *Rows, _ *verification.KVChecksum) { + rows := (*data).(mysqlRows) + *data = mysqlRows(append(rows, row)) + cs := verification.MakeKVChecksum(uint64(len(row)), 1, 0) + checksum.Add(&cs) +} + +func (rows mysqlRows) SplitIntoChunks(splitSize int) []Rows { + if len(rows) == 0 { + return nil + } + + res := make([]Rows, 0, 1) + i := 0 + cumSize := 0 + + for j, row := range rows { + if i < j && cumSize+len(row) > splitSize { + res = append(res, rows[i:j]) + i = j + cumSize = 0 + } + cumSize += len(row) + } + + return append(res, rows[i:]) +} + +func (rows mysqlRows) Clear() Rows { + return rows[:0] +} + +func (enc mysqlEncoder) appendSQLBytes(sb *strings.Builder, value []byte) { + sb.Grow(2 + len(value)) + sb.WriteByte('\'') + if enc.mode.HasNoBackslashEscapesMode() { + for _, b := range value { + if b == '\'' { + sb.WriteString(`''`) + } else { + sb.WriteByte(b) + } + } + } else { + for _, b := range value { + switch b { + case 0: + sb.WriteString(`\0`) + case '\b': + sb.WriteString(`\b`) + case '\n': + sb.WriteString(`\n`) + case '\r': + sb.WriteString(`\r`) + case '\t': + sb.WriteString(`\t`) + case 0x26: + sb.WriteString(`\Z`) + case '\'': + sb.WriteString(`''`) + case '\\': + sb.WriteString(`\\`) + default: + sb.WriteByte(b) + } + } + } + sb.WriteByte('\'') +} + +// appendSQL appends the SQL representation of the Datum into the string builder. +// Note that we cannot use Datum.ToString since it doesn't perform SQL escaping. +func (enc mysqlEncoder) appendSQL(sb *strings.Builder, datum *types.Datum) error { + switch datum.Kind() { + case types.KindNull: + sb.WriteString("NULL") + + case types.KindMinNotNull: + sb.WriteString("MINVALUE") + + case types.KindMaxValue: + sb.WriteString("MAXVALUE") + + case types.KindInt64: + // longest int64 = -9223372036854775808 which has 20 characters + var buffer [20]byte + value := strconv.AppendInt(buffer[:0], datum.GetInt64(), 10) + sb.Write(value) + + case types.KindUint64, types.KindMysqlEnum, types.KindMysqlSet: + // longest uint64 = 18446744073709551615 which has 20 characters + var buffer [20]byte + value := strconv.AppendUint(buffer[:0], datum.GetUint64(), 10) + sb.Write(value) + + case types.KindFloat32, types.KindFloat64: + // float64 has 16 digits of precision, so a buffer size of 32 is more than enough... + var buffer [32]byte + value := strconv.AppendFloat(buffer[:0], datum.GetFloat64(), 'g', -1, 64) + sb.Write(value) + + case types.KindString, types.KindBytes: + enc.appendSQLBytes(sb, datum.GetBytes()) + + case types.KindMysqlJSON: + value, err := datum.GetMysqlJSON().MarshalJSON() + if err != nil { + return err + } + enc.appendSQLBytes(sb, value) + + case types.KindBinaryLiteral: + value := datum.GetBinaryLiteral() + sb.Grow(2 + 2*len(value)) + sb.WriteString("0x") + hex.NewEncoder(sb).Write(value) + + case types.KindMysqlBit: + var buffer [20]byte + intValue, err := datum.GetBinaryLiteral().ToInt(nil) + if err != nil { + return err + } + value := strconv.AppendUint(buffer[:0], intValue, 10) + sb.Write(value) + + // time, duration, decimal + default: + value, err := datum.ToString() + if err != nil { + return err + } + sb.WriteByte('\'') + sb.WriteString(value) + sb.WriteByte('\'') + } + + return nil +} + +func (mysqlEncoder) Close() {} + +func (enc mysqlEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _ []int) (Row, error) { + var encoded strings.Builder + encoded.Grow(8 * len(row)) + encoded.WriteByte('(') + for i, field := range row { + if i != 0 { + encoded.WriteByte(',') + } + if err := enc.appendSQL(&encoded, &field); err != nil { + logger.Error("mysql encode failed", + zap.Array("original", rowArrayMarshaler(row)), + zap.Int("originalCol", i), + log.ShortError(err), + ) + return nil, err + } + } + encoded.WriteByte(')') + return mysqlRow(encoded.String()), nil +} + +func (be *mysqlBackend) Close() { + // *Not* going to close `be.db`. The db object is normally borrowed from a + // TidbManager, so we let the manager to close it. +} + +func (be *mysqlBackend) MakeEmptyRows() Rows { + return mysqlRows(nil) +} + +func (be *mysqlBackend) RetryImportDelay() time.Duration { + return 0 +} + +func (be *mysqlBackend) MaxChunkSize() int { + return 1048576 +} + +func (be *mysqlBackend) ShouldPostProcess() bool { + return false +} + +func (be *mysqlBackend) NewEncoder(_ table.Table, mode mysql.SQLMode) Encoder { + return mysqlEncoder{mode: mode} +} + +func (be *mysqlBackend) OpenEngine(context.Context, uuid.UUID) error { + return nil +} + +func (be *mysqlBackend) CloseEngine(context.Context, uuid.UUID) error { + return nil +} + +func (be *mysqlBackend) CleanupEngine(context.Context, uuid.UUID) error { + return nil +} + +func (be *mysqlBackend) ImportEngine(context.Context, uuid.UUID) error { + return nil +} + +func (be *mysqlBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName string, columnNames []string, _ uint64, r Rows) error { + rows := r.(mysqlRows) + if len(rows) == 0 { + return nil + } + + var insertStmt strings.Builder + insertStmt.WriteString("INSERT INTO ") + insertStmt.WriteString(tableName) + if len(columnNames) > 0 { + insertStmt.WriteByte('(') + for i, colName := range columnNames { + if i != 0 { + insertStmt.WriteByte(',') + } + common.WriteMySQLIdentifier(&insertStmt, colName) + } + insertStmt.WriteByte(')') + } + insertStmt.WriteString(" VALUES") + + // Note: we are not going to do interpolation (prepared statements) to avoid + // complication arised from data length overflow of BIT and BINARY columns + + for i, row := range rows { + if i != 0 { + insertStmt.WriteByte(',') + } + insertStmt.WriteString(string(row)) + } + + // Retry will be done externally, so we're not going to retry here. + _, err := be.db.ExecContext(ctx, insertStmt.String()) + return err +} diff --git a/lightning/backend/mysql_test.go b/lightning/backend/mysql_test.go new file mode 100644 index 000000000..350072e31 --- /dev/null +++ b/lightning/backend/mysql_test.go @@ -0,0 +1,85 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend_test + +import ( + "context" + + "github.com/DATA-DOG/go-sqlmock" + . "github.com/pingcap/check" + "github.com/pingcap/tidb/types" + + kv "github.com/pingcap/tidb-lightning/lightning/backend" + "github.com/pingcap/tidb-lightning/lightning/log" + "github.com/pingcap/tidb-lightning/lightning/verification" +) + +var _ = Suite(&mysqlSuite{}) + +type mysqlSuite struct { + mockDB sqlmock.Sqlmock + backend kv.Backend +} + +func (s *mysqlSuite) SetUpTest(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + s.mockDB = mock + s.backend = kv.NewMySQLBackend(db) +} + +func (s *mysqlSuite) TearDownTest(c *C) { + s.backend.Close() + c.Assert(s.mockDB.ExpectationsWereMet(), IsNil) +} + +func (s *mysqlSuite) TestWriteRows(c *C) { + s.mockDB. + ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',0x000000abcdef,2557891634,'12.5',51)\\E"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + ctx := context.Background() + logger := log.L() + + engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1) + c.Assert(err, IsNil) + + dataRows := s.backend.MakeEmptyRows() + dataChecksum := verification.MakeKVChecksum(0, 0, 0) + indexRows := s.backend.MakeEmptyRows() + indexChecksum := verification.MakeKVChecksum(0, 0, 0) + + encoder := s.backend.NewEncoder(nil, 0) + row, err := encoder.Encode(logger, []types.Datum{ + types.NewUintDatum(18446744073709551615), + types.NewIntDatum(-9223372036854775808), + types.NewUintDatum(0), + types.Datum{}, + types.NewFloat32Datum(7.5), + types.NewFloat64Datum(5e-324), + types.NewFloat64Datum(1.7976931348623157e+308), + types.NewFloat64Datum(-0.0), + types.NewStringDatum("甲乙丙\r\n\x00\x26'\"\\`"), + types.NewBinaryLiteralDatum(types.NewBinaryLiteralFromUint(0xabcdef, 6)), + types.NewMysqlBitDatum(types.NewBinaryLiteralFromUint(0x98765432, 4)), + types.NewDecimalDatum(types.NewDecFromFloatForTest(12.5)), + types.NewMysqlEnumDatum(types.Enum{Name: "ENUM_NAME", Value: 51}), + }, 1, nil) + c.Assert(err, IsNil) + row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) + + err = engine.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) + c.Assert(err, IsNil) +} diff --git a/lightning/kv/session.go b/lightning/backend/session.go similarity index 99% rename from lightning/kv/session.go rename to lightning/backend/session.go index 036e32dc8..986f12adb 100644 --- a/lightning/kv/session.go +++ b/lightning/backend/session.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "github.com/pingcap/parser/mysql" diff --git a/lightning/kv/session_test.go b/lightning/backend/session_test.go similarity index 98% rename from lightning/kv/session_test.go rename to lightning/backend/session_test.go index c3193f10e..23ff26bb7 100644 --- a/lightning/kv/session_test.go +++ b/lightning/backend/session_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "testing" diff --git a/lightning/kv/sql2kv.go b/lightning/backend/sql2kv.go similarity index 99% rename from lightning/kv/sql2kv.go rename to lightning/backend/sql2kv.go index 8ab59e68d..115b2732d 100644 --- a/lightning/kv/sql2kv.go +++ b/lightning/backend/sql2kv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "github.com/pingcap/errors" diff --git a/lightning/kv/sql2kv_test.go b/lightning/backend/sql2kv_test.go similarity index 99% rename from lightning/kv/sql2kv_test.go rename to lightning/backend/sql2kv_test.go index 05f4e9477..9d3bcc2f7 100644 --- a/lightning/kv/sql2kv_test.go +++ b/lightning/backend/sql2kv_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "errors" diff --git a/lightning/kv/tikv.go b/lightning/backend/tikv.go similarity index 99% rename from lightning/kv/tikv.go rename to lightning/backend/tikv.go index a90a9174e..76c23c193 100644 --- a/lightning/kv/tikv.go +++ b/lightning/backend/tikv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kv +package backend import ( "context" diff --git a/lightning/kv/tikv_test.go b/lightning/backend/tikv_test.go similarity index 96% rename from lightning/kv/tikv_test.go rename to lightning/backend/tikv_test.go index 11f33eb51..4b535cb8b 100644 --- a/lightning/kv/tikv_test.go +++ b/lightning/backend/tikv_test.go @@ -1,4 +1,4 @@ -package kv_test +package backend_test import ( "context" @@ -10,7 +10,7 @@ import ( . "github.com/pingcap/check" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" ) type tikvSuite struct{} diff --git a/lightning/common/util.go b/lightning/common/util.go index 3bf4479d5..5624f81b6 100644 --- a/lightning/common/util.go +++ b/lightning/common/util.go @@ -46,12 +46,12 @@ const ( defaultMaxRetry = 3 ) -func ToDSN(host string, port int, user string, psw string) string { - return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", user, psw, host, port) +func ToDSN(host string, port int, user string, psw string, sqlMode string) string { + return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'", user, psw, host, port, sqlMode) } -func ConnectDB(host string, port int, user string, psw string) (*sql.DB, error) { - dbDSN := ToDSN(host, port, user, psw) +func ConnectDB(host string, port int, user string, psw string, sqlMode string) (*sql.DB, error) { + dbDSN := ToDSN(host, port, user, psw, sqlMode) db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, errors.Trace(err) diff --git a/lightning/config/config.go b/lightning/config/config.go index 815199e5b..acb0304d3 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -37,6 +37,16 @@ const ( ImportMode = "import" // NormalMode defines mode of normal for tikv. NormalMode = "normal" + + // BackendMySQL is a constant for choosing the "MySQL" backend in the configuration. + BackendMySQL = "mysql" + // BackendImporter is a constant for choosing the "Importer" backend in the configuration. + BackendImporter = "importer" + + // CheckpointDriverMySQL is a constant for choosing the "MySQL" checkpoint driver in the configuration. + CheckpointDriverMySQL = "mysql" + // CheckpointDriverFile is a constant for choosing the "File" checkpoint driver in the configuration. + CheckpointDriverFile = "file" ) var defaultConfigPaths = []string{"tidb-lightning.toml", "conf/tidb-lightning.toml"} @@ -119,7 +129,8 @@ type MydumperRuntime struct { } type TikvImporter struct { - Addr string `toml:"addr" json:"addr"` + Addr string `toml:"addr" json:"addr"` + Backend string `toml:"backend" json:"backend"` } type Checkpoint struct { @@ -181,6 +192,9 @@ func NewConfig() *Config { Delimiter: `"`, }, }, + TikvImporter: TikvImporter{ + Backend: BackendImporter, + }, PostRestore: PostRestore{ Checksum: true, }, @@ -200,6 +214,7 @@ func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error { cfg.TiDB.PdAddr = global.TiDB.PdAddr cfg.Mydumper.SourceDir = global.Mydumper.SourceDir cfg.TikvImporter.Addr = global.TikvImporter.Addr + cfg.TikvImporter.Backend = global.TikvImporter.Backend return nil } @@ -290,6 +305,13 @@ func (cfg *Config) Adjust() error { } } + cfg.TikvImporter.Backend = strings.ToLower(cfg.TikvImporter.Backend) + switch cfg.TikvImporter.Backend { + case BackendMySQL, BackendImporter: + default: + return errors.Errorf("invalid config: unsupported `tikv-importer.backend` (%s)", cfg.TikvImporter.Backend) + } + var err error cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode) if err != nil { @@ -353,13 +375,13 @@ func (cfg *Config) Adjust() error { cfg.Checkpoint.Schema = "tidb_lightning_checkpoint" } if len(cfg.Checkpoint.Driver) == 0 { - cfg.Checkpoint.Driver = "file" + cfg.Checkpoint.Driver = CheckpointDriverFile } if len(cfg.Checkpoint.DSN) == 0 { switch cfg.Checkpoint.Driver { - case "mysql": - cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw) - case "file": + case CheckpointDriverMySQL: + cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode) + case CheckpointDriverFile: cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb" } } diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index 9e520392c..af04ef5fd 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -25,6 +25,8 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-lightning/lightning/config" ) @@ -109,6 +111,13 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) { c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") } +func (s *configTestSuite) TestAdjustInvalidBackend(c *C) { + cfg := config.NewConfig() + cfg.TikvImporter.Backend = "no_such_backend" + err := cfg.Adjust() + c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend` \\(no_such_backend\\)") +} + func (s *configTestSuite) TestDecodeError(c *C) { ts, host, port := startMockServer(c, http.StatusOK, "invalid-string") defer ts.Close() @@ -378,10 +387,10 @@ func (s *configTestSuite) TestLoadConfig(c *C) { c.Assert(err, IsNil) taskCfg.Checkpoint.DSN = "" - taskCfg.Checkpoint.Driver = "mysql" + taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL err = taskCfg.Adjust() c.Assert(err, IsNil) - c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8") + c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'") result := taskCfg.String() c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`) @@ -389,7 +398,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) { taskCfg := config.NewConfig() - err := taskCfg.LoadFromGlobal(&config.GlobalConfig { + err := taskCfg.LoadFromGlobal(&config.GlobalConfig{ ConfigFileContent: []byte("invalid toml"), }) c.Assert(err, ErrorMatches, "Near line 1.*") diff --git a/lightning/config/global.go b/lightning/config/global.go index 0356cb896..540fa6386 100644 --- a/lightning/config/global.go +++ b/lightning/config/global.go @@ -49,7 +49,8 @@ type GlobalMydumper struct { } type GlobalImporter struct { - Addr string `toml:"addr" json:"addr"` + Addr string `toml:"addr" json:"addr"` + Backend string `toml:"backend" json:"backend"` } type GlobalConfig struct { @@ -72,6 +73,9 @@ func NewGlobalConfig() *GlobalConfig { StatusPort: 10080, LogLevel: "error", }, + TikvImporter: GlobalImporter{ + Backend: "importer", + }, } } @@ -111,6 +115,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon pdAddr := fs.String("pd-urls", "", "PD endpoint address") dataSrcPath := fs.String("d", "", "Directory of the dump to import") importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer") + backend := fs.String("backend", "", `delivery backend ("importer" or "mysql")`) statusAddr := fs.String("status-addr", "", "the Lightning server address") serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately") @@ -171,6 +176,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon if *statusAddr != "" { cfg.App.StatusAddr = *statusAddr } + if *backend != "" { + cfg.TikvImporter.Backend = *backend + } if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 { cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort) } diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 02d60ea53..fa8127b10 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -41,7 +41,7 @@ import ( . "github.com/pingcap/tidb-lightning/lightning/checkpoints" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" "github.com/pingcap/tidb-lightning/lightning/log" "github.com/pingcap/tidb-lightning/lightning/metric" "github.com/pingcap/tidb-lightning/lightning/mydump" @@ -153,11 +153,6 @@ type RestoreController struct { } func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config) (*RestoreController, error) { - backend, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr) - if err != nil { - return nil, errors.Trace(err) - } - cpdb, err := OpenCheckpointsDB(ctx, cfg) if err != nil { return nil, errors.Trace(err) @@ -168,6 +163,20 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, return nil, errors.Trace(err) } + var backend kv.Backend + switch cfg.TikvImporter.Backend { + case config.BackendImporter: + var err error + backend, err = kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr) + if err != nil { + return nil, err + } + case config.BackendMySQL: + backend = kv.NewMySQLBackend(tidbMgr.db) + default: + return nil, errors.New("unknown backend: " + cfg.TikvImporter.Backend) + } + rc := &RestoreController{ cfg: cfg, dbMetas: dbMetas, @@ -193,7 +202,7 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, } switch cfg.Checkpoint.Driver { - case "mysql": + case config.CheckpointDriverMySQL: db, err := sql.Open("mysql", cfg.Checkpoint.DSN) if err != nil { return nil, errors.Trace(err) @@ -205,7 +214,7 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, } return cpdb, nil - case "file": + case config.CheckpointDriverFile: return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil default: @@ -924,6 +933,12 @@ func (t *TableRestore) importEngine( } func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error { + if !rc.backend.ShouldPostProcess() { + t.logger.Debug("skip post-processing, not supported by backend") + rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped) + return nil + } + setSessionConcurrencyVars(ctx, rc.tidbMgr.db, rc.cfg.TiDB) // 3. alter table set auto_increment diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 563a33144..165b0eb9b 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -36,7 +36,7 @@ import ( . "github.com/pingcap/tidb-lightning/lightning/checkpoints" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" - "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" "github.com/pingcap/tidb-lightning/lightning/log" "github.com/pingcap/tidb-lightning/lightning/mydump" "github.com/pingcap/tidb-lightning/lightning/verification" diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index 587b39e72..eea6b3720 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -44,7 +44,7 @@ type TiDBManager struct { } func NewTiDBManager(dsn config.DBStore) (*TiDBManager, error) { - db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw) + db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw, dsn.StrSQLMode) if err != nil { return nil, errors.Trace(err) } diff --git a/mock/backend.go b/mock/backend.go index ef583caf5..11f342d06 100644 --- a/mock/backend.go +++ b/mock/backend.go @@ -10,7 +10,7 @@ import ( context "context" gomock "github.com/golang/mock/gomock" mysql "github.com/pingcap/parser/mysql" - kv "github.com/pingcap/tidb-lightning/lightning/kv" + kv "github.com/pingcap/tidb-lightning/lightning/backend" log "github.com/pingcap/tidb-lightning/lightning/log" verification "github.com/pingcap/tidb-lightning/lightning/verification" table "github.com/pingcap/tidb/table" @@ -149,6 +149,18 @@ func (mr *MockBackendMockRecorder) RetryImportDelay() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RetryImportDelay", reflect.TypeOf((*MockBackend)(nil).RetryImportDelay)) } +// ShouldPostProcess mocks base method +func (m *MockBackend) ShouldPostProcess() bool { + ret := m.ctrl.Call(m, "ShouldPostProcess") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShouldPostProcess indicates an expected call of ShouldPostProcess +func (mr *MockBackendMockRecorder) ShouldPostProcess() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldPostProcess", reflect.TypeOf((*MockBackend)(nil).ShouldPostProcess)) +} + // WriteRows mocks base method func (m *MockBackend) WriteRows(arg0 context.Context, arg1 go_uuid.UUID, arg2 string, arg3 []string, arg4 uint64, arg5 kv.Rows) error { ret := m.ctrl.Call(m, "WriteRows", arg0, arg1, arg2, arg3, arg4, arg5) diff --git a/tests/csv/run.sh b/tests/csv/run.sh index 9072b4f31..8585f0163 100755 --- a/tests/csv/run.sh +++ b/tests/csv/run.sh @@ -2,9 +2,11 @@ set -eu +for BACKEND in importer mysql; do + run_sql 'DROP DATABASE IF EXISTS csv' -run_lightning +run_lightning config --backend $BACKEND run_sql 'SELECT count(*), sum(PROCESSLIST_TIME), sum(THREAD_OS_ID), count(PROCESSLIST_STATE) FROM csv.threads' check_contains 'count(*): 43' @@ -34,3 +36,5 @@ run_sql 'SELECT id FROM csv.empty_strings WHERE a = """"' check_contains 'id: 3' run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""' check_not_contains 'id:' + +done diff --git a/tests/various_types/run.sh b/tests/various_types/run.sh index 3b7b8001c..787c172b3 100755 --- a/tests/various_types/run.sh +++ b/tests/various_types/run.sh @@ -17,9 +17,11 @@ set -eu +for BACKEND in importer mysql; do + run_sql 'DROP DATABASE IF EXISTS vt;' -run_lightning -echo 'Import finished' +run_lightning config --backend $BACKEND +echo Import using $BACKEND finished run_sql 'SELECT count(pk), bin(min(pk)), bin(max(pk)) FROM vt.bit' check_contains 'count(pk): 16' @@ -104,3 +106,5 @@ check_contains 'a: 18446744073709551614' check_contains 'b: -9223372036854775806' check_contains 'c: 99999999999999999999.0' check_contains 'd: 18446744073709551616.0' + +done diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 91562ad85..cb4c0ffe1 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -61,6 +61,9 @@ driver = "file" #keep-after-success = false [tikv-importer] +# Delivery backend, can be "importer" or "mysql". +backend = "importer" +# Address of tikv-importer when the backend is 'importer' addr = "127.0.0.1:8287" [mydumper]