Skip to content

Commit

Permalink
tidb: support connecting to multi intances directly
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <zhongyangguan@gmail.com>
  • Loading branch information
zyguan committed Oct 22, 2022
1 parent 4d727a7 commit 25f724a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Common configurations:
|mysql.password||MySQL Password|
|mysql.db|"test"|MySQL Database|
|tidb.cluster_index|true|Whether to use cluster index, for TiDB only|
|tidb.instances|""|Comma-seperated address list of tidb instances (eg: `tidb-0:4000,tidb-1:4000`)|


### TiKV
Expand Down
63 changes: 55 additions & 8 deletions db/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ package mysql
import (
"bytes"
"context"
"crypto/sha1"
"database/sql"
"database/sql/driver"
"encoding/hex"
"fmt"
"strings"
"sync/atomic"

"github.com/go-sql-driver/mysql"
"github.com/magiconair/properties"
"github.com/pingcap/go-ycsb/pkg/prop"
"github.com/pingcap/go-ycsb/pkg/util"

// mysql package
_ "github.com/go-sql-driver/mysql"
"github.com/magiconair/properties"
"github.com/pingcap/go-ycsb/pkg/ycsb"
)

Expand All @@ -40,8 +42,38 @@ const (
// TODO: support batch and auto commit

tidbClusterIndex = "tidb.cluster_index"
tidbInstances = "tidb.instances"
)

type muxDriver struct {
cursor uint64
instances []string
internal driver.Driver
}

func (drv *muxDriver) Open(name string) (driver.Conn, error) {
k := atomic.AddUint64(&drv.cursor, 1)
return drv.internal.Open(drv.instances[int(k)%len(drv.instances)])
}

func openTiDBInstances(addrs []string, user string, pass string, db string) (*sql.DB, error) {
instances := make([]string, len(addrs))
hash := sha1.New()
for i, addr := range addrs {
hash.Write([]byte("+" + addr))
instances[i] = fmt.Sprintf("%s:%s@tcp(%s)/%s", user, pass, addr, db)
}
digest := hash.Sum(nil)
driver := "tidb:" + hex.EncodeToString(digest[:])
for _, n := range sql.Drivers() {
if n == driver {
return sql.Open(driver, "")
}
}
sql.Register(driver, &muxDriver{instances: instances, internal: &mysql.MySQLDriver{}})
return sql.Open(driver, "")
}

type mysqlCreator struct {
name string
}
Expand Down Expand Up @@ -75,10 +107,25 @@ func (c mysqlCreator) Create(p *properties.Properties) (ycsb.DB, error) {
user := p.GetString(mysqlUser, "root")
password := p.GetString(mysqlPassword, "")
dbName := p.GetString(mysqlDBName, "test")

dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, dbName)
var err error
db, err := sql.Open("mysql", dsn)
tidbList := p.GetString(tidbInstances, "")

var (
db *sql.DB
err error
tidbs []string
)
for _, tidb := range strings.Split(tidbList, ",") {
tidb = strings.TrimSpace(tidb)
if len(tidb) > 0 {
tidbs = append(tidbs, tidb)
}
}
if len(tidbs) > 0 {
db, err = openTiDBInstances(tidbs, user, password, dbName)
} else {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, dbName)
db, err = sql.Open("mysql", dsn)
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 25f724a

Please sign in to comment.