fix(storage): add backoff to gRPC write retries (#11200)
BrennaEpp authored Dec 4, 2024
1 parent 364b639 commit a7db927
Showing 9 changed files with 411 additions and 80 deletions.
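
The substantive change: before this commit, a retriable error during a gRPC write caused an immediate resend of the buffer; the new uploadBufferRetryConfig type (added in storage/grpc_client.go below) now pauses between attempts using the configured gax.Backoff. For context, a minimal, self-contained sketch of the gax backoff primitive the retry path relies on; the loop and print statements are illustrative only, not part of this diff:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
)

func main() {
	// gax.Backoff yields jittered pauses that grow geometrically per
	// call, capped at Max. These are the same knobs the storage
	// retryConfig exposes (Initial, Max, Multiplier).
	bo := &gax.Backoff{
		Initial:    10 * time.Millisecond,
		Max:        30 * time.Second,
		Multiplier: 2,
	}
	ctx := context.Background()
	for attempt := 1; attempt <= 3; attempt++ {
		pause := bo.Pause()
		fmt.Printf("attempt %d: pausing %v before retrying\n", attempt, pause)
		// gax.Sleep returns early with ctx.Err() if the context ends.
		if err := gax.Sleep(ctx, pause); err != nil {
			return
		}
	}
}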
8 changes: 4 additions & 4 deletions storage/bucket_test.go
@@ -1116,11 +1116,11 @@ func TestBucketRetryer(t *testing.T) {
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
backoff: &gax.Backoff{
backoff: gaxBackoffFromStruct(&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
},
}),
policy: RetryAlways,
maxAttempts: expectedAttempts(5),
shouldRetry: func(err error) bool { return false },
@@ -1135,9 +1135,9 @@ func TestBucketRetryer(t *testing.T) {
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
backoff: gaxBackoffFromStruct(&gax.Backoff{
Multiplier: 3,
}},
})},
},
{
name: "set policy only",
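
These test updates track a refactor of retryConfig.backoff from a bare *gax.Backoff to a wrapper constructed with gaxBackoffFromStruct. The options under test are public API; a hedged usage sketch of the same configuration (the bucket name and the always-false error predicate are placeholders):

package main

import (
	"context"
	"time"

	"cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Mirrors the configuration exercised by TestBucketRetryer above.
	bucket := client.Bucket("example-bucket").Retryer(
		storage.WithBackoff(gax.Backoff{
			Initial:    2 * time.Second,
			Max:        30 * time.Second,
			Multiplier: 3,
		}),
		storage.WithPolicy(storage.RetryAlways),
		storage.WithMaxAttempts(5),
		storage.WithErrorFunc(func(err error) bool { return false }),
	)
	_ = bucket
}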
6 changes: 3 additions & 3 deletions storage/client_test.go
@@ -1396,7 +1396,7 @@ func TestRetryMaxAttemptsEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
@@ -1421,7 +1421,7 @@ func TestTimeoutErrorEmulated(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
defer cancel()
time.Sleep(5 * time.Nanosecond)
config := &retryConfig{backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

// Error may come through as a context.DeadlineExceeded (HTTP) or status.DeadlineExceeded (gRPC)
@@ -1447,7 +1447,7 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: gaxBackoffFromStruct(&gax.Backoff{Initial: 10 * time.Millisecond})}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
t.Fatalf("GetBucket: got unexpected error %v, want nil", err)
}
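
gaxBackoffFromStruct itself is defined in one of the nine changed files not expanded on this page. Based on its call sites in these tests and the Set*/Get*/Pause calls in newUploadBufferRetryConfig below, a plausible reconstruction follows; this is an inference, not the committed source, and the interface name "backoff" is an assumption:

package storage // sketch; the real helper lives in the storage package

import (
	"time"

	"github.com/googleapis/gax-go/v2"
)

// backoff abstracts the pause strategy so retryConfig need not hold a
// concrete *gax.Backoff. Method set inferred from the diff below.
type backoff interface {
	Pause() time.Duration
	SetInitial(time.Duration)
	GetInitial() time.Duration
	SetMax(time.Duration)
	GetMax() time.Duration
	SetMultiplier(float64)
	GetMultiplier() float64
}

// gaxBackoff wraps gax.Backoff; embedding supplies Pause().
type gaxBackoff struct {
	gax.Backoff
}

func (b *gaxBackoff) SetInitial(i time.Duration) { b.Initial = i }
func (b *gaxBackoff) GetInitial() time.Duration  { return b.Initial }
func (b *gaxBackoff) SetMax(m time.Duration)     { b.Max = m }
func (b *gaxBackoff) GetMax() time.Duration      { return b.Max }
func (b *gaxBackoff) SetMultiplier(m float64)    { b.Multiplier = m }
func (b *gaxBackoff) GetMultiplier() float64     { return b.Multiplier }

func gaxBackoffFromStruct(bo *gax.Backoff) backoff {
	if bo == nil {
		return &gaxBackoff{}
	}
	return &gaxBackoff{Backoff: *bo}
}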
134 changes: 114 additions & 20 deletions storage/grpc_client.go
@@ -29,6 +29,7 @@ import (
"cloud.google.com/go/internal/trace"
gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
@@ -1223,7 +1224,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}
}

o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
o, off, err := gw.uploadBuffer(recvd, offset, doneReading, newUploadBufferRetryConfig(gw.settings))
if err != nil {
err = checkCanceled(err)
errorf(err)
@@ -2091,12 +2092,7 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
// completed.
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}

func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool, retryConfig *uploadBufferRetryConfig) (*storagepb.Object, int64, error) {
var err error
var lastWriteOfEntireObject bool

@@ -2143,6 +2139,7 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
ctx = setInvocationHeaders(ctx, retryConfig.invocationID, retryConfig.attempts)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
@@ -2188,7 +2185,11 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
err = retryConfig.retriable(w.ctx, err)

if err == nil {
retryConfig.doBackOff(w.ctx)

// TODO: Add test case for failure modes of querying progress.
writeOffset, err = w.determineOffset(start)
if err != nil {
@@ -2230,11 +2231,17 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
if !lastWriteOfEntireObject {
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
if err != nil {
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
err = retryConfig.retriable(w.ctx, err)
if err != nil {
return nil, 0, err
}

retryConfig.doBackOff(w.ctx)
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2246,9 +2253,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes

continue sendBytes
}
if err != nil {
return nil, 0, err
}

if resp.GetPersistedSize() != writeOffset {
// Retry if not all bytes were persisted.
@@ -2274,7 +2278,14 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {

if err != nil {
err = retryConfig.retriable(w.ctx, err)
if err != nil {
return nil, 0, err
}
retryConfig.doBackOff(w.ctx)

writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
@@ -2283,9 +2294,6 @@ sendBytes: // label this loop so that we can use a continue statement from a nes
w.stream = nil
continue sendBytes
}
if err != nil {
return nil, 0, err
}

obj = resp.GetResource()
}
@@ -2370,3 +2378,89 @@ func checkCanceled(err error) error {

return err
}

type uploadBufferRetryConfig struct {
attempts int
invocationID string
config *retryConfig
lastErr error
}

func newUploadBufferRetryConfig(settings *settings) *uploadBufferRetryConfig {
config := settings.retry

if config == nil {
config = defaultRetry.clone()
}

if config.shouldRetry == nil {
config.shouldRetry = ShouldRetry
}

if config.backoff == nil {
config.backoff = &gaxBackoff{}
} else {
config.backoff.SetMultiplier(settings.retry.backoff.GetMultiplier())
config.backoff.SetInitial(settings.retry.backoff.GetInitial())
config.backoff.SetMax(settings.retry.backoff.GetMax())
}

return &uploadBufferRetryConfig{
attempts: 1,
invocationID: uuid.New().String(),
config: config,
}
}

// retriable determines if a retry is necessary and if so returns a nil error;
// otherwise it returns the error to be surfaced to the user.
func (retry *uploadBufferRetryConfig) retriable(ctx context.Context, err error) error {
if err == nil {
// a nil err does not need to be retried
return nil
}
if err != context.Canceled && err != context.DeadlineExceeded {
retry.lastErr = err
}

if retry.config.policy == RetryNever {
return err
}

if retry.config.maxAttempts != nil && retry.attempts >= *retry.config.maxAttempts {
return fmt.Errorf("storage: retry failed after %v attempts; last error: %w", retry.attempts, err)
}

retry.attempts++

// Explicitly check context cancellation so that we can distinguish between a
// DEADLINE_EXCEEDED error from the server and a user-set context deadline.
// Unfortunately gRPC returns codes.DeadlineExceeded (which may be retryable if
// it's sent by the server) in both cases.
ctxErr := ctx.Err()
if errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
if retry.lastErr != nil {
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
}
return ctxErr
}

if !retry.config.shouldRetry(err) {
return err
}
return nil
}

// doBackOff pauses for the appropriate amount of time; it should be called after
// encountering a retriable error.
func (retry *uploadBufferRetryConfig) doBackOff(ctx context.Context) error {
p := retry.config.backoff.Pause()

if ctxErr := gax.Sleep(ctx, p); ctxErr != nil {
if retry.lastErr != nil {
return fmt.Errorf("retry failed with %v; last error: %w", ctxErr, retry.lastErr)
}
return ctxErr
}
return nil
}
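
Taken together: newUploadBufferRetryConfig snapshots the retry settings and mints a per-upload invocation ID (sent on each attempt via setInvocationHeaders when the stream is opened), retriable classifies an error as retryable (returning nil) or terminal, and doBackOff sleeps per the configured backoff. A hedged sketch of the resulting loop shape in uploadBuffer; sendWithRetry and the send callback are illustrative names, not part of the commit:

// Sketch in package storage; uploadBufferRetryConfig is the type added
// above, and context comes from the standard library.
func sendWithRetry(ctx context.Context, rc *uploadBufferRetryConfig, send func() error) error {
	for {
		err := send() // one attempt: open the stream if needed, send the buffer
		if err == nil {
			return nil
		}
		// retriable returns nil when the attempt may be retried, or a
		// wrapped terminal error (e.g. after maxAttempts) to surface.
		if terminal := rc.retriable(ctx, err); terminal != nil {
			return terminal
		}
		// Pause before re-opening the stream and resending the buffer.
		if err := rc.doBackOff(ctx); err != nil {
			return err
		}
	}
}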