Skip to content

Commit

Permalink
add retry mechanism (pingcap#182)
Browse files Browse the repository at this point in the history
* add retry mechanism

* add retry for aws http

* add timeout for http client

* don't block when one error occured

* fix

* update go.mod from br

* address comment

* address comment again

* fix bug again

* use error group again

Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
lichunzhu and kennytm authored Nov 6, 2020
1 parent 084506e commit 461eeda
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 60 deletions.
15 changes: 9 additions & 6 deletions dumpling/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/docker/go-units v0.4.0
github.com/go-sql-driver/mysql v1.5.0
github.com/pingcap/br v0.0.0-20200925095602-bf9cc603382e
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/br v0.0.0-20201027124415-c2ed897feada
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20200910095337-6b893f12be43
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible
github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200825070655-6b09f3acbb1f
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 // indirect
golang.org/x/tools v0.0.0-20200823205832-c024452afbcd // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)
175 changes: 138 additions & 37 deletions dumpling/go.sum

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion dumpling/v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"text/template"
Expand Down Expand Up @@ -133,7 +134,19 @@ func (config *Config) createExternalStorage(ctx context.Context) (storage.Extern
if err != nil {
return nil, err
}
return storage.Create(ctx, b, false)
httpClient := http.DefaultClient
httpClient.Timeout = 30 * time.Second
maxIdleConnsPerHost := http.DefaultMaxIdleConnsPerHost
if config.Threads > maxIdleConnsPerHost {
maxIdleConnsPerHost = config.Threads
}
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = maxIdleConnsPerHost
httpClient.Transport = transport

return storage.New(ctx, b, &storage.ExternalStorageOptions{
HTTPClient: httpClient,
})
}

const (
Expand Down
30 changes: 17 additions & 13 deletions dumpling/v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/failpoint"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -233,9 +234,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return nil
}

func dumpDatabases(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
allTables := conf.Tables
var g errgroup.Group
g, ctx := errgroup.WithContext(pCtx)
for dbName, tables := range allTables {
conn := connectPool.getConn()
createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
Expand All @@ -261,21 +262,24 @@ func dumpDatabases(ctx context.Context, conf *Config, connectPool *connectionsPo
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
err := tableIR.Start(ctx, conn)
if err != nil {
return err
}
return writer.WriteTableData(ctx, tableIR)
retryTime := 1
return utils.WithRetry(ctx, func() error {
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()))
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
retryTime += 1
err := tableIR.Start(ctx, conn)
if err != nil {
return err
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer())
})
}
}
}
if err := g.Wait(); err != nil {
return err
}
return nil
return g.Wait()
}

func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
Expand Down
34 changes: 34 additions & 0 deletions dumpling/v4/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
)

Expand Down Expand Up @@ -195,3 +196,36 @@ func (s *testDumpSuite) TestDumpTableWhereClause(c *C) {
c.Assert(tbDataRes.Rows().HasNext(), IsFalse)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) {
mockConfig := DefaultConfig()
mockConfig.SortByPk = false
mockConfig.Databases = []string{"test"}
mockConfig.Tables = NewDatabaseTables().AppendTables("test", "t")
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

showCreateDatabase := "CREATE DATABASE `test`"
rows := mock.NewRows([]string{"Database", "Create Database"}).AddRow("test", showCreateDatabase)
mock.ExpectQuery("SHOW CREATE DATABASE `test`").WillReturnRows(rows)
showCreateTableResult := "CREATE TABLE t (a INT)"
rows = mock.NewRows([]string{"Table", "Create Table"}).AddRow("t", showCreateTableResult)
mock.ExpectQuery("SHOW CREATE TABLE `test`.`t`").WillReturnRows(rows)
rows = mock.NewRows([]string{"column_name", "extra"}).AddRow("id", "").AddRow("name", "")
mock.ExpectQuery("SELECT COLUMN_NAME").WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()).WillReturnRows(rows)
rows = mock.NewRows([]string{"a"}).AddRow(1)
mock.ExpectQuery("SELECT (.) FROM `test`.`t` LIMIT 1").WillReturnRows(rows)
rows = mock.NewRows([]string{"a"}).AddRow(1).AddRow(2)
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnError(&mysql.MySQLError{Number: 9001, Message: "pd is timeout"})
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnRows(rows)

mockWriter := newMockWriter()
connectPool := newMockConnectPool(c, db)
err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter)
c.Assert(err, IsNil)

c.Assert(len(mockWriter.databaseMeta), Equals, 1)
c.Assert(mockWriter.databaseMeta["test"], Equals, "CREATE DATABASE `test`")
c.Assert(mockWriter.tableMeta["test.t"], Equals, showCreateTableResult)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}
5 changes: 3 additions & 2 deletions dumpling/v4/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
if err != nil {
return err
}
td.SQLRowIter = nil
td.rows = rows
return nil
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func splitTableDataIntoChunks(
}
// every chunk would have eventual adjustments
estimatedChunks := count / conf.Rows
estimatedStep := new(big.Int).Sub(max,min).Uint64() /estimatedChunks + 1
estimatedStep := new(big.Int).Sub(max, min).Uint64()/estimatedChunks + 1
bigEstimatedStep := new(big.Int).SetUint64(estimatedStep)
cutoff := new(big.Int).Set(min)
nextCutoff := new(big.Int)
Expand All @@ -246,7 +247,7 @@ func splitTableDataIntoChunks(
LOOP:
for max.Cmp(cutoff) >= 0 {
chunkIndex += 1
where := fmt.Sprintf("%s(`%s` >= %d AND `%s` < %d)", nullValueCondition, escapeString(field), cutoff, escapeString(field), nextCutoff.Add(cutoff,bigEstimatedStep))
where := fmt.Sprintf("%s(`%s` >= %d AND `%s` < %d)", nullValueCondition, escapeString(field), cutoff, escapeString(field), nextCutoff.Add(cutoff, bigEstimatedStep))
query = buildSelectQuery(dbName, tableName, selectedField, buildWhereCondition(conf, where), orderByClause)
if len(nullValueCondition) > 0 {
nullValueCondition = ""
Expand Down
47 changes: 47 additions & 0 deletions dumpling/v4/export/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package export

import (
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/dbutil"
)

const (
dumpChunkRetryTime = 3
dumpChunkWaitInterval = 50 * time.Millisecond
dumpChunkMaxWaitInterval = 200 * time.Millisecond
)

func newDumpChunkBackoffer() *dumpChunkBackoffer {
return &dumpChunkBackoffer{
attempt: dumpChunkRetryTime,
delayTime: dumpChunkWaitInterval,
maxDelayTime: dumpChunkMaxWaitInterval,
}
}

type dumpChunkBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
}

func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration {
err = errors.Cause(err)
if _, ok := err.(*mysql.MySQLError); ok && !dbutil.IsRetryableError(err) {
b.attempt = 0
return 0
}
b.delayTime = 2 * b.delayTime
b.attempt--
if b.delayTime > b.maxDelayTime {
return b.maxDelayTime
}
return b.delayTime
}

func (b *dumpChunkBackoffer) Attempt() int {
return b.attempt
}
2 changes: 1 addition & 1 deletion dumpling/v4/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func buildInterceptFileWriter(s storage.ExternalStorage, path string) (storage.W
if writer == nil {
return
}
log.Debug("tear down lazy file writer...")
log.Debug("tear down lazy file writer...", zap.String("path", fullPath))
err := writer.Close(ctx)
if err != nil {
log.Error("close file failed", zap.String("path", fullPath))
Expand Down

0 comments on commit 461eeda

Please sign in to comment.