Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support for connecting to multple instances directly #149

Merged
merged 2 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
var chConfig ch.Config
var (
apConnParams string
apHost string
apPort int
apHosts []string
apPorts []int
)

func registerCHBenchmark(root *cobra.Command) {
Expand Down Expand Up @@ -70,22 +70,20 @@ func registerCHBenchmark(root *cobra.Command) {
var cmdRun = &cobra.Command{
Use: "run",
Short: "Run workload",
PreRun: func(cmd *cobra.Command, args []string) {
if len(apConnParams) == 0 {
apConnParams = connParams
}
if len(apHosts) == 0 {
apHosts = hosts
}
if len(apPorts) == 0 {
apPorts = ports
}
},
Run: func(cmd *cobra.Command, _ []string) {
executeCH("run", func() string {
origConnParams, origHost, origPort := connParams, host, port
defer func() {
connParams, host, port = origConnParams, origHost, origPort
}()
if len(apConnParams) > 0 {
connParams = apConnParams
}
if len(apHost) > 0 {
host = apHost
}
if apPort > 0 {
port = apPort
}
return buildDSN(false)
executeCH("run", func() (*sql.DB, error) {
return newDB(makeTargets(apHosts, apPorts), driver, user, password, dbName, apConnParams)
})
},
}
Expand All @@ -106,13 +104,13 @@ func registerCHBenchmark(root *cobra.Command) {

cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().StringVar(&apConnParams, "ap-conn-params", "", "Connection parameters for analytical processing")
cmdRun.Flags().StringVar(&apHost, "ap-host", "", "Database host for analytical processing")
cmdRun.Flags().IntVar(&apPort, "ap-port", 0, "Database port for analytical processing")
cmdRun.Flags().StringSliceVar(&apHosts, "ap-host", nil, "Database host for analytical processing")
cmdRun.Flags().IntSliceVar(&apPorts, "ap-port", nil, "Database port for analytical processing")
cmd.AddCommand(cmdRun, cmdPrepare)
root.AddCommand(cmd)
}

func executeCH(action string, buildDSNForAP func() string) {
func executeCH(action string, openAP func() (*sql.DB, error)) {
runtime.GOMAXPROCS(maxProcs)

openDB()
Expand All @@ -127,11 +125,7 @@ func executeCH(action string, buildDSNForAP func() string) {
chConfig.Driver = driver
chConfig.DBName = dbName
chConfig.QueryNames = strings.Split(chConfig.RawQueries, ",")
if len(apHost) > 0 {
chConfig.PlanReplayerConfig.Host = apHost
} else {
chConfig.PlanReplayerConfig.Host = host
}
chConfig.PlanReplayerConfig.Host = apHosts[0]
chConfig.PlanReplayerConfig.StatusPort = statusPort

var (
Expand All @@ -143,10 +137,10 @@ func executeCH(action string, buildDSNForAP func() string) {
fmt.Printf("Failed to init tp work loader: %v\n", err)
os.Exit(1)
}
if buildDSNForAP == nil {
if openAP == nil {
ap = ch.NewWorkloader(globalDB, &chConfig)
} else {
db, err := sql.Open(driver, buildDSNForAP())
db, err := openAP()
if err != nil {
fmt.Printf("Failed to open db for analytical processing: %v\n", err)
os.Exit(1)
Expand Down
146 changes: 102 additions & 44 deletions cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,32 @@ package main

import (
"context"
"crypto/sha1"
"database/sql"
sqldrv "database/sql/driver"
"encoding/hex"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

"github.com/pingcap/go-tpc/pkg/util"
"github.com/spf13/cobra"

// mysql package
_ "github.com/go-sql-driver/mysql"
"github.com/go-sql-driver/mysql"
// pg
_ "github.com/lib/pq"
"github.com/lib/pq"
)

var (
dbName string
host string
port int
hosts []string
ports []int
statusPort int
user string
password string
Expand All @@ -41,6 +46,7 @@ var (
maxProcs int
connParams string
outputStyle string
targets []string

globalDB *sql.DB
globalCtx context.Context
Expand All @@ -52,65 +58,96 @@ const (
pgDriver = "postgres"
)

func closeDB() {
if globalDB != nil {
globalDB.Close()
}
globalDB = nil
type MuxDriver struct {
cursor uint64
instances []string
internal sqldrv.Driver
}

func buildDSN(tmp bool) string {
switch driver {
case mysqlDriver:
if tmp {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/", user, password, host, port)
}
// allow multiple statements in one query to allow q15 on the TPC-H
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?multiStatements=true", user, password, host, port, dbName)
if len(connParams) > 0 {
dsn = dsn + "&" + connParams
func (drv *MuxDriver) Open(name string) (sqldrv.Conn, error) {
k := atomic.AddUint64(&drv.cursor, 1)
return drv.internal.Open(drv.instances[int(k)%len(drv.instances)])
}

func makeTargets(hosts []string, ports []int) []string {
targets := make([]string, 0, len(hosts)*len(ports))
for _, host := range hosts {
for _, port := range ports {
targets = append(targets, host+":"+strconv.Itoa(port))
}
return dsn
case pgDriver:
if tmp {
return fmt.Sprintf("postgres://%s:%s@%s:%d/?%s", user, password, host, port, connParams)
}
return targets
}

func newDB(targets []string, driver string, user string, password string, dbName string, connParams string) (*sql.DB, error) {
if len(targets) == 0 {
panic(fmt.Errorf("empty targets"))
}
var (
drv sqldrv.Driver
hash = sha1.New()
names = make([]string, len(targets))
)
hash.Write([]byte(driver))
hash.Write([]byte(user))
hash.Write([]byte(password))
hash.Write([]byte(dbName))
hash.Write([]byte(connParams))
for i, addr := range targets {
hash.Write([]byte(addr))
switch driver {
case mysqlDriver:
// allow multiple statements in one query to allow q15 on the TPC-H
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?multiStatements=true", user, password, addr, dbName)
if len(connParams) > 0 {
dsn = dsn + "&" + connParams
}
names[i] = dsn
drv = &mysql.MySQLDriver{}
case pgDriver:
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s", user, password, addr, dbName)
if len(connParams) > 0 {
dsn = dsn + "?" + connParams
}
names[i] = dsn
drv = &pq.Driver{}
default:
panic(fmt.Errorf("unknown driver: %q", driver))
}
dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", user, password, host, port, dbName)
if len(connParams) > 0 {
dsn = dsn + "?" + connParams
}

if len(names) == 1 {
return sql.Open(driver, names[0])
}
drvName := driver + "+" + hex.EncodeToString(hash.Sum(nil))
for _, n := range sql.Drivers() {
if n == drvName {
return sql.Open(drvName, "")
}
return dsn
default:
panic(fmt.Errorf("unknown driver: %q", driver))
}
sql.Register(drvName, &MuxDriver{instances: names, internal: drv})
return sql.Open(drvName, "")
}

func isDBNotExist(err error) bool {
if err == nil {
return false
}
switch driver {
case mysqlDriver:
return strings.Contains(err.Error(), "Unknown database")
case pgDriver:
msg := err.Error()
return strings.HasPrefix(msg, "pq: database") && strings.HasSuffix(msg, "does not exist")
func closeDB() {
if globalDB != nil {
globalDB.Close()
}
return false
globalDB = nil
}

func openDB() {
var (
tmpDB *sql.DB
err error
)
globalDB, err = sql.Open(driver, buildDSN(false))
globalDB, err = newDB(targets, driver, user, password, dbName, connParams)
if err != nil {
panic(err)
}
if err := globalDB.Ping(); err != nil {
if isDBNotExist(err) {
tmpDB, _ = sql.Open(driver, buildDSN(true))
tmpDB, _ = newDB(targets, driver, user, password, "", connParams)
defer tmpDB.Close()
if _, err := tmpDB.Exec(createDBDDL + dbName); err != nil {
panic(fmt.Errorf("failed to create database, err %v\n", err))
Expand All @@ -123,19 +160,38 @@ func openDB() {
}
}

func isDBNotExist(err error) bool {
if err == nil {
return false
}
switch driver {
case mysqlDriver:
return strings.Contains(err.Error(), "Unknown database")
case pgDriver:
msg := err.Error()
return strings.HasPrefix(msg, "pq: database") && strings.HasSuffix(msg, "does not exist")
}
return false
}

func main() {
var rootCmd = &cobra.Command{
Use: "go-tpc",
Short: "Benchmark database with different workloads",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if len(targets) == 0 {
targets = makeTargets(hosts, ports)
}
},
}
rootCmd.PersistentFlags().IntVar(&maxProcs, "max-procs", 0, "runtime.GOMAXPROCS")
rootCmd.PersistentFlags().StringVar(&pprofAddr, "pprof", "", "Address of pprof endpoint")
rootCmd.PersistentFlags().StringVar(&metricsAddr, "metrics-addr", "", "Address of metrics endpoint")
rootCmd.PersistentFlags().StringVarP(&dbName, "db", "D", "test", "Database name")
rootCmd.PersistentFlags().StringVarP(&host, "host", "H", "127.0.0.1", "Database host")
rootCmd.PersistentFlags().StringSliceVarP(&hosts, "host", "H", []string{"127.0.0.1"}, "Database host")
rootCmd.PersistentFlags().StringVarP(&user, "user", "U", "root", "Database user")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Database password")
rootCmd.PersistentFlags().IntVarP(&port, "port", "P", 4000, "Database port")
rootCmd.PersistentFlags().IntSliceVarP(&ports, "port", "P", []int{4000}, "Database port")
rootCmd.PersistentFlags().IntVarP(&statusPort, "statusPort", "S", 10080, "Database status port")
rootCmd.PersistentFlags().IntVarP(&threads, "threads", "T", 1, "Thread concurrency")
rootCmd.PersistentFlags().IntVarP(&acThreads, "acThreads", "t", 1, "OLAP client concurrency, only for CH-benCHmark")
Expand All @@ -151,6 +207,8 @@ func main() {
5: Snapshot, 6: Serializable, 7: Linerizable`)
rootCmd.PersistentFlags().StringVar(&connParams, "conn-params", "", "session variables, e.g. for TiDB --conn-params tidb_isolation_read_engines='tiflash', For PostgreSQL: --conn-params sslmode=disable")
rootCmd.PersistentFlags().StringVar(&outputStyle, "output", util.OutputStylePlain, "output style, valid values can be { plain | table | json }")
rootCmd.PersistentFlags().StringSliceVar(&targets, "targets", nil, "Target database addresses")
rootCmd.PersistentFlags().MarkHidden("targets")

cobra.EnablePrefixMatching = true

Expand Down
2 changes: 1 addition & 1 deletion cmd/go-tpc/rawsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func execRawsql(action string) {
rawsqlConfig.QueryNames = strings.Split(queryFiles, ",")
rawsqlConfig.Queries = make(map[string]string, len(rawsqlConfig.QueryNames))
rawsqlConfig.RefreshWait = refreshConnWait
rawsqlConfig.PlanReplayerConfig.Host = host
rawsqlConfig.PlanReplayerConfig.Host = hosts[0]
rawsqlConfig.PlanReplayerConfig.StatusPort = statusPort

for i, filename := range rawsqlConfig.QueryNames {
Expand Down
2 changes: 1 addition & 1 deletion cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func executeTpch(action string) {
os.Exit(1)
}

tpchConfig.PlanReplayerConfig.Host = host
tpchConfig.PlanReplayerConfig.Host = hosts[0]
tpchConfig.PlanReplayerConfig.StatusPort = statusPort

tpchConfig.OutputStyle = outputStyle
Expand Down