Skip to content

Commit

Permalink
lightning: add timeout for "write" RPC (#48355) (#48397)
Browse files Browse the repository at this point in the history
close #46321, close #48352
  • Loading branch information
ti-chi-bot authored Dec 8, 2023
1 parent 6c98485 commit dee5eb0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
16 changes: 15 additions & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"container/heap"
"context"
goerrors "errors"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (j *regionJob) convertStageTo(stage jobStageTp) {
// we don't need to clean up the pairs already written to tikv if an error is encountered;
// tikv will take the responsibility to do so.
// TODO: let client-go provide a high-level write interface.
func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
func (local *Backend) writeToTiKV(pCtx context.Context, j *regionJob) (errRet error) {
if j.stage != regionScanned {
return nil
}
Expand All @@ -175,6 +176,19 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
failpoint.Return(err)
})

ctx, cancel := context.WithTimeout(pCtx, 15*time.Minute)
defer cancel()
defer func() {
deadline, ok := ctx.Deadline()
if !ok {
// should not happen
return
}
if goerrors.Is(errRet, context.DeadlineExceeded) && time.Now().After(deadline) {
errRet = common.ErrWriteTooSlow
}
}()

apiVersion := local.tikvCodec.GetAPIVersion()
clientFactory := local.importClientFactory
kvBatchSize := local.KVWriteBatchSize
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,19 @@ var retryableErrorIDs = map[errors.ErrorID]struct{}{
drivererr.ErrUnknown.ID(): {},
}

// ErrWriteTooSlow reports that a write took too long, possibly because the
// underlying gRPC call is blocked forever. It exists as a workaround for gRPC
// blocking issues whose root cause is not yet understood, such as
// https://github.com/pingcap/tidb/issues/48352 and
// https://github.com/pingcap/tidb/issues/46321.
// Unlike context.DeadlineExceeded, isSingleRetryableError classifies this
// error as retryable.
var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")

func isSingleRetryableError(err error) bool {
err = errors.Cause(err)

switch err {
case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
return false
case mysql.ErrInvalidConn, driver.ErrBadConn:
case mysql.ErrInvalidConn, driver.ErrBadConn, ErrWriteTooSlow:
return true
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(context.Canceled))
require.False(t, IsRetryableError(context.DeadlineExceeded))
require.True(t, IsRetryableError(ErrWriteTooSlow))
require.False(t, IsRetryableError(io.EOF))
require.False(t, IsRetryableError(&net.AddrError{}))
require.False(t, IsRetryableError(&net.DNSError{}))
Expand Down

0 comments on commit dee5eb0

Please sign in to comment.