Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Read entire body before closing connection #8660

Merged
merged 3 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/url"
"os"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -51,6 +52,23 @@ func HelloWorldHandler(status int) http.HandlerFunc {
)
}

// SizedResponseHandler responds with 200 to any request with a body
// exactly the size of the `bytes` argument, where each byte is the
// character 'x'
func SizedResponseHandler(bytes int) http.HandlerFunc {
var body strings.Builder
for i := 0; i < bytes; i++ {
body.WriteString("x")
}

return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
io.WriteString(w, body.String())
},
)
}

// ServerPort takes an httptest.Server and returns its port as a uint16.
func ServerPort(server *httptest.Server) (uint16, error) {
u, err := url.Parse(server.URL)
Expand Down
35 changes: 35 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,41 @@ func TestDownStatuses(t *testing.T) {
}
}

func TestLargeResponse(t *testing.T) {
server := httptest.NewServer(hbtest.SizedResponseHandler(1024 * 1024))
defer server.Close()

configSrc := map[string]interface{}{
"urls": server.URL,
"timeout": "1s",
"check.response.body": "x",
}

config, err := common.NewConfigFrom(configSrc)
require.NoError(t, err)

jobs, err := create("largeresp", config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, 200),
)),
event.Fields,
)
}

func TestHTTPSServer(t *testing.T) {
server := httptest.NewTLSServer(hbtest.HelloWorldHandler(http.StatusOK))
port, err := hbtest.ServerPort(server)
Expand Down
33 changes: 32 additions & 1 deletion heartbeat/monitors/active/http/simple_transp.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if err != nil {
return nil, err
}
defer conn.Close()

requestedGzip := false
if t.DisableCompression &&
Expand Down Expand Up @@ -128,6 +127,21 @@ func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
case ret = <-readerDone:
break
case <-done:
// We need to free resources from the main reader
// We start by closing the conn, which will most likely cause an error
// in the read goroutine (unless we are right on the boundary between timeout and success)
// and will free up both the connection and cause that go routine to terminate.
conn.Close()
// Now we block waiting for that goroutine to finish. We do this synchronously
// because with a closed connection it should return immediately.
// We can ignore the ret.err value because the error is most likely due to us
// prematurely closing the conn
ret := <-readerDone
// If the body has been allocated we need to close it
if ret.resp != nil {
ret.resp.Body.Close()
}
// finally, return the real error. No need to return a response here
return nil, errors.New("http: request timed out while waiting for response")
}
close(readerDone)
Expand All @@ -147,6 +161,22 @@ func (t *SimpleTransport) writeRequest(conn net.Conn, req *http.Request) error {
return err
}

// comboConnReadCloser wraps a ReadCloser that is backed by
// on a net.Conn. It will close the net.Conn when the ReadCloser is closed.
type comboConnReadCloser struct {
conn net.Conn
rc io.ReadCloser
}

func (c comboConnReadCloser) Read(p []byte) (n int, err error) {
return c.rc.Read(p)
}

func (c comboConnReadCloser) Close() error {
defer c.conn.Close()
return c.rc.Close()
}

func (t *SimpleTransport) readResponse(
conn net.Conn,
req *http.Request,
Expand All @@ -157,6 +187,7 @@ func (t *SimpleTransport) readResponse(
if err != nil {
return nil, err
}
resp.Body = comboConnReadCloser{conn, resp.Body}

t.sigStartRead()

Expand Down