Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tpcc: ignore the non-first Duplicated entry error #77

Merged
merged 1 commit into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func registerTpcc(root *cobra.Command) {
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareReCommitCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareReCommitDuration, "retry-duration", 10*time.Second, "The duration for each retry")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about deprecating this flag first?

cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareRetryCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareRetryInterval, "retry-interval", 10*time.Second, "The interval for each retry")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
22 changes: 12 additions & 10 deletions pkg/load/batch_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ type SQLBatchLoader struct {

// loader retry
retryCount int
retryDuration time.Duration
retryInterval time.Duration
}

// NewSQLBatchLoader creates a batch loader for database connection
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryDuration time.Duration) *SQLBatchLoader {
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryInterval time.Duration) *SQLBatchLoader {
return &SQLBatchLoader{
count: 0,
insertHint: hint,
db: db,
retryCount: retryCount,
retryDuration: retryDuration,
retryInterval: retryInterval,
}
}

Expand Down Expand Up @@ -71,22 +71,24 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error {
var err error
for i := 0; i < 1+b.retryCount; i++ {
_, err = b.db.ExecContext(ctx, b.buf.String())
if err == nil || (strings.Contains(err.Error(), "Error 1062: Duplicate entry") && i == 0) {
if err == nil {
break
}
if strings.Contains(err.Error(), "Error 1062: Duplicate entry") {
if i == 0 {
return fmt.Errorf("exec statement error: %v", err)
}
break
}
if i < b.retryCount {
fmt.Printf("exec statement error: %v, may try again later...\n", err)
time.Sleep(b.retryDuration)
time.Sleep(b.retryInterval)
}
}
if err != nil {
return fmt.Errorf("exec statement error: %v", err)
}

b.count = 0
b.buf.Reset()

return err
return nil
}

// CSVBatchLoader helps us insert in batch
Expand Down
18 changes: 9 additions & 9 deletions tpcc/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (w *Workloader) loadItem(ctx context.Context) error {
s := getTPCCState(ctx)
hint := "INSERT INTO item (i_id, i_im_id, i_name, i_price, i_data) VALUES "

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < maxItems; i++ {
s.Buf.Reset()
Expand All @@ -50,7 +50,7 @@ func (w *Workloader) loadWarehouse(ctx context.Context, warehouse int) error {
s := getTPCCState(ctx)
hint := "INSERT INTO warehouse (w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd) VALUES "

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

wName := randChars(s.R, s.Buf, 6, 10)
wStree1 := randChars(s.R, s.Buf, 10, 20)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (w *Workloader) loadStock(ctx context.Context, warehouse int) error {
s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06,
s_dist_07, s_dist_08, s_dist_09, s_dist_10, s_ytd, s_order_cnt, s_remote_cnt, s_data) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < stockPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (w *Workloader) loadDistrict(ctx context.Context, warehouse int) error {
hint := `INSERT INTO district (d_id, d_w_id, d_name, d_street_1, d_street_2,
d_city, d_state, d_zip, d_tax, d_ytd, d_next_o_id) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < districtPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (w *Workloader) loadCustomer(ctx context.Context, warehouse int, district i
c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim,
c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_delivery_cnt, c_data) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < customerPerDistrict; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -212,7 +212,7 @@ func (w *Workloader) loadHistory(ctx context.Context, warehouse int, district in
s := getTPCCState(ctx)

hint := `INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES `
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

// 1 customer has 1 row
for i := 0; i < customerPerDistrict; i++ {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (w *Workloader) loadOrder(ctx context.Context, warehouse int, district int)
hint := `INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d,
o_carrier_id, o_ol_cnt, o_all_local) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

cids := rand.Perm(orderPerDistrict)
s.R.Shuffle(len(cids), func(i, j int) {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *Workloader) loadNewOrder(ctx context.Context, warehouse int, district i

hint := `INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < newOrderPerDistrict; i++ {
s.Buf.Reset()
Expand All @@ -312,7 +312,7 @@ func (w *Workloader) loadOrderLine(ctx context.Context, warehouse int, district
hint := `INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number,
ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES `

l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareRetryCount, w.cfg.PrepareRetryInterval)

for i := 0; i < orderPerDistrict; i++ {
for j := 0; j < olCnts[i]; j++ {
Expand Down
4 changes: 2 additions & 2 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type Config struct {
SpecifiedTables string

// connection, retry count when commiting statement fails, default 0
PrepareReCommitCount int
PrepareReCommitDuration time.Duration
PrepareRetryCount int
PrepareRetryInterval time.Duration
}

// Workloader is TPCC workload
Expand Down