Skip to content

Commit

Permalink
Refactor csv generator (#36)
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored May 6, 2020
1 parent 47c2889 commit c68473d
Show file tree
Hide file tree
Showing 16 changed files with 690 additions and 355 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ For example:
# Check consistency
./bin/go-tpc tpcc --warehouses 4 check
# Generate csv files
./bin/go-tpc tpcc --warehouses 4 prepare --output data
./bin/go-tpc tpcc --warehouses 4 prepare --output-type csv --output-dir data
# Specified tables when generating csv files
./bin/go-tpc tpcc --warehouses 4 prepare --output data --tables history,orders
./bin/go-tpc tpcc --warehouses 4 prepare --output-type csv --output-dir data --tables history,orders
# Start pprof
./bin/go-tpc tpcc --warehouses 4 prepare --output data --pprof :10111
./bin/go-tpc tpcc --warehouses 4 prepare --output-type csv --output-dir data --pprof :10111
```

### TPC-H
Expand Down
16 changes: 9 additions & 7 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
)

func checkPrepare(ctx context.Context, w workload.Workloader) {
// skip preparation check in csv case
if w.Name() == "tpcc-csv" {
fmt.Println("Skip preparing checking. Please load CSV data into database and check later.")
return
}

var wg sync.WaitGroup
wg.Add(threads)
for i := 0; i < threads; i++ {
Expand Down Expand Up @@ -38,7 +44,7 @@ func execute(ctx context.Context, w workload.Workloader, action string, index in
switch action {
case "prepare":
// Do cleanup only if dropData is set and not generate csv data.
if dropData && !w.DataGen() {
if dropData {
if err := w.Cleanup(ctx, index); err != nil {
return err
}
Expand Down Expand Up @@ -106,12 +112,8 @@ func executeWorkload(ctx context.Context, w workload.Workloader, action string)
wg.Wait()

if action == "prepare" {
if !w.DataGen() {
// For prepare, we must check the data consistency after all prepare finished
checkPrepare(ctx, w)
} else {
fmt.Println("Skip preparing checking. Please load CSV data into database and check later.")
}
// For prepare, we must check the data consistency after all prepare finished
checkPrepare(ctx, w)
}
outputCancel()

Expand Down
21 changes: 19 additions & 2 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"runtime"

"github.com/pingcap/go-tpc/pkg/workload"
"github.com/pingcap/go-tpc/tpcc"
"github.com/spf13/cobra"
)
Expand All @@ -28,7 +29,21 @@ func executeTpcc(action string) {
tpccConfig.DBName = dbName
tpccConfig.Threads = threads
tpccConfig.Isolation = isolationLevel
w, err := tpcc.NewWorkloader(globalDB, &tpccConfig)
var (
w workload.Workloader
err error
)
switch tpccConfig.OutputType {
case "csv", "CSV":
if tpccConfig.OutputDir == "" {
fmt.Printf("Output Directory cannot be empty when generating files")
os.Exit(1)
}
w, err = tpcc.NewCSVWorkloader(globalDB, &tpccConfig)
default:
w, err = tpcc.NewWorkloader(globalDB, &tpccConfig)
}

if err != nil {
fmt.Printf("Failed to init work loader: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -56,7 +71,9 @@ func registerTpcc(root *cobra.Command) {
executeTpcc("prepare")
},
}
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputType, "output-type", "", "Output file type."+
" If empty, then load data to db. Current only support csv")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")

Expand Down
1 change: 0 additions & 1 deletion pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,5 @@ type Workloader interface {
Run(ctx context.Context, threadID int) error
Cleanup(ctx context.Context, threadID int) error
Check(ctx context.Context, threadID int) error
DataGen() bool
DBName() string
}
24 changes: 12 additions & 12 deletions tpcc/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (w *Workloader) check(ctx context.Context, threadID int, checkAll bool) err
}

func (w *Workloader) checkCondition1(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

// Entries in the WAREHOUSE and DISTRICT tables must satisfy the relationship:
// W_YTD = sum(D_YTD)
Expand Down Expand Up @@ -94,7 +94,7 @@ func (w *Workloader) checkCondition1(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition2(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

// Entries in the DISTRICT, ORDER, and NEW-ORDER tables must satisfy the relationship:
// D_NEXT_O_ID - 1 = max(O_ID) = max(NO_O_ID)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (w *Workloader) checkCondition2(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition3(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -159,7 +159,7 @@ func (w *Workloader) checkCondition3(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition4(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -189,7 +189,7 @@ func (w *Workloader) checkCondition4(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition5(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -219,7 +219,7 @@ func (w *Workloader) checkCondition5(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition6(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

// For any row in the ORDER table, O_OL_CNT must equal the number of rows in the ORDER-LINE table for the
// corresponding order defined by (O_W_ID, O_D_ID, O_ID) = (OL_W_ID, OL_D_ID, OL_O_ID).
Expand Down Expand Up @@ -257,7 +257,7 @@ WHERE T.o_ol_cnt != T.order_line_count`
}

func (w *Workloader) checkCondition7(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -287,7 +287,7 @@ func (w *Workloader) checkCondition7(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition8(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -317,7 +317,7 @@ func (w *Workloader) checkCondition8(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition9(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -347,7 +347,7 @@ func (w *Workloader) checkCondition9(ctx context.Context, warehouse int) error {
}

func (w *Workloader) checkCondition10(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down Expand Up @@ -392,7 +392,7 @@ func (w *Workloader) checkCondition10(ctx context.Context, warehouse int) error
}

func (w *Workloader) checkCondition11(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

// Entries in the CUSTOMER, ORDER and NEW-ORDER tables must satisfy the relationship:
// (count(*) from ORDER) - (count(*) from NEW-ORDER) = 2100
Expand Down Expand Up @@ -433,7 +433,7 @@ WHERE c_w_id = ? AND order_count - 2100 != new_order_count`
}

func (w *Workloader) checkCondition12(ctx context.Context, warehouse int) error {
s := w.getState(ctx)
s := getTPCCState(ctx)

var diff float64

Expand Down
Loading

0 comments on commit c68473d

Please sign in to comment.