Skip to content

Commit

Permalink
tpcc: ignore the non-first Duplicated entry error (#77)
Browse files Browse the repository at this point in the history
Signed-off-by: mahjonp <junpeng.man@gmail.com>
  • Loading branch information
mahjonp authored Apr 1, 2021
1 parent 9a6e7c9 commit 78ad36a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
4 changes: 2 additions & 2 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func registerTpcc(root *cobra.Command) {
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareReCommitCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareReCommitDuration, "retry-duration", 10*time.Second, "The duration for each retry")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareRetryCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareRetryInterval, "retry-interval", 10*time.Second, "The interval for each retry")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
22 changes: 12 additions & 10 deletions pkg/load/batch_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ type SQLBatchLoader struct {

// loader retry
retryCount int
retryDuration time.Duration
retryInterval time.Duration
}

// NewSQLBatchLoader creates a batch loader for database connection
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryDuration time.Duration) *SQLBatchLoader {
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryInterval time.Duration) *SQLBatchLoader {
return &SQLBatchLoader{
count: 0,
insertHint: hint,
db: db,
retryCount: retryCount,
retryDuration: retryDuration,
retryInterval: retryInterval,
}
}

Expand Down Expand Up @@ -71,22 +71,24 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error {
var err error
for i := 0; i < 1+b.retryCount; i++ {
_, err = b.db.ExecContext(ctx, b.buf.String())
if err == nil || (strings.Contains(err.Error(), "Error 1062: Duplicate entry") && i == 0) {
if err == nil {
break
}
if strings.Contains(err.Error(), "Error 1062: Duplicate entry") {
if i == 0 {
return fmt.Errorf("exec statement error: %v", err)
}
break
}
if i < b.retryCount {
fmt.Printf("exec statement error: %v, may try again later...\n", err)
time.Sleep(b.retryDuration)
time.Sleep(b.retryInterval)
}
}
if err != nil {
return fmt.Errorf("exec statement error: %v", err)
}

b.count = 0
b.buf.Reset()

return err
return nil
}

// CSVBatchLoader helps us insert in batch
Expand Down
18 changes: 9 additions & 9 deletions tpcc/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (w *Workloader) loadItem(ctx context.Context) error {
s := getTPCCState(ctx)
hint := "INSERT INTO item (i_id, i_im_id, i_name, i_price, i_data) VALUES "

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < maxItems; i++ {
s.Buf.Reset()
Expand All @@ -50,7 +50,7 @@ func (w *Workloader) loadWarehouse(ctx context.Context, warehouse int) error {
s := getTPCCState(ctx)
hint := "INSERT INTO warehouse (w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd) VALUES "

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

wName := randChars(s.R, s.Buf, 6, 10)
wStree1 := randChars(s.R, s.Buf, 10, 20)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (w *Workloader) loadStock(ctx context.Context, warehouse int) error {
s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06,
s_dist_07, s_dist_08, s_dist_09, s_dist_10, s_ytd, s_order_cnt, s_remote_cnt, s_data) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < stockPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (w *Workloader) loadDistrict(ctx context.Context, warehouse int) error {
hint := `INSERT INTO district (d_id, d_w_id, d_name, d_street_1, d_street_2,
d_city, d_state, d_zip, d_tax, d_ytd, d_next_o_id) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < districtPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (w *Workloader) loadCustomer(ctx context.Context, warehouse int, district i
c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim,
c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_delivery_cnt, c_data) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < customerPerDistrict; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -212,7 +212,7 @@ func (w *Workloader) loadHistory(ctx context.Context, warehouse int, district in
s := getTPCCState(ctx)

hint := `INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES `
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

// 1 customer has 1 row
for i := 0; i < customerPerDistrict; i++ {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (w *Workloader) loadOrder(ctx context.Context, warehouse int, district int)
hint := `INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d,
o_carrier_id, o_ol_cnt, o_all_local) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

cids := rand.Perm(orderPerDistrict)
s.R.Shuffle(len(cids), func(i, j int) {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *Workloader) loadNewOrder(ctx context.Context, warehouse int, district i

hint := `INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < newOrderPerDistrict; i++ {
s.Buf.Reset()
Expand All @@ -312,7 +312,7 @@ func (w *Workloader) loadOrderLine(ctx context.Context, warehouse int, district
hint := `INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number,
ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < orderPerDistrict; i++ {
for j := 0; j < olCnts[i]; j++ {
Expand Down
4 changes: 2 additions & 2 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type Config struct {
SpecifiedTables string

// connection, retry count when commiting statement fails, default 0
PrepareReCommitCount int
PrepareReCommitDuration time.Duration
PrepareRetryCount int
PrepareRetryInterval time.Duration
}

// Workloader is TPCC workload
Expand Down

0 comments on commit 78ad36a

Please sign in to comment.