From 08841ab5f9fec06077635d548d21961265515698 Mon Sep 17 00:00:00 2001 From: Manu Garg Date: Thu, 21 Nov 2019 12:03:45 -0800 Subject: [PATCH] [HTTP Probe] Add support for running multiple HTTP requests in parallel. Github issue: https://github.com/google/cloudprober/issues/319 PiperOrigin-RevId: 281802925 --- probes/http/http.go | 52 ++++++------- probes/http/http_test.go | 138 ++++++++++++++++++++++++--------- probes/http/proto/config.pb.go | 7 +- probes/http/proto/config.proto | 8 +- probes/http/request.go | 24 +++--- 5 files changed, 147 insertions(+), 82 deletions(-) diff --git a/probes/http/http.go b/probes/http/http.go index 3d0c696d..d0e8116e 100644 --- a/probes/http/http.go +++ b/probes/http/http.go @@ -52,7 +52,7 @@ type Probe struct { // book-keeping params targets []endpoint.Endpoint httpRequests map[string]*http.Request - results map[string]*result + results map[string]*probeResult protocol string method string url string @@ -72,7 +72,7 @@ type Probe struct { statsExportFrequency int64 } -type result struct { +type probeResult struct { total, success, timeouts int64 latency metrics.Value respCodes *metrics.Map @@ -101,10 +101,6 @@ func (p *Probe) Init(name string, opts *options.Options) error { return fmt.Errorf("Invalid Relative URL: %s, must begin with '/'", p.url) } - if p.c.GetRequestsPerProbe() != 1 { - p.l.Warningf("requests_per_probe field is now deprecated and will be removed in future releases.") - } - // Create a transport for our use. This is mostly based on // http.DefaultTransport with some timeouts changed. // TODO(manugarg): Considering cloning DefaultTransport once @@ -112,7 +108,6 @@ func (p *Probe) Init(name string, opts *options.Options) error { dialer := &net.Dialer{ Timeout: p.opts.Timeout, KeepAlive: 30 * time.Second, // TCP keep-alive - DualStack: true, } if p.opts.SourceIP != nil { @@ -182,13 +177,18 @@ func isClientTimeout(err error) bool { } // httpRequest executes an HTTP request and updates the provided result struct. -func (p *Probe) doHTTPRequest(req *http.Request, result *result) { +func (p *Probe) doHTTPRequest(req *http.Request, result *probeResult, resultMu *sync.Mutex) { start := time.Now() - result.total++ resp, err := p.client.Do(req) latency := time.Since(start) + // Note that we take lock on result object outside of the actual request. + resultMu.Lock() + defer resultMu.Unlock() + + result.total++ + if err != nil { if isClientTimeout(err) { p.l.Warning("Target:", req.Host, ", URL:", req.URL.String(), ", http.doHTTPRequest: timeout error: ", err.Error()) @@ -237,7 +237,7 @@ func (p *Probe) updateTargets() { } if p.results == nil { - p.results = make(map[string]*result, len(p.targets)) + p.results = make(map[string]*probeResult, len(p.targets)) } for _, target := range p.targets { @@ -259,7 +259,7 @@ func (p *Probe) updateTargets() { } else { latencyValue = metrics.NewFloat(0) } - p.results[target.Name] = &result{ + p.results[target.Name] = &probeResult{ latency: latencyValue, respCodes: metrics.NewMap("code", metrics.NewInt(0)), respBodies: metrics.NewMap("resp", metrics.NewInt(0)), @@ -275,28 +275,26 @@ func (p *Probe) runProbe(ctx context.Context) { wg := sync.WaitGroup{} for _, target := range p.targets { - req := p.httpRequests[target.Name] + req, result := p.httpRequests[target.Name], p.results[target.Name] if req == nil { continue } - wg.Add(1) + // We launch a separate goroutine for each HTTP request. Since there can be + // multiple requests per probe per target, we use a mutex to protect access + // to per-target result object in doHTTPRequest. Note that result object is + // not accessed concurrently anywhere else -- export of the metrics happens + // when probe is not running. + var resultMu sync.Mutex - // Launch a separate goroutine for each target. - go func(target string, req *http.Request) { - defer wg.Done() - numRequests := int32(0) - for { - p.doHTTPRequest(req.WithContext(reqCtx), p.results[target]) + for numReq := int32(0); numReq < p.c.GetRequestsPerProbe(); numReq++ { + wg.Add(1) - numRequests++ - if numRequests >= p.c.GetRequestsPerProbe() { - break - } - // Sleep for requests_interval_msec before continuing. - time.Sleep(time.Duration(p.c.GetRequestsIntervalMsec()) * time.Millisecond) - } - }(target.Name, req) + go func(req *http.Request, result *probeResult) { + defer wg.Done() + p.doHTTPRequest(req.WithContext(reqCtx), result, &resultMu) + }(req, result) + } } // Wait until all probes are done. diff --git a/probes/http/http_test.go b/probes/http/http_test.go index 74a1575f..31d46f62 100644 --- a/probes/http/http_test.go +++ b/probes/http/http_test.go @@ -17,9 +17,12 @@ package http import ( "bytes" "context" + "errors" "fmt" + "io" "io/ioutil" "net/http" + "strings" "testing" "time" @@ -32,7 +35,9 @@ import ( // The Transport is mocked instead of the Client because Client is not an // interface, but RoundTripper (which Transport implements) is. -type testTransport struct{} +type testTransport struct { + noBody io.ReadCloser +} func newTestTransport() *testTransport { return &testTransport{} @@ -46,11 +51,20 @@ type testReadCloser struct { func (trc *testReadCloser) Read(p []byte) (n int, err error) { return trc.b.Read(p) } + func (trc *testReadCloser) Close() error { return nil } func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if req.URL.Host == "fail-test.com" { + return nil, errors.New("failing for fail-target.com") + } + + if req.Body == nil { + return &http.Response{Body: http.NoBody}, nil + } + b, err := ioutil.ReadAll(req.Body) if err != nil { return nil, err @@ -66,7 +80,7 @@ func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) { func (tt *testTransport) CancelRequest(req *http.Request) {} -func testProbe(opts *options.Options) ([]*result, error) { +func testProbe(opts *options.Options) ([]*probeResult, error) { p := &Probe{} err := p.Init("http_test", opts) if err != nil { @@ -76,23 +90,16 @@ func testProbe(opts *options.Options) ([]*result, error) { p.runProbe(context.Background()) - var results []*result + var results []*probeResult for _, target := range p.targets { results = append(results, p.results[target.Name]) } return results, nil } -func TestRun(t *testing.T) { - methods := []configpb.ProbeConf_Method{ - configpb.ProbeConf_GET, - configpb.ProbeConf_POST, - configpb.ProbeConf_PUT, - configpb.ProbeConf_HEAD, - configpb.ProbeConf_DELETE, - configpb.ProbeConf_PATCH, - configpb.ProbeConf_OPTIONS, - 100, // Should default to configpb.ProbeConf_GET +func TestProbeVariousMethods(t *testing.T) { + mpb := func(s string) *configpb.ProbeConf_Method { + return configpb.ProbeConf_Method(configpb.ProbeConf_Method_value[s]).Enum() } testBody := "Test HTTP Body" @@ -105,39 +112,43 @@ func TestRun(t *testing.T) { {&configpb.ProbeConf{}, "total: 1, success: 1"}, {&configpb.ProbeConf{Protocol: configpb.ProbeConf_HTTPS.Enum()}, "total: 1, success: 1"}, {&configpb.ProbeConf{RequestsPerProbe: proto.Int32(1)}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[0]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[1]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[1], Body: &testBody}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[2]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[2], Body: &testBody}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[3]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[4]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[5]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[6]}, "total: 1, success: 1"}, - {&configpb.ProbeConf{Method: &methods[7]}, "total: 1, success: 1"}, + {&configpb.ProbeConf{RequestsPerProbe: proto.Int32(4)}, "total: 4, success: 4"}, + {&configpb.ProbeConf{Method: mpb("GET")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("POST")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("POST"), Body: &testBody}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("PUT")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("PUT"), Body: &testBody}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("HEAD")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("DELETE")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("PATCH")}, "total: 1, success: 1"}, + {&configpb.ProbeConf{Method: mpb("OPTIONS")}, "total: 1, success: 1"}, {&configpb.ProbeConf{Headers: []*configpb.ProbeConf_Header{{Name: &testHeaderName, Value: &testHeaderValue}}}, "total: 1, success: 1"}, } - for _, test := range tests { - opts := &options.Options{ - Targets: targets.StaticTargets("test.com"), - Interval: 2 * time.Second, - Timeout: time.Second, - ProbeConf: test.input, - } - results, err := testProbe(opts) - if err != nil { - if fmt.Sprintf("error: '%s'", err.Error()) != test.want { - t.Errorf("Unexpected initialization error: %v", err) + for i, test := range tests { + t.Run(fmt.Sprintf("Test_case(%d)_config(%v)", i, test.input), func(t *testing.T) { + opts := &options.Options{ + Targets: targets.StaticTargets("test.com"), + Interval: 2 * time.Second, + Timeout: time.Second, + ProbeConf: test.input, + } + + results, err := testProbe(opts) + if err != nil { + if fmt.Sprintf("error: '%s'", err.Error()) != test.want { + t.Errorf("Unexpected initialization error: %v", err) + } + return } - } else { + for _, result := range results { got := fmt.Sprintf("total: %d, success: %d", result.total, result.success) if got != test.want { t.Errorf("Mismatch got '%s', want '%s'", got, test.want) } } - } + }) } } @@ -181,3 +192,58 @@ func TestProbeWithBody(t *testing.T) { t.Errorf("response map: got=%s, expected=%s", got, expected) } } + +func TestMultipleTargetsMultipleRequests(t *testing.T) { + testTargets := []string{"test.com", "fail-test.com"} + reqPerProbe := int64(6) + opts := &options.Options{ + Targets: targets.StaticTargets(strings.Join(testTargets, ",")), + Interval: 10 * time.Millisecond, + ProbeConf: &configpb.ProbeConf{RequestsPerProbe: proto.Int32(int32(reqPerProbe))}, + } + + p := &Probe{} + err := p.Init("http_test", opts) + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + p.client.Transport = newTestTransport() + + // Verify that Init() created result struct for each target. + for _, tgt := range testTargets { + if _, ok := p.results[tgt]; !ok { + t.Errorf("didn't find results for the target: %s", tgt) + } + } + + p.runProbe(context.Background()) + + wantSuccess := map[string]int64{ + "test.com": reqPerProbe, + "fail-test.com": 0, // Test transport is configured to fail this. + } + + for _, tgt := range testTargets { + if p.results[tgt].total != reqPerProbe { + t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe) + } + if p.results[tgt].success != wantSuccess[tgt] { + t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt]) + } + } + + // Run again + p.runProbe(context.Background()) + + wantSuccess["test.com"] += reqPerProbe + + for _, tgt := range testTargets { + if p.results[tgt].total != 2*reqPerProbe { + t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe) + } + if p.results[tgt].success != wantSuccess[tgt] { + t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt]) + } + } +} diff --git a/probes/http/proto/config.pb.go b/probes/http/proto/config.pb.go index 74bbaf0d..e9a7eb40 100644 --- a/probes/http/proto/config.pb.go +++ b/probes/http/proto/config.pb.go @@ -150,9 +150,10 @@ type ProbeConf struct { // Disable TLS certificate validation. If set to true, any certificate // presented by the server for any host name will be accepted. DisableCertValidation *bool `protobuf:"varint,14,opt,name=disable_cert_validation,json=disableCertValidation" json:"disable_cert_validation,omitempty"` - // Requests per probe (Deprecated). - // NOTE: This field is now deprecated and will be removed after the v0.10.3 - // releases. + // Requests per probe. + // Number of HTTP requests per probe. Requests are executed concurrently and + // each HTTP re contributes to probe results. For example, if you run two + // requests per probe, "total" counter will be incremented by 2. RequestsPerProbe *int32 `protobuf:"varint,98,opt,name=requests_per_probe,json=requestsPerProbe,def=1" json:"requests_per_probe,omitempty"` // How long to wait between two requests to the same target // NOTE: This field is now deprecated and will be removed after the v0.10.3 diff --git a/probes/http/proto/config.proto b/probes/http/proto/config.proto index 06cfa3a7..6ccb6bcb 100644 --- a/probes/http/proto/config.proto +++ b/probes/http/proto/config.proto @@ -66,10 +66,12 @@ message ProbeConf { // presented by the server for any host name will be accepted. optional bool disable_cert_validation = 14; - // Requests per probe (Deprecated). - // NOTE: This field is now deprecated and will be removed after the v0.10.3 - // releases. + // Requests per probe. + // Number of HTTP requests per probe. Requests are executed concurrently and + // each HTTP re contributes to probe results. For example, if you run two + // requests per probe, "total" counter will be incremented by 2. optional int32 requests_per_probe = 98 [default = 1]; + // How long to wait between two requests to the same target // NOTE: This field is now deprecated and will be removed after the v0.10.3 // releases. diff --git a/probes/http/request.go b/probes/http/request.go index 94ecfaa4..c59fb9b7 100644 --- a/probes/http/request.go +++ b/probes/http/request.go @@ -15,7 +15,6 @@ package http import ( - "bytes" "fmt" "io" "net/http" @@ -23,21 +22,19 @@ import ( "github.com/google/cloudprober/targets/endpoint" ) -// requestBody encapsulates the request body and implements the io.ReadCloser() +// requestBody encapsulates the request body and implements the io.Reader() // interface. type requestBody struct { - b *bytes.Reader + b []byte } +// Read implements the io.Reader interface. Instead of using buffered read, +// it simply copies the bytes to the provided slice in one go (depending on +// the input slice capacity) and returns io.EOF. Buffered reads require +// resetting the buffer before re-use, restricting our ability to use the +// request object concurrently. func (rb *requestBody) Read(p []byte) (int, error) { - return rb.b.Read(p) -} - -// Close resets the internal buffer's next read location, to make it ready for -// the next HTTP request. -func (rb *requestBody) Close() error { - rb.b.Seek(0, io.SeekStart) - return nil + return copy(p, rb.b), io.EOF } func (p *Probe) httpRequestForTarget(target endpoint.Endpoint) *http.Request { @@ -62,8 +59,9 @@ func (p *Probe) httpRequestForTarget(target endpoint.Endpoint) *http.Request { url := fmt.Sprintf("%s://%s%s", p.protocol, host, p.url) // Prepare request body - body := &requestBody{ - b: bytes.NewReader([]byte(p.c.GetBody())), + var body io.Reader + if p.c.GetBody() != "" { + body = &requestBody{[]byte(p.c.GetBody())} } req, err := http.NewRequest(p.method, url, body) if err != nil {