Skip to content

Commit

Permalink
br: add more retry strategy: s3.ReadFile: body reader (#50541) (#50706)
Browse files Browse the repository at this point in the history
close #49942
  • Loading branch information
ti-chi-bot authored Jan 24, 2024
1 parent 03a599b commit c12144a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 16 deletions.
53 changes: 38 additions & 15 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,22 +515,41 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er

// ReadFile reads the file from the storage and returns the contents.
//
// The object is addressed by rs.options.Bucket and rs.options.Prefix + file.
// In the retry-loop form below, a failing GetObjectWithContext call is returned
// immediately (annotated with bucket and key), while a failure to read the
// response body triggers a fresh GetObject request, for at most
// maxErrorRetries attempts. Body read errors recognised by
// isDeadlineExceedError or isCancelError are returned at once instead of
// being retried.
//
// NOTE(review): this span comes from a diff view that carries no +/- markers.
// The single-shot body directly below and the retry loop after it look like
// the pre-change and post-change versions of the same function shown
// together — confirm against the repository before treating the span as one
// compilable body.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
// Single-shot form: one GetObject request, one full read of the body.
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
return nil, errors.Trace(err)
// Retry-loop form: data and readErr are declared outside the loop so the last
// read error is still available once every attempt has been used.
var (
data []byte
readErr error
)
for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
// The request input is rebuilt on every attempt.
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
// A failed GetObject call ends ReadFile right away; only body read
// failures reach the retry path further down.
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
data, readErr = io.ReadAll(result.Body)
// close the body of response since data has been already read out
// (closed explicitly rather than deferred; the value returned by Close is discarded)
result.Body.Close()
// for unit test
// When the failpoint fires, the real read result is logged and readErr is
// replaced by a synthetic "connection reset by peer" error.
failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
log.Info("original error", zap.Error(readErr))
readErr = errors.Errorf("read: connection reset by peer")
})
if readErr != nil {
// Deadline-exceeded and cancellation errors are reported immediately,
// together with the number of retries performed so far.
if isDeadlineExceedError(readErr) || isCancelError(readErr) {
return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
*input.Bucket, *input.Key, retryCnt)
}
// Any other body read error: issue a new GetObject request.
continue
}
return data, nil
}
// NOTE(review): the return on the next line looks like a leftover of the
// single-shot form; with it in place the final error return below would be
// unreachable — confirm against the repository.
return data, nil
// retry too much, should be failed
return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
rs.options.Bucket, rs.options.Prefix+file)
}

// DeleteFile delete the file in s3 storage
Expand Down Expand Up @@ -989,6 +1008,10 @@ type retryerWithLog struct {
client.DefaultRetryer
}

// isCancelError reports whether err describes a canceled context.
//
// The check is textual: it returns true when err.Error() contains the
// substring "context canceled", so it also matches errors whose message
// merely embeds that text. A nil err is never a cancellation and yields
// false instead of panicking on the err.Error() call.
func isCancelError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "context canceled")
}

func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -1343,3 +1344,40 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}

// TestS3ReadFileRetryable checks that ReadFile sends a new GetObject request
// after a body read failure and surfaces the error of a failing GetObject call.
//
// The "read-s3-body-failed" failpoint is enabled with the term
// "2*return(true)", so the mock serves two successful GetObject responses
// (whose body reads get reported as failed) and answers the third request with
// expectedErr. ReadFile must return an error carrying that message.
func TestS3ReadFileRetryable(t *testing.T) {
	const readBodyFailpoint = "github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed"

	s := createS3Suite(t)
	ctx := aws.BackgroundContext()
	errMsg := "just some unrelated error"
	expectedErr := errors.New(errMsg)

	// serveObject validates the request target and hands back a fresh readable
	// body on every invocation, so each attempt gets its own reader.
	serveObject := func(_ context.Context, input *s3.GetObjectInput, _ ...request.Option) (*s3.GetObjectOutput, error) {
		require.Equal(t, "bucket", aws.StringValue(input.Bucket))
		require.Equal(t, "prefix/file", aws.StringValue(input.Key))
		return &s3.GetObjectOutput{
			Body: io.NopCloser(bytes.NewReader([]byte("test"))),
		}, nil
	}

	// Two successful responses followed by one failing request.
	s.s3.EXPECT().
		GetObjectWithContext(ctx, gomock.Any()).
		DoAndReturn(serveObject)
	s.s3.EXPECT().
		GetObjectWithContext(ctx, gomock.Any()).
		DoAndReturn(serveObject)
	s.s3.EXPECT().
		GetObjectWithContext(ctx, gomock.Any()).
		Return(nil, expectedErr)

	require.NoError(t, failpoint.Enable(readBodyFailpoint, "2*return(true)"))
	defer func() {
		// A failpoint left enabled would leak into later tests, so a failed
		// Disable is reported instead of being dropped.
		require.NoError(t, failpoint.Disable(readBodyFailpoint))
	}()

	_, err := s.storage.ReadFile(ctx, "file")
	require.Error(t, err)
	require.True(t, strings.Contains(err.Error(), errMsg))
}
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 33,
shard_count = 34,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down

0 comments on commit c12144a

Please sign in to comment.