Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explain support for CHbenchmark #164

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ type analyzeConfig struct {

// Config is the configuration for ch workload
type Config struct {
Driver string
DBName string
RawQueries string
QueryNames []string
TiFlashReplica int
AnalyzeTable analyzeConfig
RefreshConnWait time.Duration
Driver string
DBName string
RawQueries string
QueryNames []string
TiFlashReplica int
AnalyzeTable analyzeConfig
ExecExplainAnalyze bool
RefreshConnWait time.Duration

EnablePlanReplayer bool
PlanReplayerConfig replayer.PlanReplayerConfig
Expand Down Expand Up @@ -220,16 +221,38 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
w.dumpPlanReplayer(ctx, s, query, queryName)
}

if w.cfg.ExecExplainAnalyze {
query = strings.Replace(query, "/*PLACEHOLDER*/", "explain analyze", 1)
}
start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
w.measurement.Measure(queryName, time.Now().Sub(start), err)
defer w.measurement.Measure(queryName, time.Now().Sub(start), err)
if err != nil {
return fmt.Errorf("execute query %s failed %v", queryName, err)
}
defer rows.Close()

if w.cfg.ExecExplainAnalyze {
table, err := util.RenderExplainAnalyze(rows)
if err != nil {
return err
}
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table)
return nil
}
if err := w.drainQueryResult(queryName, rows); err != nil {
return fmt.Errorf("execute query %s failed %v", queryName, err)
}

return nil
}

func (w *Workloader) drainQueryResult(queryName string, rows *sql.Rows) error {
for rows.Next() {
}
return rows.Err()
}

// Cleanup cleans up workloader
func (w *Workloader) Cleanup(ctx context.Context, threadID int) error {
return nil
Expand Down
5 changes: 5 additions & 0 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func registerCHBenchmark(root *cobra.Command) {
"",
"Name of plan Replayer file dumps")

cmdRun.PersistentFlags().BoolVar(&chConfig.ExecExplainAnalyze,
"use-explain",
false,
"execute explain analyze")

cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().StringVar(&apConnParams, "ap-conn-params", "", "Connection parameters for analytical processing")
cmdRun.Flags().StringSliceVar(&apHosts, "ap-host", nil, "Database host for analytical processing")
Expand Down
7 changes: 4 additions & 3 deletions cmd/go-tpc/rawsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/pingcap/go-tpc/pkg/util"
"github.com/pingcap/go-tpc/rawsql"
"github.com/spf13/cobra"
)
Expand All @@ -29,7 +30,7 @@ func registerRawsql(root *cobra.Command) {
Short: "Run workload",
Run: func(cmd *cobra.Command, args []string) {
if len(queryFiles) == 0 {
fmt.Fprintln(os.Stderr, "empty query files")
util.StdErrLogger.Printf("empty query files")
os.Exit(1)
}

Expand Down Expand Up @@ -74,7 +75,7 @@ func execRawsql(action string) {

// if globalDB == nil
if globalDB == nil {
fmt.Fprintln(os.Stderr, "cannot connect to the database")
util.StdErrLogger.Printf("cannot connect to the database")
os.Exit(1)
}

Expand All @@ -89,7 +90,7 @@ func execRawsql(action string) {
for i, filename := range rawsqlConfig.QueryNames {
queryData, err := ioutil.ReadFile(filename)
if err != nil {
fmt.Fprintf(os.Stderr, "read file: %s, err: %v\n", filename, err)
util.StdErrLogger.Printf("read file: %s, err: %v\n", filename, err)
os.Exit(1)
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"strings"

"github.com/pingcap/go-tpc/pkg/util"
"github.com/pingcap/go-tpc/tpch"
"github.com/spf13/cobra"
)
Expand All @@ -17,7 +18,7 @@ func executeTpch(action string) {
defer closeDB()

if globalDB == nil {
fmt.Fprintln(os.Stderr, "cannot connect to the database")
util.StdErrLogger.Printf("cannot connect to the database")
os.Exit(1)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"strings"

Expand All @@ -17,6 +18,13 @@ const (
OutputStyleJson = "json"
)

// This logger is goroutine-safe.
var StdErrLogger *log.Logger

func init() {
StdErrLogger = log.New(os.Stderr, "", 0)
}

func RenderString(format string, headers []string, values [][]string) {
if len(values) == 0 {
return
Expand Down
3 changes: 1 addition & 2 deletions rawsql/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"os"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -122,7 +121,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table)
util.StdErrLogger.Printf("explain analyze result of query %s:\n%s\n", queryName, table)
return nil
}

Expand Down
10 changes: 3 additions & 7 deletions tpch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,22 +219,18 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
}
start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("execute query %s failed %v", query, err)
}
defer rows.Close()
w.measurement.Measure(queryName, time.Now().Sub(start), err)

defer w.measurement.Measure(queryName, time.Now().Sub(start), err)
if err != nil {
return fmt.Errorf("execute %s failed %v", queryName, err)
}
defer rows.Close()

if w.cfg.ExecExplainAnalyze {
table, err := util.RenderExplainAnalyze(rows)
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table)
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table)
return nil
}
if err := w.scanQueryResult(queryName, rows); err != nil {
Expand Down