From faadb1defb360ccac848e003fb97575437eb0513 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Sep 2019 10:43:51 +0800 Subject: [PATCH 1/5] conn: add max idle conn config --- loader/db.go | 5 ++++- pkg/baseconn/conn.go | 15 +++++++++++++-- pkg/baseconn/conn_test.go | 2 +- syncer/db.go | 17 ++++++++++++----- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/loader/db.go b/loader/db.go index efc200b3c9..54dd7ba7c5 100644 --- a/loader/db.go +++ b/loader/db.go @@ -152,7 +152,10 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[] func createConn(cfg *config.SubTaskConfig) (*Conn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket) - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}) + connCfg := &baseconn.RawDBConfig{ + MaxIdleConns: cfg.LoaderConfig.PoolSize, + } + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, connCfg) if err != nil { return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) } diff --git a/pkg/baseconn/conn.go b/pkg/baseconn/conn.go index e7edaf5231..cb28ddbce3 100644 --- a/pkg/baseconn/conn.go +++ b/pkg/baseconn/conn.go @@ -33,14 +33,23 @@ type BaseConn struct { DSN string RetryStrategy retry.Strategy + + RawDBCfg *RawDBConfig +} + +// RawDBConfig contains some low level database config +type RawDBConfig struct { + MaxIdleConns int } // NewBaseConn builds BaseConn to connect real DB -func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) { +func NewBaseConn(dbDSN string, strategy retry.Strategy, rawDBCfg *RawDBConfig) (*BaseConn, error) { db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, terror.ErrDBDriverError.Delegate(err) } + // set max idle connection limit before any database call + db.SetMaxIdleConns(rawDBCfg.MaxIdleConns) err = db.Ping() if err != nil { db.Close() @@ -49,7 +58,7 @@ func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) { if strategy == nil { strategy = &retry.FiniteRetryStrategy{} } - return &BaseConn{db, dbDSN, strategy}, nil + return &BaseConn{db, dbDSN, strategy, rawDBCfg}, nil } // SetRetryStrategy set retry strategy for baseConn @@ -70,6 +79,8 @@ func (conn *BaseConn) ResetConn(tctx *tcontext.Context) error { if err != nil { return terror.ErrDBDriverError.Delegate(err) } + // set max idle connection limit before any database call + db.SetMaxIdleConns(conn.RawDBCfg.MaxIdleConns) err = db.Ping() if err != nil { db.Close() diff --git a/pkg/baseconn/conn_test.go b/pkg/baseconn/conn_test.go index cdfdafa347..0ce7cd0337 100644 --- a/pkg/baseconn/conn_test.go +++ b/pkg/baseconn/conn_test.go @@ -35,7 +35,7 @@ type testBaseConnSuite struct { } func (t *testBaseConnSuite) TestBaseConn(c *C) { - baseConn, err := NewBaseConn("error dsn", nil) + baseConn, err := NewBaseConn("error dsn", nil, &BaseConnConfig{MaxIdleConns: 2}) c.Assert(terror.ErrDBDriverError.Equal(err), IsTrue) tctx := tcontext.Background() diff --git a/syncer/db.go b/syncer/db.go index 98c793fea0..cd163cb167 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -200,18 +200,24 @@ func (conn *Conn) executeSQL(tctx *tcontext.Context, queries []string, args ...[ return conn.executeSQLWithIgnore(tctx, nil, queries, args...) } -func createBaseConn(dbCfg config.DBConfig, timeout string) (*baseconn.BaseConn, error) { +func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.RawDBConfig) (*baseconn.BaseConn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s&maxAllowedPacket=%d", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout, *dbCfg.MaxAllowedPacket) - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}) + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg) if err != nil { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } return baseConn, nil } +func getRawDBConfig(cfg *config.SubTaskConfig) *baseconn.RawDBConfig { + return &baseconn.RawDBConfig{ + MaxIdleConns: cfg.SyncerConfig.WorkerCount, + } +} + func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - baseConn, err := createBaseConn(dbCfg, timeout) + baseConn, err := createBaseConn(dbCfg, timeout, getRawDBConfig(cfg)) if err != nil { return nil, err } @@ -221,14 +227,15 @@ func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) { dbs := make([]*Conn, 0, count) - baseConn, err := createBaseConn(dbCfg, timeout) + rawDBCfg := getRawDBConfig(cfg) + baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg) if err != nil { return nil, err } for i := 0; i < count; i++ { // TODO use *sql.Conn instead of *sql.DB // share db by all conns - bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy} + bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy, rawDBCfg} dbs = append(dbs, &Conn{baseConn: bc, cfg: cfg}) } return dbs, nil From 7985a05c1dd43f716e497f4919ffd4ae00feb1b5 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Sep 2019 13:29:23 +0800 Subject: [PATCH 2/5] address comment, set loader MaxIdleConns to 2 --- loader/db.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/loader/db.go b/loader/db.go index 54dd7ba7c5..022126e7d3 100644 --- a/loader/db.go +++ b/loader/db.go @@ -152,10 +152,8 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[] func createConn(cfg *config.SubTaskConfig) (*Conn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket) - connCfg := &baseconn.RawDBConfig{ - MaxIdleConns: cfg.LoaderConfig.PoolSize, - } - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, connCfg) + rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: 2} + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg) if err != nil { return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) } From b2b822f5ec390ef892657f3f5f4726bbffa2e047 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Sep 2019 13:46:08 +0800 Subject: [PATCH 3/5] address comment --- loader/db.go | 3 +-- pkg/baseconn/conn.go | 7 +++++++ syncer/checkpoint_test.go | 2 +- syncer/db.go | 12 ++++-------- syncer/syncer_test.go | 25 +++++++++++++------------ 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/loader/db.go b/loader/db.go index 022126e7d3..ea9dff035a 100644 --- a/loader/db.go +++ b/loader/db.go @@ -152,8 +152,7 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[] func createConn(cfg *config.SubTaskConfig) (*Conn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket) - rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: 2} - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg) + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()) if err != nil { return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) } diff --git a/pkg/baseconn/conn.go b/pkg/baseconn/conn.go index cb28ddbce3..423031e8a0 100644 --- a/pkg/baseconn/conn.go +++ b/pkg/baseconn/conn.go @@ -42,6 +42,13 @@ type RawDBConfig struct { MaxIdleConns int } +// DefaultRawDBConfig returns a default raw database config +func DefaultRawDBConfig() *RawDBConfig { + return &RawDBConfig{ + MaxIdleConns: 2, + } +} + // NewBaseConn builds BaseConn to connect real DB func NewBaseConn(dbDSN string, strategy retry.Strategy, rawDBCfg *RawDBConfig) (*BaseConn, error) { db, err := sql.Open("mysql", dbDSN) diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 2b345dbf2f..3463968706 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -91,7 +91,7 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) { mock.ExpectCommit() // pass sqlmock baseConn directly - conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} err = cp.Init(conn) c.Assert(err, IsNil) cp.Clear() diff --git a/syncer/db.go b/syncer/db.go index cd163cb167..87affcb124 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -210,14 +210,8 @@ func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.Ra return baseConn, nil } -func getRawDBConfig(cfg *config.SubTaskConfig) *baseconn.RawDBConfig { - return &baseconn.RawDBConfig{ - MaxIdleConns: cfg.SyncerConfig.WorkerCount, - } -} - func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - baseConn, err := createBaseConn(dbCfg, timeout, getRawDBConfig(cfg)) + baseConn, err := createBaseConn(dbCfg, timeout, &baseconn.RawDBConfig{MaxIdleConns: 2}) if err != nil { return nil, err } @@ -227,7 +221,9 @@ func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) { dbs := make([]*Conn, 0, count) - rawDBCfg := getRawDBConfig(cfg) + rawDBCfg := &baseconn.RawDBConfig{ + MaxIdleConns: cfg.SyncerConfig.WorkerCount, + } baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg) if err != nil { return nil, err diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index fdd0cbd84d..a0ff6763c6 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -935,8 +935,9 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { syncer := NewSyncer(s.cfg) // use upstream db as mock downstream - syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} + rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: baseconn.DefaultMaxIdleConns} + syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} syncer.reset() streamer, err := syncer.streamerProducer.generateStreamer(pos) @@ -1216,14 +1217,14 @@ func (s *testSyncerSuite) TestSharding(c *C) { syncer := NewSyncer(s.cfg) // fromDB mocks upstream db, db mocks downstream db - syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} - syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} + syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} // mock syncer.Init() function, because we need to pass mock dbs to different members' init syncer.genRouter() - syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}}}) - syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}}) + syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) + syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) syncer.reset() events := append(createEvents, s.generateEvents(_case.testEvents, c)...) syncer.streamerProducer = &MockStreamProducer{events} @@ -1362,10 +1363,10 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.DisableCausality = false syncer := NewSyncer(s.cfg) - syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}, - {cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} - syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}, + {cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} + syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) @@ -1379,7 +1380,7 @@ func (s *testSyncerSuite) TestRun(c *C) { checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) checkPointMock.ExpectCommit() - syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}}) + syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) syncer.reset() events1 := mockBinlogEvents{ mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}}, From abaf8ff2e561eb92bf3a41b23d4423da190ac561 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Sep 2019 13:49:32 +0800 Subject: [PATCH 4/5] minor fix --- pkg/baseconn/conn_test.go | 5 +++-- syncer/db.go | 2 +- syncer/syncer_test.go | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/baseconn/conn_test.go b/pkg/baseconn/conn_test.go index 0ce7cd0337..33cb99e177 100644 --- a/pkg/baseconn/conn_test.go +++ b/pkg/baseconn/conn_test.go @@ -17,6 +17,7 @@ import ( "errors" "testing" + "github.com/pingcap/dm/pkg/baseconn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" @@ -35,7 +36,7 @@ type testBaseConnSuite struct { } func (t *testBaseConnSuite) TestBaseConn(c *C) { - baseConn, err := NewBaseConn("error dsn", nil, &BaseConnConfig{MaxIdleConns: 2}) + baseConn, err := NewBaseConn("error dsn", nil, DefaultRawDBConfig()) c.Assert(terror.ErrDBDriverError.Equal(err), IsTrue) tctx := tcontext.Background() @@ -53,7 +54,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - baseConn = &BaseConn{db, "", nil} + baseConn = &BaseConn{db, "", nil, DefaultRawDBConfig()} err = baseConn.SetRetryStrategy(&retry.FiniteRetryStrategy{}) c.Assert(err, IsNil) diff --git a/syncer/db.go b/syncer/db.go index 87affcb124..6e2114080c 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -211,7 +211,7 @@ func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.Ra } func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - baseConn, err := createBaseConn(dbCfg, timeout, &baseconn.RawDBConfig{MaxIdleConns: 2}) + baseConn, err := createBaseConn(dbCfg, timeout, baseconn.DefaultRawDBConfig()) if err != nil { return nil, err } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index a0ff6763c6..190e7877c1 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -935,7 +935,6 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { syncer := NewSyncer(s.cfg) // use upstream db as mock downstream - rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: baseconn.DefaultMaxIdleConns} syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} syncer.reset() From b66108ab498f10d9bd9dd76241752bbb11cd181c Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 10 Sep 2019 13:53:35 +0800 Subject: [PATCH 5/5] fix test --- pkg/baseconn/conn_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/baseconn/conn_test.go b/pkg/baseconn/conn_test.go index 33cb99e177..4863f592e8 100644 --- a/pkg/baseconn/conn_test.go +++ b/pkg/baseconn/conn_test.go @@ -17,7 +17,6 @@ import ( "errors" "testing" - "github.com/pingcap/dm/pkg/baseconn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror"