Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

conn: add max idle conn config #280

Merged
merged 5 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: 2}
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/baseconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type RawDBConfig struct {
MaxIdleConns int
}

// DefaultRawDBConfig returns a default raw database config
func DefaultRawDBConfig() *RawDBConfig {
return &RawDBConfig{
MaxIdleConns: 2,
}
}

// NewBaseConn builds BaseConn to connect real DB
func NewBaseConn(dbDSN string, strategy retry.Strategy, rawDBCfg *RawDBConfig) (*BaseConn, error) {
db, err := sql.Open("mysql", dbDSN)
Expand Down
2 changes: 1 addition & 1 deletion syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) {
mock.ExpectCommit()

// pass sqlmock baseConn directly
conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
err = cp.Init(conn)
c.Assert(err, IsNil)
cp.Clear()
Expand Down
12 changes: 4 additions & 8 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,8 @@ func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.Ra
return baseConn, nil
}

func getRawDBConfig(cfg *config.SubTaskConfig) *baseconn.RawDBConfig {
return &baseconn.RawDBConfig{
MaxIdleConns: cfg.SyncerConfig.WorkerCount,
}
}

func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) {
baseConn, err := createBaseConn(dbCfg, timeout, getRawDBConfig(cfg))
baseConn, err := createBaseConn(dbCfg, timeout, &baseconn.RawDBConfig{MaxIdleConns: 2})
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -227,7 +221,9 @@ func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string
func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) {
dbs := make([]*Conn, 0, count)

rawDBCfg := getRawDBConfig(cfg)
rawDBCfg := &baseconn.RawDBConfig{
MaxIdleConns: cfg.SyncerConfig.WorkerCount,
}
baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg)
if err != nil {
return nil, err
Expand Down
25 changes: 13 additions & 12 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,9 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) {

syncer := NewSyncer(s.cfg)
// use upstream db as mock downstream
syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
rawDBCfg := &baseconn.RawDBConfig{MaxIdleConns: baseconn.DefaultMaxIdleConns}
syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.reset()

streamer, err := syncer.streamerProducer.generateStreamer(pos)
Expand Down Expand Up @@ -1216,14 +1217,14 @@ func (s *testSyncerSuite) TestSharding(c *C) {
syncer := NewSyncer(s.cfg)

// fromDB mocks upstream db, db mocks downstream db
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}

// mock syncer.Init() function, because we need to pass mock dbs to different members' init
syncer.genRouter()
syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}})
syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.reset()
events := append(createEvents, s.generateEvents(_case.testEvents, c)...)
syncer.streamerProducer = &MockStreamProducer{events}
Expand Down Expand Up @@ -1362,10 +1363,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
s.cfg.DisableCausality = false

syncer := NewSyncer(s.cfg)
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}},
{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}},
{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
c.Assert(syncer.Type(), Equals, pb.UnitType_Sync)

syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules)
Expand All @@ -1379,7 +1380,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()

syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.reset()
events1 := mockBinlogEvents{
mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}},
Expand Down