Skip to content

Commit

Permalink
fix (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Feb 28, 2023
1 parent 9b45464 commit 4ed5a5f
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 53 deletions.
4 changes: 4 additions & 0 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,7 @@ func (w *Workloader) PreparePlanReplayerDump() error {
func (w *Workloader) FinishPlanReplayerDump() error {
return w.PlanReplayerRunner.Finish()
}

func (w *Workloader) Exec(sql string) error {
return nil
}
62 changes: 38 additions & 24 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func checkPrepare(ctx context.Context, w workload.Workloader) {
wg.Wait()
}

func execute(ctx context.Context, w workload.Workloader, action string, threads, index int) error {
func execute(timeoutCtx context.Context, w workload.Workloader, action string, threads, index int) error {
count := totalCount / threads

ctx = w.InitThread(ctx, index)
ctx := w.InitThread(context.Background(), index)
defer w.CleanupThread(ctx, index)

switch action {
Expand All @@ -58,30 +58,8 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads,
return w.Check(ctx, index)
}

enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled()
if enabledDumpPlanReplayer {
err := w.PreparePlanReplayerDump()
if err != nil {
return err
}
defer func() {
err := w.FinishPlanReplayerDump()
if err != nil {
fmt.Printf("[%s] dump plan replayer failed, err%v\n",
time.Now().Format("2006-01-02 15:04:05"), err)
}
}()
}

for i := 0; i < count || count <= 0; i++ {
err := w.Run(ctx, index)

select {
case <-ctx.Done():
return nil
default:
}

if err != nil {
if !silence {
fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), action, err)
Expand All @@ -90,6 +68,11 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads,
return err
}
}
select {
case <-timeoutCtx.Done():
return nil
default:
}
}

return nil
Expand All @@ -115,6 +98,37 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac
}
}
}()
if w.Name() == "tpch" && action == "run" {
err := w.Exec(`create or replace view revenue0 (supplier_no, total_revenue) as
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= '1997-07-01'
and l_shipdate < date_add('1997-07-01', interval '3' month)
group by
l_suppkey;`)
if err != nil {
panic(fmt.Sprintf("a fatal occurred when preparing view data: %v", err))
}
}
enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled()
if enabledDumpPlanReplayer {
err := w.PreparePlanReplayerDump()
if err != nil {
fmt.Printf("[%s] prepare plan replayer failed, err%v\n",
time.Now().Format("2006-01-02 15:04:05"), err)
}
defer func() {
err = w.FinishPlanReplayerDump()
if err != nil {
fmt.Printf("[%s] dump plan replayer failed, err%v\n",
time.Now().Format("2006-01-02 15:04:05"), err)
}
}()
}

for i := 0; i < threads; i++ {
go func(index int) {
Expand Down
28 changes: 20 additions & 8 deletions pkg/plan-replayer/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,22 @@ type PlanReplayerConfig struct {
}

type PlanReplayerRunner struct {
Config PlanReplayerConfig
zf *os.File
zw struct {
sync.Mutex
sync.Mutex
prepared bool
finished bool
Config PlanReplayerConfig
zf *os.File
zw struct {
writer *zip.Writer
}
}

func (r *PlanReplayerRunner) Prepare() error {
r.Lock()
defer r.Unlock()
if r.prepared {
return nil
}
if r.Config.PlanReplayerDir == "" {
dir, err := os.Getwd()
if err != nil {
Expand All @@ -53,20 +60,27 @@ func (r *PlanReplayerRunner) Prepare() error {
r.zf = zf
// Create zip writer
r.zw.writer = zip.NewWriter(zf)
r.prepared = true
return nil
}

func (r *PlanReplayerRunner) Finish() error {
r.zw.Lock()
r.Lock()
defer r.Unlock()
if r.finished {
return nil
}
err := r.zw.writer.Close()
if err != nil {
return err
}
r.zw.Unlock()
r.finished = true
return r.zf.Close()
}

func (r *PlanReplayerRunner) Dump(ctx context.Context, conn *sql.Conn, query, queryName string) error {
r.Lock()
defer r.Unlock()
rows, err := conn.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("execute query %s failed %v", query, err)
Expand Down Expand Up @@ -111,8 +125,6 @@ func (r *PlanReplayerRunner) writeDataIntoZW(b []byte, queryName string) error {
return err
}
key := base64.URLEncoding.EncodeToString(k)
r.zw.Lock()
defer r.zw.Unlock()
wr, err := r.zw.writer.Create(fmt.Sprintf("%v_%v_%v.zip",
queryName, time.Now().Format("2006-01-02-15:04:05"), key))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Workloader interface {
IsPlanReplayerDumpEnabled() bool
PreparePlanReplayerDump() error
FinishPlanReplayerDump() error
Exec(sql string) error
}
4 changes: 4 additions & 0 deletions rawsql/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,7 @@ func (w *Workloader) PreparePlanReplayerDump() error {
func (w *Workloader) FinishPlanReplayerDump() error {
return w.PlanReplayerRunner.Finish()
}

func (w *Workloader) Exec(sql string) error {
return nil
}
4 changes: 4 additions & 0 deletions tpcc/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,7 @@ func (c *CSVWorkLoader) PreparePlanReplayerDump() error {
func (c *CSVWorkLoader) FinishPlanReplayerDump() error {
return nil
}

func (c *CSVWorkLoader) Exec(sql string) error {
return nil
}
4 changes: 4 additions & 0 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,7 @@ func (w *Workloader) PreparePlanReplayerDump() error {
func (w *Workloader) FinishPlanReplayerDump() error {
return nil
}

func (w *Workloader) Exec(sql string) error {
return nil
}
29 changes: 8 additions & 21 deletions tpch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,27 +209,6 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {

queryName := w.cfg.QueryNames[s.queryIdx%len(w.cfg.QueryNames)]
query := query(w.cfg.Driver, queryName)

if queryName == "q15" {
_, err := w.db.Exec(`create view revenue0 (supplier_no, total_revenue) as
select
l_suppkey,
sum(l_extendedprice * (1 - l_discount))
from
lineitem
where
l_shipdate >= '1997-07-01'
and l_shipdate < date_add('1997-07-01', interval '3' month)
group by
l_suppkey;`)
if err != nil {
return err
}
defer func() {
w.db.Exec("drop view revenue0;")
}()
}

// only for driver == mysql and EnablePlanReplayer == true
if w.cfg.EnablePlanReplayer && w.cfg.Driver == "mysql" {
w.dumpPlanReplayer(ctx, s, query, queryName)
Expand Down Expand Up @@ -337,3 +316,11 @@ func (w *Workloader) PreparePlanReplayerDump() error {
func (w *Workloader) FinishPlanReplayerDump() error {
return w.PlanReplayerRunner.Finish()
}

func (w *Workloader) Exec(sql string) error {
_, err := w.db.Exec(sql)
if err != nil {
return err
}
return nil
}

0 comments on commit 4ed5a5f

Please sign in to comment.