diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index 5cbe26a985e..e3fe6ecec8b 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -189,6 +189,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } -func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { - return resp != nil && resp.StatusCode >= 300, nil +func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) { + return !(resp != nil && resp.StatusCode < 300), err } diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index 1cbffcc3d05..202bbde7764 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -18,6 +18,10 @@ package kncloudevents import ( "context" + "fmt" + "math" + "math/rand" + "net" "net/http" "net/http/httptest" "sync/atomic" @@ -180,6 +184,86 @@ func TestHTTPMessageSenderSendWithRetries(t *testing.T) { } } +func TestRetriesOnNetworkErrors(t *testing.T) { + + port := rand.Int31n(math.MaxUint16-1024) + 1024 + n := int32(10) + linear := duckv1.BackoffPolicyLinear + target := fmt.Sprintf("127.0.0.1:%d", port) + + calls := make(chan struct{}) + defer close(calls) + + nCalls := int32(0) + + cont := make(chan struct{}) + defer close(cont) + + go func() { + for range calls { + + nCalls++ + + // Simulate that the target service is back up. + // + // First n/2-1 calls we get connection refused since there is no server running. + // Now we start a server that responds with a retryable error, so we expect that + // the client continues to retry for a different reason. + // + // The last time we return 200, so we don't expect a new retry. + if n/2 == nCalls { + + l, err := net.Listen("tcp", target) + assert.Nil(t, err) + + s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + if n-1 != nCalls { + writer.WriteHeader(http.StatusServiceUnavailable) + return + } + })) + defer s.Close() + + assert.Nil(t, s.Listener.Close()) + + s.Listener = l + + s.Start() + } + cont <- struct{}{} + } + }() + + r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{ + Retry: pointer.Int32Ptr(n), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT0.1S"), + }) + assert.Nil(t, err) + + checkRetry := r.CheckRetry + + r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { + calls <- struct{}{} + <-cont + + return checkRetry(ctx, resp, err) + } + + req, err := http.NewRequest("POST", "http://"+target, nil) + assert.Nil(t, err) + + sender, err := NewHTTPMessageSender(nil, "") + assert.Nil(t, err) + + _, err = sender.SendWithRetries(req, &r) + assert.Nil(t, err) + + // nCalls keeps track of how many times a call to check retry occurs. + // Since the number of request are n + 1 and the last one is successful the expected number of calls are n. + assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls) +} + func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) { t.Parallel()