Skip to content

Commit

Permalink
feat: implement IsRetryableErr for S3ObjectClient (#14174)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Adds support for `IsRetryableErr` in the `S3ObjectClient`, similar to what is found in the `GCSObjectClient`.
  • Loading branch information
paul1r authored Sep 19, 2024
1 parent 2840d48 commit fc90a63
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 2 deletions.
63 changes: 61 additions & 2 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
awscommon "github.com/grafana/dskit/aws"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

amnet "k8s.io/apimachinery/pkg/util/net"

bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
Expand Down Expand Up @@ -532,5 +535,61 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}

// IsRetryableErr is a placeholder: it ignores its argument and reports every
// error as non-retryable.
// TODO(dannyk): implement for client
func (a *S3ObjectClient) IsRetryableErr(error) bool { return false }
// isTimeoutError reports whether err, or any error in its chain, is a
// net.Error that identifies itself as a timeout.
func isTimeoutError(err error) bool {
	var timeoutErr net.Error
	if !errors.As(err, &timeoutErr) {
		// Nothing in the chain is a net.Error, so it cannot be a network timeout.
		return false
	}
	return timeoutErr.Timeout()
}

// isContextErr reports whether err is, or wraps, a context deadline expiry or
// a context cancellation.
func isContextErr(err error) bool {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return true
	case errors.Is(err, context.Canceled):
		return true
	default:
		return false
	}
}

// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts.
//
// The checks run in order and the first match wins:
//   - context cancellation / deadline errors are client-side: false
//   - net.ErrClosed or a refused connection are client-side: false
//   - a net.Error reporting Timeout(): true
//   - io.EOF or a connection reset: true
//   - an AWS request failure with HTTP 408 or 504: true
//
// Anything else is reported as false.
func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
	// TODO(dannyk): move these out to be generic
	// context errors are all client-side
	if isContextErr(err) {
		return false
	}

	// connection misconfiguration, or writing on a closed connection
	// do NOT retry; this is not a server-side issue
	if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) {
		return false
	}

	// this is a server-side timeout
	if isTimeoutError(err) {
		return true
	}

	// connection closed (closed before established) or reset (closed after established)
	// this is a server-side issue
	if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) {
		return true
	}

	// Use errors.As instead of a direct type assertion so that a request
	// failure wrapped with extra context is still recognised, matching the
	// errors.Is / errors.As checks above.
	var rerr awserr.RequestFailure
	if errors.As(err, &rerr) {
		// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
		status := rerr.StatusCode()
		return status == http.StatusRequestTimeout ||
			status == http.StatusGatewayTimeout
	}

	return false
}

// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling.
//
// An error qualifies when it is, or wraps, an awserr.RequestFailure whose HTTP
// status is 429 (Too Many Requests) or any 5xx code. Errors that carry no
// request failure are reported as false.
func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
	// errors.As (rather than a direct type assertion) also matches a request
	// failure that a caller has wrapped with additional context.
	var rerr awserr.RequestFailure
	if !errors.As(err, &rerr) {
		return false
	}

	// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
	status := rerr.StatusCode()
	return status == http.StatusTooManyRequests ||
		status/100 == 5 // all 5xx errors are retryable
}
// IsRetryableErr reports whether err is classified as a storage timeout by
// IsStorageTimeoutErr or as throttling by IsStorageThrottledErr.
func (a *S3ObjectClient) IsRetryableErr(err error) bool {
	if a.IsStorageTimeoutErr(err) {
		return true
	}
	return a.IsStorageThrottledErr(err)
}
104 changes: 104 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -73,6 +75,108 @@ func TestIsObjectNotFoundErr(t *testing.T) {
}
}

// TestIsRetryableErr checks how S3ObjectClient.IsRetryableErr classifies AWS
// request failures, network errors, and context errors.
func TestIsRetryableErr(t *testing.T) {
	// requestFailure builds an AWS request failure with the given error code,
	// message, HTTP status and request ID.
	requestFailure := func(code, message string, status int, reqID string) error {
		return awserr.NewRequestFailure(awserr.New(code, message, nil), status, reqID)
	}

	cases := []struct {
		name     string
		err      error
		expected bool
	}{
		{
			name:     "IsStorageThrottledErr - Too Many Requests",
			err:      requestFailure("TooManyRequests", "TooManyRequests", 429, "reqId"),
			expected: true,
		},
		{
			name:     "IsStorageThrottledErr - 500",
			err:      requestFailure("500", "500", 500, "reqId"),
			expected: true,
		},
		{
			name:     "IsStorageThrottledErr - 5xx",
			err:      requestFailure("501", "501", 501, "reqId"),
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - Request Timeout",
			err:      requestFailure("Request Timeout", "Request Timeout", 408, "reqId"),
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - Gateway Timeout",
			err:      requestFailure("Gateway Timeout", "Gateway Timeout", 504, "reqId"),
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - EOF",
			err:      io.EOF,
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - Connection Reset",
			err:      syscall.ECONNRESET,
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - Timeout Error",
			err:      requestFailure("RequestCanceled", "request canceled due to timeout", 408, "request-id"),
			expected: true,
		},
		{
			name:     "IsStorageTimeoutErr - Closed",
			err:      net.ErrClosed,
			expected: false,
		},
		{
			name:     "IsStorageTimeoutErr - Connection Refused",
			err:      syscall.ECONNREFUSED,
			expected: false,
		},
		{
			name:     "IsStorageTimeoutErr - Context Deadline Exceeded",
			err:      context.DeadlineExceeded,
			expected: false,
		},
		{
			name:     "IsStorageTimeoutErr - Context Canceled",
			err:      context.Canceled,
			expected: false,
		},
		{
			name:     "Not a retryable error",
			err:      syscall.EINVAL,
			expected: false,
		},
		{
			name:     "Not found 404",
			err:      requestFailure("404", "404", 404, "reqId"),
			expected: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// A fresh client per subtest, as in the original table loop.
			s3Client, err := NewS3ObjectClient(S3Config{BucketNames: "mybucket"}, hedging.Config{})
			require.NoError(t, err)

			got := s3Client.IsRetryableErr(tc.err)
			require.Equal(t, tc.expected, got)
		})
	}
}

func TestRequestMiddleware(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, r.Header.Get("echo-me"))
Expand Down

0 comments on commit fc90a63

Please sign in to comment.