Skip to content

Commit

Permalink
*: support MySQL backend (pingcap#221)
Browse files Browse the repository at this point in the history
* *: support MySQL backend

* config: use a constant for backend and checkpoint.driver

* kv: address comments

* backend: rename `kv` package to `backend` package
  • Loading branch information
kennytm authored Aug 14, 2019
1 parent e8d463a commit 4eb74ec
Show file tree
Hide file tree
Showing 25 changed files with 510 additions and 42 deletions.
2 changes: 1 addition & 1 deletion cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/kv"
kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/restore"
"github.com/satori/go.uuid"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package backend

import (
"sync/atomic"
Expand Down
11 changes: 10 additions & 1 deletion lightning/kv/backend.go → lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package backend

import (
"context"
Expand Down Expand Up @@ -92,6 +92,11 @@ type AbstractBackend interface {
// value will be used in `Rows.SplitIntoChunks`.
MaxChunkSize() int

// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum, adjusting
// auto-increment ID, and analyze.
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder

Expand Down Expand Up @@ -162,6 +167,10 @@ func (be Backend) NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder {
return be.abstract.NewEncoder(tbl, sqlMode)
}

func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag := makeTag(tableName, engineID)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kv_test
package backend_test

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
"github.com/pingcap/parser/mysql"
uuid "github.com/satori/go.uuid"

"github.com/pingcap/tidb-lightning/lightning/kv"
kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/mock"
)

Expand Down
6 changes: 5 additions & 1 deletion lightning/kv/importer.go → lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package backend

import (
"context"
Expand Down Expand Up @@ -85,6 +85,10 @@ func (*importer) MaxChunkSize() int {
return 31 << 10
}

func (*importer) ShouldPostProcess() bool {
return true
}

// isIgnorableOpenCloseEngineError checks if the error from
// OpenEngine/CloseEngine can be safely ignored.
func isIgnorableOpenCloseEngineError(err error) bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kv_test
package backend_test

import (
"context"
Expand All @@ -10,7 +10,7 @@ import (
kvenc "github.com/pingcap/tidb/util/kvencoder"
uuid "github.com/satori/go.uuid"

"github.com/pingcap/tidb-lightning/lightning/kv"
kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/mock"
)

Expand Down
293 changes: 293 additions & 0 deletions lightning/backend/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
"context"
"database/sql"
"encoding/hex"
"strconv"
"strings"
"time"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"

"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb-lightning/lightning/verification"
)

type mysqlRow string

type mysqlRows []mysqlRow

type mysqlEncoder struct {
mode mysql.SQLMode
}

type mysqlBackend struct {
db *sql.DB
}

// NewMySQLBackend creates a new MySQL backend using the given database.
//
// The backend does not take ownership of `db`. Caller should close `db`
// manually after the backend expired.
func NewMySQLBackend(db *sql.DB) Backend {
return MakeBackend(&mysqlBackend{db: db})
}

func (row mysqlRow) ClassifyAndAppend(data *Rows, checksum *verification.KVChecksum, _ *Rows, _ *verification.KVChecksum) {
rows := (*data).(mysqlRows)
*data = mysqlRows(append(rows, row))
cs := verification.MakeKVChecksum(uint64(len(row)), 1, 0)
checksum.Add(&cs)
}

func (rows mysqlRows) SplitIntoChunks(splitSize int) []Rows {
if len(rows) == 0 {
return nil
}

res := make([]Rows, 0, 1)
i := 0
cumSize := 0

for j, row := range rows {
if i < j && cumSize+len(row) > splitSize {
res = append(res, rows[i:j])
i = j
cumSize = 0
}
cumSize += len(row)
}

return append(res, rows[i:])
}

func (rows mysqlRows) Clear() Rows {
return rows[:0]
}

func (enc mysqlEncoder) appendSQLBytes(sb *strings.Builder, value []byte) {
sb.Grow(2 + len(value))
sb.WriteByte('\'')
if enc.mode.HasNoBackslashEscapesMode() {
for _, b := range value {
if b == '\'' {
sb.WriteString(`''`)
} else {
sb.WriteByte(b)
}
}
} else {
for _, b := range value {
switch b {
case 0:
sb.WriteString(`\0`)
case '\b':
sb.WriteString(`\b`)
case '\n':
sb.WriteString(`\n`)
case '\r':
sb.WriteString(`\r`)
case '\t':
sb.WriteString(`\t`)
case 0x26:
sb.WriteString(`\Z`)
case '\'':
sb.WriteString(`''`)
case '\\':
sb.WriteString(`\\`)
default:
sb.WriteByte(b)
}
}
}
sb.WriteByte('\'')
}

// appendSQL appends the SQL representation of the Datum into the string builder.
// Note that we cannot use Datum.ToString since it doesn't perform SQL escaping.
func (enc mysqlEncoder) appendSQL(sb *strings.Builder, datum *types.Datum) error {
switch datum.Kind() {
case types.KindNull:
sb.WriteString("NULL")

case types.KindMinNotNull:
sb.WriteString("MINVALUE")

case types.KindMaxValue:
sb.WriteString("MAXVALUE")

case types.KindInt64:
// longest int64 = -9223372036854775808 which has 20 characters
var buffer [20]byte
value := strconv.AppendInt(buffer[:0], datum.GetInt64(), 10)
sb.Write(value)

case types.KindUint64, types.KindMysqlEnum, types.KindMysqlSet:
// longest uint64 = 18446744073709551615 which has 20 characters
var buffer [20]byte
value := strconv.AppendUint(buffer[:0], datum.GetUint64(), 10)
sb.Write(value)

case types.KindFloat32, types.KindFloat64:
// float64 has 16 digits of precision, so a buffer size of 32 is more than enough...
var buffer [32]byte
value := strconv.AppendFloat(buffer[:0], datum.GetFloat64(), 'g', -1, 64)
sb.Write(value)

case types.KindString, types.KindBytes:
enc.appendSQLBytes(sb, datum.GetBytes())

case types.KindMysqlJSON:
value, err := datum.GetMysqlJSON().MarshalJSON()
if err != nil {
return err
}
enc.appendSQLBytes(sb, value)

case types.KindBinaryLiteral:
value := datum.GetBinaryLiteral()
sb.Grow(2 + 2*len(value))
sb.WriteString("0x")
hex.NewEncoder(sb).Write(value)

case types.KindMysqlBit:
var buffer [20]byte
intValue, err := datum.GetBinaryLiteral().ToInt(nil)
if err != nil {
return err
}
value := strconv.AppendUint(buffer[:0], intValue, 10)
sb.Write(value)

// time, duration, decimal
default:
value, err := datum.ToString()
if err != nil {
return err
}
sb.WriteByte('\'')
sb.WriteString(value)
sb.WriteByte('\'')
}

return nil
}

func (mysqlEncoder) Close() {}

func (enc mysqlEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _ []int) (Row, error) {
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
for i, field := range row {
if i != 0 {
encoded.WriteByte(',')
}
if err := enc.appendSQL(&encoded, &field); err != nil {
logger.Error("mysql encode failed",
zap.Array("original", rowArrayMarshaler(row)),
zap.Int("originalCol", i),
log.ShortError(err),
)
return nil, err
}
}
encoded.WriteByte(')')
return mysqlRow(encoded.String()), nil
}

func (be *mysqlBackend) Close() {
// *Not* going to close `be.db`. The db object is normally borrowed from a
// TidbManager, so we let the manager to close it.
}

func (be *mysqlBackend) MakeEmptyRows() Rows {
return mysqlRows(nil)
}

func (be *mysqlBackend) RetryImportDelay() time.Duration {
return 0
}

func (be *mysqlBackend) MaxChunkSize() int {
return 1048576
}

func (be *mysqlBackend) ShouldPostProcess() bool {
return false
}

func (be *mysqlBackend) NewEncoder(_ table.Table, mode mysql.SQLMode) Encoder {
return mysqlEncoder{mode: mode}
}

func (be *mysqlBackend) OpenEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) CloseEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) CleanupEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) ImportEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName string, columnNames []string, _ uint64, r Rows) error {
rows := r.(mysqlRows)
if len(rows) == 0 {
return nil
}

var insertStmt strings.Builder
insertStmt.WriteString("INSERT INTO ")
insertStmt.WriteString(tableName)
if len(columnNames) > 0 {
insertStmt.WriteByte('(')
for i, colName := range columnNames {
if i != 0 {
insertStmt.WriteByte(',')
}
common.WriteMySQLIdentifier(&insertStmt, colName)
}
insertStmt.WriteByte(')')
}
insertStmt.WriteString(" VALUES")

// Note: we are not going to do interpolation (prepared statements) to avoid
// complication arised from data length overflow of BIT and BINARY columns

for i, row := range rows {
if i != 0 {
insertStmt.WriteByte(',')
}
insertStmt.WriteString(string(row))
}

// Retry will be done externally, so we're not going to retry here.
_, err := be.db.ExecContext(ctx, insertStmt.String())
return err
}
Loading

0 comments on commit 4eb74ec

Please sign in to comment.