Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

tracker: get some session variables from downtream #1032

Merged
merged 29 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9eac402
tracker: get alter-pk and some session variables from downtream
lance6716 Sep 14, 2020
3cb2870
add test
lance6716 Sep 14, 2020
a81e238
fix hound
lance6716 Sep 14, 2020
6121699
Merge branch 'master' into retrieve-tidb
lance6716 Sep 14, 2020
76f66ee
change test
lance6716 Sep 14, 2020
514d419
change config structure
lance6716 Sep 14, 2020
547af04
use more common way to retrieve alter-pk
lance6716 Sep 15, 2020
817df9c
fix CI
lance6716 Sep 15, 2020
4ef9e8d
fix CI
lance6716 Sep 15, 2020
1cca733
Merge branch 'master' of https://github.com/pingcap/dm into retrieve-…
lance6716 Sep 15, 2020
6e2f4bb
Merge branch 'master' into retrieve-tidb
lance6716 Sep 15, 2020
0357f8c
fix CI
lance6716 Sep 15, 2020
80d1005
Merge branch 'retrieve-tidb' of github.com:lance6716/dm into retrieve…
lance6716 Sep 15, 2020
13150c0
add integration test
lance6716 Sep 15, 2020
c0d0a99
Merge branch 'master' of https://github.com/pingcap/dm into retrieve-…
lance6716 Sep 16, 2020
0ace2c9
address comment
lance6716 Sep 16, 2020
ff807c8
remove tidbconfig arg
lance6716 Sep 16, 2020
2d0af11
Merge branch 'master' of https://github.com/pingcap/dm into retrieve-…
lance6716 Sep 16, 2020
0bc3094
fix test
lance6716 Sep 17, 2020
ab8b918
Merge branch 'master' into retrieve-tidb
lance6716 Sep 17, 2020
ed25402
Merge branch 'master' into retrieve-tidb
lance6716 Sep 17, 2020
914c285
address comment
lance6716 Sep 17, 2020
6e63459
Merge branch 'retrieve-tidb' of github.com:lance6716/dm into retrieve…
lance6716 Sep 17, 2020
87eeda5
Merge branch 'master' of https://github.com/pingcap/dm into retrieve-…
lance6716 Sep 18, 2020
6ec2b8d
test CI
lance6716 Sep 18, 2020
86dd372
Merge branch 'master' into retrieve-tidb
lance6716 Sep 21, 2020
8fc1466
address comment
lance6716 Sep 21, 2020
b538de5
Merge branch 'retrieve-tidb' of github.com:lance6716/dm into retrieve…
lance6716 Sep 21, 2020
1a0f80e
Merge branch 'master' into retrieve-tidb
lance6716 Sep 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scop
ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=internal:level=medium], "Message: %s is not a valid `CREATE TABLE` statement"
ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement"
ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for `%s`.`%s` in schema tracker"
ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists"
Expand Down
13 changes: 5 additions & 8 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func (c *RawDBConfig) SetMaxIdleConns(value int) *RawDBConfig {

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
Host string `toml:"host" json:"host" yaml:"host"`
Port int `toml:"port" json:"port" yaml:"port"`
User string `toml:"user" json:"user" yaml:"user"`
Password string `toml:"password" json:"-" yaml:"password"` // omit it for privacy
// deprecated, mysql driver could automatically fetch this value
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
Session map[string]string `toml:"session" json:"session" yaml:"session"`

Expand Down Expand Up @@ -121,10 +122,6 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove it now?

Copy link
Collaborator Author

@lance6716 lance6716 Sep 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, maybe we could use it in future 🤔

Copy link
Collaborator Author

@lance6716 lance6716 Sep 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah so many reference, I prefer leave it here in case of we need it in future

if db.MaxAllowedPacket == nil {
cloneV := defaultMaxAllowedPacket
db.MaxAllowedPacket = &cloneV
}
}

// SubTaskConfig is the configuration for SubTask
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case stage, ok := <-stageCh:
if !ok {
closed = true
break
}
opType, err := w.operateSubTaskStageWithoutConfig(stage)
if err != nil {
Expand All @@ -422,6 +423,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
case err, ok := <-errCh:
if !ok {
closed = true
break
}
// TODO: deal with err
log.L().Error("WatchSubTaskStage received an error", zap.Error(err))
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,12 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44012]
message = "failed to create schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-scheduler-46001]
message = "the scheduler has not started"
description = ""
Expand Down
4 changes: 2 additions & 2 deletions loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
printStatusInterval = time.Second * 5
)

// Status implements SubTaskUnit.Status
// Status implements Unit.Status
func (l *Loader) Status() interface{} {
finishedSize := l.finishedDataSize.Get()
totalSize := l.totalDataSize.Get()
Expand All @@ -41,7 +41,7 @@ func (l *Loader) Status() interface{} {
return s
}

// Error implements SubTaskUnit.Error
// Error implements Unit.Error
func (l *Loader) Error() interface{} {
return &pb.LoadError{}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ var mockDB sqlmock.Sqlmock

// Apply will build BaseDB with DBConfig
func (d *DefaultDBProviderImpl) Apply(config config.DBConfig) (*BaseDB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=%d",
config.User, config.Password, config.Host, config.Port, *config.MaxAllowedPacket)
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)

doFuncInClose := func() {}
if config.Security != nil && len(config.Security.SSLCA) != 0 &&
Expand Down
1 change: 1 addition & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
"unsupported drop integer primary key",
"Unsupported collation",
"Invalid default value for",
"Unsupported drop primary key",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
Expand Down
43 changes: 41 additions & 2 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,67 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
tidbConfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"

"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
)

const (
waitDDLRetryCount = 10
schemaLeaseTime = 10 * time.Millisecond
)

var (
sessionVars = []string{"sql_mode", "tidb_skip_utf8_check"}
)

// Tracker is used to track schema locally.
type Tracker struct {
store kv.Storage
dom *domain.Domain
se session.Session
}

// NewTracker creates a new tracker.
func NewTracker(sessionCfg map[string]string) (*Tracker, error) {
// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream TiDB using `tidbConn`.
func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) {
// NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. If that isolation is needed,
// we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey
toSet := tidbConfig.NewConfig()
toSet.AlterPrimaryKey = true
tidbConfig.StoreGlobalConfig(toSet)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any test for it? rest LGTM

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if len(sessionCfg) == 0 {
sessionCfg = make(map[string]string)
var ignoredColumn interface{}
for _, k := range sessionVars {
rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k))
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err2 != nil {
return nil, err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
}
sessionCfg[k] = value
}
if err2 = rows.Close(); err2 != nil {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
return nil, err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
}
}
}

store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.MockTiKV))
if err != nil {
return nil, err
Expand Down
103 changes: 86 additions & 17 deletions pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema_test
package schema

import (
"context"
"database/sql"
"encoding/json"
"sort"
"testing"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/schema"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"go.uber.org/zap/zapcore"

"github.com/pingcap/dm/pkg/conn"
)

func Test(t *testing.T) {
Expand All @@ -32,17 +35,78 @@ func Test(t *testing.T) {

var _ = Suite(&trackerSuite{})

type trackerSuite struct{}
var (
defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"}
)

type trackerSuite struct {
baseConn *conn.BaseConn
db *sql.DB
backupKeys []string
}

func (s *trackerSuite) SetUpSuite(c *C) {
s.backupKeys = sessionVars
sessionVars = []string{"sql_mode"}
db, _, err := sqlmock.New()
s.db = db
c.Assert(err, IsNil)
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
s.baseConn = conn.NewBaseConn(con, nil)
}

func (s *trackerSuite) TearDownSuite(c *C) {
s.db.Close()
sessionVars = s.backupKeys
}

func (s *trackerSuite) TestSessionCfg(c *C) {
func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
log.SetLevel(zapcore.ErrorLevel)

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)

// user give correct session config
_, err = NewTracker(defaultTestSessionCfg, baseConn)
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
tracker, err := schema.NewTracker(sessionCfg)
_, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, NotNil)

tracker, err = schema.NewTracker(nil)
// discover session config failed, will return error
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(nil, baseConn)
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", ""))
tracker, err := NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
c.Assert(err, IsNil)

// found session config in downstream
mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(nil, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasStrictMode(), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -51,11 +115,12 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00')")
c.Assert(err, NotNil)

// set session config
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = schema.NewTracker(sessionCfg)
tracker, err = NewTracker(sessionCfg, baseConn)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)
Expand All @@ -75,30 +140,34 @@ func (s *trackerSuite) TestSessionCfg(c *C) {
cts, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, IsNil)
c.Assert(cts, Equals, "CREATE TABLE \"foo\" ( \"a\" varchar(255) NOT NULL, PRIMARY KEY (\"a\")) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")

// test alter primary key
err = tracker.Exec(ctx, "testdb", "alter table \"foo\" drop primary key")
c.Assert(err, IsNil)
}

func (s *trackerSuite) TestDDL(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Table shouldn't exist before initialization.
_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

_, err = tracker.GetCreateTable(context.Background(), "testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
c.Assert(err, IsNil)

_, err = tracker.GetTable("testdb", "foo")
c.Assert(err, ErrorMatches, `.*Table 'testdb\.foo' doesn't exist`)
c.Assert(schema.IsTableNotExists(err), IsTrue)
c.Assert(IsTableNotExists(err), IsTrue)

// Now create the table with 3 columns.
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b varchar(255) as (concat(a, a)), c int)")
Expand Down Expand Up @@ -147,7 +216,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -186,7 +255,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// We cannot create a table without a database.
Expand Down Expand Up @@ -214,7 +283,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down Expand Up @@ -258,7 +327,7 @@ func (aj asJSON) String() string {
func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// Create some sort of complicated table.
Expand Down Expand Up @@ -322,7 +391,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := schema.NewTracker(nil)
tracker, err := NewTracker(defaultTestSessionCfg, s.baseConn)
c.Assert(err, IsNil)

// nothing should exist...
Expand Down
Loading