diff --git a/loader/db.go b/loader/db.go index efc200b3c9..ea9dff035a 100644 --- a/loader/db.go +++ b/loader/db.go @@ -152,7 +152,7 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[] func createConn(cfg *config.SubTaskConfig) (*Conn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket) - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}) + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()) if err != nil { return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) } diff --git a/pkg/baseconn/conn.go b/pkg/baseconn/conn.go index e7edaf5231..423031e8a0 100644 --- a/pkg/baseconn/conn.go +++ b/pkg/baseconn/conn.go @@ -33,14 +33,30 @@ type BaseConn struct { DSN string RetryStrategy retry.Strategy + + RawDBCfg *RawDBConfig +} + +// RawDBConfig contains some low level database config +type RawDBConfig struct { + MaxIdleConns int +} + +// DefaultRawDBConfig returns a default raw database config +func DefaultRawDBConfig() *RawDBConfig { + return &RawDBConfig{ + MaxIdleConns: 2, + } } // NewBaseConn builds BaseConn to connect real DB -func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) { +func NewBaseConn(dbDSN string, strategy retry.Strategy, rawDBCfg *RawDBConfig) (*BaseConn, error) { db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, terror.ErrDBDriverError.Delegate(err) } + // set max idle connection limit before any database call + db.SetMaxIdleConns(rawDBCfg.MaxIdleConns) err = db.Ping() if err != nil { db.Close() @@ -49,7 +65,7 @@ func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) { if strategy == nil { strategy = &retry.FiniteRetryStrategy{} } - return &BaseConn{db, dbDSN, strategy}, nil + return &BaseConn{db, dbDSN, strategy, rawDBCfg}, nil } // SetRetryStrategy set retry strategy for baseConn @@ -70,6 +86,8 @@ func (conn *BaseConn) ResetConn(tctx *tcontext.Context) error { if err != nil { return terror.ErrDBDriverError.Delegate(err) } + // set max idle connection limit before any database call + db.SetMaxIdleConns(conn.RawDBCfg.MaxIdleConns) err = db.Ping() if err != nil { db.Close() diff --git a/pkg/baseconn/conn_test.go b/pkg/baseconn/conn_test.go index cdfdafa347..4863f592e8 100644 --- a/pkg/baseconn/conn_test.go +++ b/pkg/baseconn/conn_test.go @@ -35,7 +35,7 @@ type testBaseConnSuite struct { } func (t *testBaseConnSuite) TestBaseConn(c *C) { - baseConn, err := NewBaseConn("error dsn", nil) + baseConn, err := NewBaseConn("error dsn", nil, DefaultRawDBConfig()) c.Assert(terror.ErrDBDriverError.Equal(err), IsTrue) tctx := tcontext.Background() @@ -53,7 +53,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - baseConn = &BaseConn{db, "", nil} + baseConn = &BaseConn{db, "", nil, DefaultRawDBConfig()} err = baseConn.SetRetryStrategy(&retry.FiniteRetryStrategy{}) c.Assert(err, IsNil) diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 2b345dbf2f..3463968706 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -91,7 +91,7 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) { mock.ExpectCommit() // pass sqlmock baseConn directly - conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} err = cp.Init(conn) c.Assert(err, IsNil) cp.Clear() diff --git a/syncer/db.go b/syncer/db.go index 98c793fea0..6e2114080c 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -200,10 +200,10 @@ func (conn *Conn) executeSQL(tctx *tcontext.Context, queries []string, args ...[ return conn.executeSQLWithIgnore(tctx, nil, queries, args...) } -func createBaseConn(dbCfg config.DBConfig, timeout string) (*baseconn.BaseConn, error) { +func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.RawDBConfig) (*baseconn.BaseConn, error) { dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s&maxAllowedPacket=%d", dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout, *dbCfg.MaxAllowedPacket) - baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}) + baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg) if err != nil { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } @@ -211,7 +211,7 @@ func createBaseConn(dbCfg config.DBConfig, timeout string) (*baseconn.BaseConn, } func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) { - baseConn, err := createBaseConn(dbCfg, timeout) + baseConn, err := createBaseConn(dbCfg, timeout, baseconn.DefaultRawDBConfig()) if err != nil { return nil, err } @@ -221,14 +221,17 @@ func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) { dbs := make([]*Conn, 0, count) - baseConn, err := createBaseConn(dbCfg, timeout) + rawDBCfg := &baseconn.RawDBConfig{ + MaxIdleConns: cfg.SyncerConfig.WorkerCount, + } + baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg) if err != nil { return nil, err } for i := 0; i < count; i++ { // TODO use *sql.Conn instead of *sql.DB // share db by all conns - bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy} + bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy, rawDBCfg} dbs = append(dbs, &Conn{baseConn: bc, cfg: cfg}) } return dbs, nil diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index fdd0cbd84d..190e7877c1 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -935,8 +935,8 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { syncer := NewSyncer(s.cfg) // use upstream db as mock downstream - syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} + syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} syncer.reset() streamer, err := syncer.streamerProducer.generateStreamer(pos) @@ -1216,14 +1216,14 @@ func (s *testSyncerSuite) TestSharding(c *C) { syncer := NewSyncer(s.cfg) // fromDB mocks upstream db, db mocks downstream db - syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} - syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} + syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} // mock syncer.Init() function, because we need to pass mock dbs to different members' init syncer.genRouter() - syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}}}) - syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}}) + syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) + syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) syncer.reset() events := append(createEvents, s.generateEvents(_case.testEvents, c)...) syncer.streamerProducer = &MockStreamProducer{events} @@ -1362,10 +1362,10 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.DisableCausality = false syncer := NewSyncer(s.cfg) - syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} - syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}, - {cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}} - syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}} + syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} + syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}, + {cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}} + syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}} c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) @@ -1379,7 +1379,7 @@ func (s *testSyncerSuite) TestRun(c *C) { checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) checkPointMock.ExpectCommit() - syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}}) + syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}) syncer.reset() events1 := mockBinlogEvents{ mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}},