Skip to content

Commit

Permalink
conn: add max idle conn config (pingcap#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Sep 10, 2019
1 parent 1ec32d7 commit 90c0ace
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 23 deletions.
2 changes: 1 addition & 1 deletion loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (conn *Conn) executeSQL(ctx *tcontext.Context, queries []string, args ...[]
func createConn(cfg *config.SubTaskConfig) (*Conn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&maxAllowedPacket=%d",
cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port, *cfg.To.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{})
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig())
if err != nil {
return nil, terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
Expand Down
22 changes: 20 additions & 2 deletions pkg/baseconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,30 @@ type BaseConn struct {
DSN string

RetryStrategy retry.Strategy

RawDBCfg *RawDBConfig
}

// RawDBConfig contains some low level database config
type RawDBConfig struct {
MaxIdleConns int
}

// DefaultRawDBConfig returns a default raw database config
func DefaultRawDBConfig() *RawDBConfig {
return &RawDBConfig{
MaxIdleConns: 2,
}
}

// NewBaseConn builds BaseConn to connect real DB
func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) {
func NewBaseConn(dbDSN string, strategy retry.Strategy, rawDBCfg *RawDBConfig) (*BaseConn, error) {
db, err := sql.Open("mysql", dbDSN)
if err != nil {
return nil, terror.ErrDBDriverError.Delegate(err)
}
// set max idle connection limit before any database call
db.SetMaxIdleConns(rawDBCfg.MaxIdleConns)
err = db.Ping()
if err != nil {
db.Close()
Expand All @@ -49,7 +65,7 @@ func NewBaseConn(dbDSN string, strategy retry.Strategy) (*BaseConn, error) {
if strategy == nil {
strategy = &retry.FiniteRetryStrategy{}
}
return &BaseConn{db, dbDSN, strategy}, nil
return &BaseConn{db, dbDSN, strategy, rawDBCfg}, nil
}

// SetRetryStrategy set retry strategy for baseConn
Expand All @@ -70,6 +86,8 @@ func (conn *BaseConn) ResetConn(tctx *tcontext.Context) error {
if err != nil {
return terror.ErrDBDriverError.Delegate(err)
}
// set max idle connection limit before any database call
db.SetMaxIdleConns(conn.RawDBCfg.MaxIdleConns)
err = db.Ping()
if err != nil {
db.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/baseconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type testBaseConnSuite struct {
}

func (t *testBaseConnSuite) TestBaseConn(c *C) {
baseConn, err := NewBaseConn("error dsn", nil)
baseConn, err := NewBaseConn("error dsn", nil, DefaultRawDBConfig())
c.Assert(terror.ErrDBDriverError.Equal(err), IsTrue)

tctx := tcontext.Background()
Expand All @@ -53,7 +53,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
baseConn = &BaseConn{db, "", nil}
baseConn = &BaseConn{db, "", nil, DefaultRawDBConfig()}

err = baseConn.SetRetryStrategy(&retry.FiniteRetryStrategy{})
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) {
mock.ExpectCommit()

// pass sqlmock baseConn directly
conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
conn := &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
err = cp.Init(conn)
c.Assert(err, IsNil)
cp.Clear()
Expand Down
13 changes: 8 additions & 5 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,18 @@ func (conn *Conn) executeSQL(tctx *tcontext.Context, queries []string, args ...[
return conn.executeSQLWithIgnore(tctx, nil, queries, args...)
}

func createBaseConn(dbCfg config.DBConfig, timeout string) (*baseconn.BaseConn, error) {
func createBaseConn(dbCfg config.DBConfig, timeout string, rawDBCfg *baseconn.RawDBConfig) (*baseconn.BaseConn, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&readTimeout=%s&maxAllowedPacket=%d",
dbCfg.User, dbCfg.Password, dbCfg.Host, dbCfg.Port, timeout, *dbCfg.MaxAllowedPacket)
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{})
baseConn, err := baseconn.NewBaseConn(dbDSN, &retry.FiniteRetryStrategy{}, rawDBCfg)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
return baseConn, nil
}

func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string) (*Conn, error) {
baseConn, err := createBaseConn(dbCfg, timeout)
baseConn, err := createBaseConn(dbCfg, timeout, baseconn.DefaultRawDBConfig())
if err != nil {
return nil, err
}
Expand All @@ -221,14 +221,17 @@ func createConn(cfg *config.SubTaskConfig, dbCfg config.DBConfig, timeout string
func createConns(cfg *config.SubTaskConfig, dbCfg config.DBConfig, count int, timeout string) ([]*Conn, error) {
dbs := make([]*Conn, 0, count)

baseConn, err := createBaseConn(dbCfg, timeout)
rawDBCfg := &baseconn.RawDBConfig{
MaxIdleConns: cfg.SyncerConfig.WorkerCount,
}
baseConn, err := createBaseConn(dbCfg, timeout, rawDBCfg)
if err != nil {
return nil, err
}
for i := 0; i < count; i++ {
// TODO use *sql.Conn instead of *sql.DB
// share db by all conns
bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy}
bc := &baseconn.BaseConn{baseConn.DB, baseConn.DSN, baseConn.RetryStrategy, rawDBCfg}
dbs = append(dbs, &Conn{baseConn: bc, cfg: cfg})
}
return dbs, nil
Expand Down
24 changes: 12 additions & 12 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,8 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) {

syncer := NewSyncer(s.cfg)
// use upstream db as mock downstream
syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
syncer.fromDB = &Conn{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.reset()

streamer, err := syncer.streamerProducer.generateStreamer(pos)
Expand Down Expand Up @@ -1216,14 +1216,14 @@ func (s *testSyncerSuite) TestSharding(c *C) {
syncer := NewSyncer(s.cfg)

// fromDB mocks upstream db, db mocks downstream db
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{fromDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}

// mock syncer.Init() function, because we need to pass mock dbs to different members' init
syncer.genRouter()
syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}})
syncer.initShardingGroups(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{shardGroupDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.reset()
events := append(createEvents, s.generateEvents(_case.testEvents, c)...)
syncer.streamerProducer = &MockStreamProducer{events}
Expand Down Expand Up @@ -1362,10 +1362,10 @@ func (s *testSyncerSuite) TestRun(c *C) {
s.cfg.DisableCausality = false

syncer := NewSyncer(s.cfg)
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}},
{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}}}
syncer.fromDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
syncer.toDBs = []*Conn{{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}},
{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}}
syncer.ddlDB = &Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{db, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}}
c.Assert(syncer.Type(), Equals, pb.UnitType_Sync)

syncer.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules)
Expand All @@ -1379,7 +1379,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
checkPointMock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1))
checkPointMock.ExpectCommit()

syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}}})
syncer.checkpoint.Init(&Conn{cfg: s.cfg, baseConn: &baseconn.BaseConn{checkPointDB, "", &retry.FiniteRetryStrategy{}, baseconn.DefaultRawDBConfig()}})
syncer.reset()
events1 := mockBinlogEvents{
mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}},
Expand Down

0 comments on commit 90c0ace

Please sign in to comment.