Skip to content

Commit

Permalink
[release-0.18] Retry on network failures (knative#4454) (knative#4457)
Browse files Browse the repository at this point in the history
* Retry on network failures (knative#4454)

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* nethttp -> http

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored and matzew committed Nov 7, 2020
1 parent babd275 commit 234715d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
return retryConfig, nil
}

func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
return resp != nil && resp.StatusCode >= 300, nil
// checkRetry decides whether a delivery attempt should be retried.
//
// The attempt counts as delivered only when a response was received and its
// status code is below 300. Every other outcome (no response at all, e.g. a
// network failure, or a status code of 300 and above) asks for a retry. The
// incoming err is passed through unchanged as the second return value.
func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) {
	if resp != nil && resp.StatusCode < 300 {
		// Delivered: no retry needed.
		return false, err
	}
	return true, err
}
123 changes: 101 additions & 22 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package kncloudevents

import (
"context"
nethttp "net/http"
"net"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -146,60 +147,60 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) {
name: "5 max retry",
config: &RetryConfig{
RetryMax: 5,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 6,
wantErr: false,
},
{
name: "1 max retry",
config: &RetryConfig{
RetryMax: 1,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 2,
wantErr: false,
},
{
name: "with no retryConfig",
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 1,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var n int32
server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) {
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
atomic.AddInt32(&n, 1)
writer.WriteHeader(tt.wantStatus)
}))

sender := &HttpMessageSender{
Client: nethttp.DefaultClient,
Client: http.DefaultClient,
}

request, err := nethttp.NewRequest("POST", server.URL, nil)
request, err := http.NewRequest("POST", server.URL, nil)
assert.Nil(t, err)
got, err := sender.SendWithRetries(request, tt.config)
if (err != nil) != tt.wantErr || got == nil {
t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr)
return
}
if got.StatusCode != nethttp.StatusServiceUnavailable {
t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable)
if got.StatusCode != http.StatusServiceUnavailable {
t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusServiceUnavailable)
return
}
if count := int(atomic.LoadInt32(&n)); count != tt.wantDispatch {
Expand All @@ -209,35 +210,113 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) {
}
}

// TestRetriesOnNetworkErrors checks that SendWithRetries keeps retrying while
// the target is unreachable (connection refused), keeps retrying once the
// target comes up but answers with a retryable status, and stops as soon as
// the target answers successfully.
func TestRetriesOnNetworkErrors(t *testing.T) {
	n := int32(10)
	linear := duckv1.BackoffPolicyLinear
	target := "127.0.0.1:63468"

	// nCalls counts how many times CheckRetry has been invoked. It is written
	// by the coordinator goroutine below and read by the HTTP handler and by
	// this test, so every access goes through sync/atomic.
	var nCalls int32

	// calls signals the coordinator that CheckRetry was invoked; cont lets
	// CheckRetry proceed once the coordinator has done its bookkeeping.
	calls := make(chan struct{})
	cont := make(chan struct{})

	// done is closed when the coordinator goroutine (and the server it may
	// have started) has fully shut down, so nothing outlives the test.
	done := make(chan struct{})
	defer func() {
		close(calls)
		<-done
	}()

	go func() {
		defer close(done)

		for range calls {
			current := atomic.AddInt32(&nCalls, 1)

			// Simulate that the target service is back up.
			//
			// The first n/2 requests get connection refused since there is no
			// server running. On the n/2-th CheckRetry call we start a server
			// that responds with a retryable error, so we expect the client to
			// continue retrying for a different reason.
			//
			// The last time the server returns 200, so we don't expect a new retry.
			if current == n/2 {
				l, err := net.Listen("tcp", target)
				// Only install the listener when it was actually created;
				// starting the server with a nil listener would panic.
				if assert.Nil(t, err) {
					s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
						if n-1 != atomic.LoadInt32(&nCalls) {
							writer.WriteHeader(http.StatusServiceUnavailable)
							return
						}
					}))
					defer s.Close() //nolint // defers in this range loop won't run unless the channel gets closed

					// Replace the listener httptest picked with the one bound to target.
					assert.Nil(t, s.Listener.Close())
					s.Listener = l

					s.Start()
				}
			}
			cont <- struct{}{}
		}
	}()

	r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{
		Retry:         pointer.Int32Ptr(n),
		BackoffPolicy: &linear,
		BackoffDelay:  pointer.StringPtr("PT0.1S"),
	})
	assert.Nil(t, err)

	// Wrap the configured CheckRetry so that every invocation is reported to
	// the coordinator before the real decision is taken.
	delegate := r.CheckRetry
	r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
		calls <- struct{}{}
		<-cont

		return delegate(ctx, resp, err)
	}

	req, err := http.NewRequest("POST", "http://"+target, nil)
	assert.Nil(t, err)

	sender, err := NewHttpMessageSender(nil, "")
	assert.Nil(t, err)

	resp, err := sender.SendWithRetries(req, &r)
	assert.Nil(t, err)
	if resp != nil {
		// Release the connection held by the final response.
		assert.Nil(t, resp.Body.Close())
	}

	// nCalls keeps track of how many times CheckRetry was called. CheckRetry
	// runs once per request, and the n-th request is the successful one, so
	// the expected number of calls is n.
	got := atomic.LoadInt32(&nCalls)
	assert.Equal(t, n, got, "expected %d got %d", n, got)
}

func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
t.Parallel()

const wantToSkip = 9
config := &RetryConfig{
RetryMax: wantToSkip,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond * 50 * time.Duration(attemptNum)
},
}

var n uint32
server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) {
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
thisReqN := atomic.AddUint32(&n, 1)
if thisReqN <= wantToSkip {
writer.WriteHeader(nethttp.StatusServiceUnavailable)
writer.WriteHeader(http.StatusServiceUnavailable)
} else {
writer.WriteHeader(nethttp.StatusAccepted)
writer.WriteHeader(http.StatusAccepted)
}
}))

sender := &HttpMessageSender{
Client: nethttp.DefaultClient,
Client: http.DefaultClient,
}

request, err := nethttp.NewRequest("POST", server.URL, nil)
request, err := http.NewRequest("POST", server.URL, nil)
assert.Nil(t, err)

// Create a message similar to the one we send with channels
Expand All @@ -252,8 +331,8 @@ func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
if err != nil {
t.Fatalf("SendWithRetries() error = %v, wantErr nil", err)
}
if got.StatusCode != nethttp.StatusAccepted {
t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusAccepted)
if got.StatusCode != http.StatusAccepted {
t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusAccepted)
}
if count := atomic.LoadUint32(&n); count != wantToSkip+1 {
t.Fatalf("expected %d count got %d", wantToSkip+1, count)
Expand Down

0 comments on commit 234715d

Please sign in to comment.