From 27f2d00521f9a4bc51723caf9ea8cc0f73d5dac0 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 6 Jan 2022 16:20:45 -0600 Subject: [PATCH] [Heartbeat] Separate http req per task (#29697) This is an attempt to fix the race reported in #29580 by instantiating a separate http request per HTTP task. The theory being that the HTTP library modifies the headers and that the req object is not safe to share. This has passed manual testing using mode: all against endpoints with multiple A records. Tests are not included here due to the tricky nature of testing here, but we will do so in a follow-up --- CHANGELOG.next.asciidoc | 1 + heartbeat/monitors/active/http/task.go | 47 +++++++++++++-------- heartbeat/monitors/active/http/task_test.go | 2 +- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 97ff013a69f..6bd26ec4867 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -102,6 +102,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Fix broken monitors with newer versions of image relying on dup3. {pull}28938[pull] +- Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull] *Metricbeat* diff --git a/heartbeat/monitors/active/http/task.go b/heartbeat/monitors/active/http/task.go index f273b0a4cc7..9ee5f3fa1e8 100644 --- a/heartbeat/monitors/active/http/task.go +++ b/heartbeat/monitors/active/http/task.go @@ -45,6 +45,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) +type requestFactory func() (*http.Request, error) + func newHTTPMonitorHostJob( addr string, config *Config, @@ -54,10 +56,7 @@ func newHTTPMonitorHostJob( validator multiValidator, ) (jobs.Job, error) { - request, err := buildRequest(addr, config, enc) - if err != nil { - return nil, err - } + var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) } return jobs.MakeSimpleJob(func(event *beat.Event) error { var redirects []string @@ -67,7 +66,13 @@ func newHTTPMonitorHostJob( Transport: transport, Timeout: config.Transport.Timeout, } - _, _, err := execPing(event, client, request, body, config.Transport.Timeout, validator, config.Response) + + req, err := reqFactory() + if err != nil { + return fmt.Errorf("could not make http request: %w", err) + } + + _, _, err = execPing(event, client, req, body, config.Transport.Timeout, validator, config.Response) if len(redirects) > 0 { event.PutValue("http.response.redirects", redirects) } @@ -84,17 +89,14 @@ func newHTTPMonitorIPsJob( validator multiValidator, ) (jobs.Job, error) { - req, err := buildRequest(addr, config, enc) - if err != nil { - return nil, err - } + var reqFactory requestFactory = func() (*http.Request, error) { return buildRequest(addr, config, enc) } - hostname, port, err := splitHostnamePort(req) + hostname, port, err := splitHostnamePort(addr) if err != nil { return nil, err } - pingFactory := createPingFactory(config, port, tls, req, body, validator) + pingFactory := createPingFactory(config, port, tls, reqFactory, body, validator) job, err := monitors.MakeByHostJob(hostname, config.Mode, monitors.NewStdResolver(), pingFactory) return job, err @@ -104,14 +106,19 @@ func createPingFactory( config *Config, port uint16, tls *tlscommon.TLSConfig, - request *http.Request, + reqFactory requestFactory, body []byte, validator multiValidator, ) func(*net.IPAddr) jobs.Job { timeout := config.Transport.Timeout - isTLS := request.URL.Scheme == "https" return monitors.MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error { + req, err := reqFactory() + if err != nil { + return fmt.Errorf("could not create http request: %w", err) + } + isTLS := req.URL.Scheme == "https" + addr := net.JoinHostPort(ip.String(), strconv.Itoa(int(port))) d := &dialchain.DialerChain{ Net: dialchain.MakeConstAddrDialer(addr, dialchain.TCPDialer(timeout)), @@ -163,7 +170,7 @@ func createPingFactory( Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}), } - _, end, err := execPing(event, client, request, body, timeout, validator, config.Response) + _, end, err := execPing(event, client, req, body, timeout, validator, config.Response) cbMutex.Lock() defer cbMutex.Unlock() @@ -313,11 +320,15 @@ func execRequest(client *http.Client, req *http.Request) (start time.Time, resp return start, resp, nil } -func splitHostnamePort(requ *http.Request) (string, uint16, error) { - host := requ.URL.Host +func splitHostnamePort(addr string) (string, uint16, error) { + u, err := url.Parse(addr) + if err != nil { + return "", 0, err + } + host := u.Host // Try to add a default port if needed if strings.LastIndex(host, ":") == -1 { - switch requ.URL.Scheme { + switch u.Scheme { case urlSchemaHTTP: host += ":80" case urlSchemaHTTPS: @@ -330,7 +341,7 @@ func splitHostnamePort(requ *http.Request) (string, uint16, error) { } p, err := strconv.ParseUint(port, 10, 16) if err != nil { - return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, requ.URL.Host) + return "", 0, fmt.Errorf("'%v' is no valid port number in '%v'", port, u.Host) } return host, uint16(p), nil } diff --git a/heartbeat/monitors/active/http/task_test.go b/heartbeat/monitors/active/http/task_test.go index 358a4a6ec2b..71b9720cbed 100644 --- a/heartbeat/monitors/active/http/task_test.go +++ b/heartbeat/monitors/active/http/task_test.go @@ -106,7 +106,7 @@ func TestSplitHostnamePort(t *testing.T) { request := &http.Request{ URL: url, } - host, port, err := splitHostnamePort(request) + host, port, err := splitHostnamePort(request.URL.String()) if err != nil { if test.expectedError == nil {