From 816bc481fada7998037e7ee2a8cbb8633b0d2f5f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 27 Sep 2023 22:02:47 +0530 Subject: [PATCH] Rewind the request body for retries (#194) Fixes #193 `prepareRequest` in HTTP Sender was returning a `http.Request` https://github.com/open-telemetry/opamp-go/blob/deb33889ff1aac0d542a9fd08f946461cccfc1c7/client/internal/httpsender.go#L201 We have the backoff retry strategy for failed requests; however, underlying transport would close the request body and should be rewound for any subsequent attempt. This change creates a thin wrapper around the `http.Request` that helps to rewind the body before making a request. In the test we hijack and close the underlying connection to simulate the connection error case. --- client/internal/httpsender.go | 35 +++++++++++--- client/internal/httpsender_test.go | 74 ++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index ccb6caf8..21f475e0 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -26,6 +26,23 @@ const defaultPollingIntervalMs = 30 * 1000 // default interval is 30 seconds. const headerContentEncoding = "Content-Encoding" const encodingTypeGZip = "gzip" +type requestWrapper struct { + *http.Request + + bodyReader func() io.ReadCloser +} + +func bodyReader(buf []byte) func() io.ReadCloser { + return func() io.ReadCloser { + return io.NopCloser(bytes.NewReader(buf)) + } +} + +func (r *requestWrapper) rewind(ctx context.Context) { + r.Body = r.bodyReader() + r.Request = r.Request.WithContext(ctx) +} + // HTTPSender allows scheduling messages to send. Once run, it will loop through // a request/response cycle for each message to send and will process all received // responses using a receivedProcessor. If there are no pending messages to send @@ -156,7 +173,8 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response select { case <-timer.C: { - resp, err := h.client.Do(req) + req.rewind(ctx) + resp, err := h.client.Do(req.Request) if err == nil { switch resp.StatusCode { case http.StatusOK: @@ -198,7 +216,7 @@ func recalculateInterval(interval time.Duration, resp *http.Response) time.Durat return interval } -func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error) { +func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error) { msgToSend := h.nextMessage.PopPending() if msgToSend == nil || proto.Equal(msgToSend, &protobufs.AgentToServer{}) { // There is no pending message or the message is empty. @@ -211,7 +229,11 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error) return nil, err } - var body io.Reader + r, err := http.NewRequestWithContext(ctx, OpAMPPlainHTTPMethod, h.url, nil) + if err != nil { + return nil, err + } + req := requestWrapper{Request: r} if h.compressionEnabled { var buf bytes.Buffer @@ -224,17 +246,16 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*http.Request, error) h.logger.Errorf("Failed to close the writer: %v", err) return nil, err } - body = &buf + req.bodyReader = bodyReader(buf.Bytes()) } else { - body = bytes.NewReader(data) + req.bodyReader = bodyReader(data) } - req, err := http.NewRequestWithContext(ctx, OpAMPPlainHTTPMethod, h.url, body) if err != nil { return nil, err } req.Header = h.requestHeader - return req, nil + return &req, nil } func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) { diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index 58a52685..c8e2855e 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -3,13 +3,18 @@ package internal import ( "context" "crypto/tls" + "io" + "net" "net/http" + "net/http/httptest" + "sync" "sync/atomic" "testing" "time" "github.com/open-telemetry/opamp-go/client/types" sharedinternal "github.com/open-telemetry/opamp-go/internal" + "github.com/open-telemetry/opamp-go/internal/testhelpers" "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/assert" ) @@ -98,3 +103,72 @@ EKTcWGekdmdDPsHloRNtsiCa697B2O9IFA== return cert, nil } + +func TestHTTPSenderRetryForFailedRequests(t *testing.T) { + + srv, m := newMockServer(t) + address := testhelpers.GetAvailableLocalAddress() + var connectionAttempts int64 + + var buf []byte + srv.OnRequest = func(w http.ResponseWriter, r *http.Request) { + attempt := atomic.AddInt64(&connectionAttempts, 1) + if attempt == 1 { + hj, ok := w.(http.Hijacker) + if !ok { + t.Error("server doesn't support hijacking") + return + } + conn, _, err := hj.Hijack() + if err != nil { + t.Error(err) + return + } + conn.Close() + } else { + buf, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + url := "http://" + address + sender := NewHTTPSender(&sharedinternal.NopLogger{}) + sender.NextMessage().Update(func(msg *protobufs.AgentToServer) { + msg.AgentDescription = &protobufs.AgentDescription{ + IdentifyingAttributes: []*protobufs.KeyValue{{ + Key: "service.name", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "test-service"}, + }, + }}, + } + }) + sender.callbacks = types.CallbacksStruct{ + OnConnectFunc: func() { + }, + OnConnectFailedFunc: func(_ error) { + }, + } + sender.url = url + var wg sync.WaitGroup + wg.Add(2) + go func() { + sender.sendRequestWithRetries(ctx) + wg.Done() + }() + go func() { + l, err := net.Listen("tcp", address) + assert.NoError(t, err) + ts := httptest.NewUnstartedServer(m) + ts.Listener.Close() + ts.Listener = l + ts.Start() + srv.srv = ts + wg.Done() + }() + wg.Wait() + assert.True(t, len(buf) > 0) + assert.Contains(t, string(buf), "test-service") + cancel() + srv.Close() +}