Skip to content

Commit

Permalink
Retry on network failure
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Nov 3, 2020
1 parent f064837 commit 4e63ba9
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
return retryConfig, nil
}

func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
return resp != nil && resp.StatusCode >= 300, nil
func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) {
return !(resp != nil && resp.StatusCode < 300), err
}
84 changes: 84 additions & 0 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package kncloudevents

import (
"context"
"fmt"
"math"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"sync/atomic"
Expand Down Expand Up @@ -180,6 +184,86 @@ func TestHTTPMessageSenderSendWithRetries(t *testing.T) {
}
}

func TestRetriesOnNetworkErrors(t *testing.T) {

port := rand.Int31n(math.MaxUint16-1024) + 1024
n := int32(10)
linear := duckv1.BackoffPolicyLinear
target := fmt.Sprintf("127.0.0.1:%d", port)

calls := make(chan struct{})
defer close(calls)

nCalls := int32(0)

cont := make(chan struct{})
defer close(cont)

go func() {
for range calls {

nCalls++

// Simulate that the target service is back up.
//
// First n/2-1 calls we get connection refused since there is no server running.
// Now we start a server that responds with a retryable error, so we expect that
// the client continues to retry for a different reason.
//
// The last time we return 200, so we don't expect a new retry.
if n/2 == nCalls {

l, err := net.Listen("tcp", target)
assert.Nil(t, err)

s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if n-1 != nCalls {
writer.WriteHeader(http.StatusServiceUnavailable)
return
}
}))
defer s.Close()

assert.Nil(t, s.Listener.Close())

s.Listener = l

s.Start()
}
cont <- struct{}{}
}
}()

r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{
Retry: pointer.Int32Ptr(n),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT0.1S"),
})
assert.Nil(t, err)

checkRetry := r.CheckRetry

r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
calls <- struct{}{}
<-cont

return checkRetry(ctx, resp, err)
}

req, err := http.NewRequest("POST", "http://"+target, nil)
assert.Nil(t, err)

sender, err := NewHTTPMessageSender(nil, "")
assert.Nil(t, err)

_, err = sender.SendWithRetries(req, &r)
assert.Nil(t, err)

// nCalls keeps track of how many times a call to check retry occurs.
// Since the number of request are n + 1 and the last one is successful the expected number of calls are n.
assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls)
}

func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 4e63ba9

Please sign in to comment.