diff --git a/output/cloud/expv2/flush.go b/output/cloud/expv2/flush.go
index 933ec23d6d81..2ddc40440704 100644
--- a/output/cloud/expv2/flush.go
+++ b/output/cloud/expv2/flush.go
@@ -1,14 +1,12 @@
 package expv2
 
 import (
-	"context"
-
 	"go.k6.io/k6/metrics"
 	"go.k6.io/k6/output/cloud/expv2/pbcloud"
 )
 
 type pusher interface {
-	push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error
+	push(referenceID string, samples *pbcloud.MetricSet) error
 }
 
 type metricsFlusher struct {
@@ -19,10 +17,10 @@ type metricsFlusher struct {
 	maxSeriesInSingleBatch int
 }
 
-// Flush flushes the queued buckets sending them to the remote Cloud service.
-// If the number of time series collected is bigger than maximum batch size than
-// it splits in chunks.
-func (f *metricsFlusher) Flush(ctx context.Context) error {
+// flush flushes the queued buckets, sending them to the remote Cloud service.
+// If the number of collected time series is bigger than the maximum batch
+// size then it splits them into chunks.
+func (f *metricsFlusher) flush() error {
 	// drain the buffer
 	buckets := f.bq.PopAll()
 	if len(buckets) < 1 {
@@ -46,7 +44,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
 		}
 
 		// we hit the chunk size, let's flush
-		err := f.client.push(ctx, f.referenceID, msb.MetricSet)
+		err := f.client.push(f.referenceID, msb.MetricSet)
 		if err != nil {
 			return err
 		}
@@ -58,7 +56,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
 	}
 
 	// send the last (or the unique) MetricSet chunk to the remote service
-	return f.client.push(ctx, f.referenceID, msb.MetricSet)
+	return f.client.push(f.referenceID, msb.MetricSet)
 }
 
 type metricSetBuilder struct {
diff --git a/output/cloud/expv2/flush_test.go b/output/cloud/expv2/flush_test.go
index e98c24e2e6a1..bf53db7e8021 100644
--- a/output/cloud/expv2/flush_test.go
+++ b/output/cloud/expv2/flush_test.go
@@ -1,7 +1,6 @@
 package expv2
 
 import (
-	"context"
 	"strconv"
 	"testing"
 
@@ -82,7 +81,7 @@ func TestMetricsFlusherFlushChunk(t *testing.T) {
 		}
 		require.Len(t, bq.buckets, tc.series)
 
-		err := mf.Flush(context.TODO())
+		err := mf.flush()
 		require.NoError(t, err)
 		assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
 	}
@@ -92,7 +91,7 @@ type pusherMock struct {
 	pushCalled int
 }
 
-func (pm *pusherMock) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error {
+func (pm *pusherMock) push(_ string, _ *pbcloud.MetricSet) error {
 	pm.pushCalled++
 	return nil
 }
diff --git a/output/cloud/expv2/metrics_client.go b/output/cloud/expv2/metrics_client.go
index 09ec19f08586..a36208d72a67 100644
--- a/output/cloud/expv2/metrics_client.go
+++ b/output/cloud/expv2/metrics_client.go
@@ -5,8 +5,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
-	"sync"
 	"time"
 
 	"github.com/klauspost/compress/snappy"
@@ -18,25 +18,21 @@ import (
 	"go.k6.io/k6/output/cloud/expv2/pbcloud"
 )
 
-type httpDoer interface {
-	Do(*http.Request) (*http.Response, error)
-}
-
 // metricsClient is a Protobuf over HTTP client for sending
 // the collected metrics from the Cloud output
 // to the remote service.
 type metricsClient struct {
-	httpClient httpDoer
+	httpClient *cloudapi.Client
 	logger     logrus.FieldLogger
 	token      string
 	userAgent  string
-
-	pushBufferPool sync.Pool
-	baseURL        string
+	baseURL    string
 }
 
 // newMetricsClient creates and initializes a new MetricsClient.
-func newMetricsClient(logger logrus.FieldLogger, host string, token string) (*metricsClient, error) {
+func newMetricsClient(
+	logger logrus.FieldLogger, host string, token string, timeout time.Duration,
+) (*metricsClient, error) {
 	if host == "" {
 		return nil, errors.New("host is required")
 	}
@@ -44,21 +40,17 @@ func newMetricsClient(logger logrus.FieldLogger, host string, token string) (*me
 		return nil, errors.New("token is required")
 	}
 	return &metricsClient{
-		httpClient: &http.Client{Timeout: 5 * time.Second},
+		httpClient: cloudapi.NewClient(logger, token, host, "v"+consts.Version, timeout),
 		logger:     logger,
 		baseURL:    host + "/v2/metrics/",
 		token:      token,
 		userAgent:  "k6cloud/v" + consts.Version,
-		pushBufferPool: sync.Pool{
-			New: func() interface{} {
-				return &bytes.Buffer{}
-			},
-		},
 	}, nil
 }
 
-// Push pushes the provided metrics the given test run.
-func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error {
+// push pushes the provided metrics to the given test run. A timeout is
+// applied by the client to limit the request duration.
+func (mc *metricsClient) push(referenceID string, samples *pbcloud.MetricSet) error {
 	if referenceID == "" {
 		return errors.New("TestRunID of the test is required")
 	}
@@ -73,28 +65,25 @@ func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *
 	// we don't expect to share this client across different refID
 	// with a bit of effort we can find a way to just allocate once
 	url := mc.baseURL + referenceID
-	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b))
+	req, err := http.NewRequestWithContext(
+		context.Background(), http.MethodPost, url, io.NopCloser(bytes.NewReader(b)))
 	if err != nil {
 		return err
 	}
 
-	req.Header.Set("User-Agent", mc.userAgent)
+	req.GetBody = func() (io.ReadCloser, error) {
+		return io.NopCloser(bytes.NewReader(b)), nil
+	}
+
 	req.Header.Set("Content-Type", "application/x-protobuf")
 	req.Header.Set("Content-Encoding", "snappy")
 	req.Header.Set("K6-Metrics-Protocol-Version", "2.0")
-	req.Header.Set("Authorization", "Token "+mc.token)
 
-	resp, err := mc.httpClient.Do(req)
+	err = mc.httpClient.Do(req, nil)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		_ = resp.Body.Close()
-	}()
-
-	if err := cloudapi.CheckResponse(resp); err != nil {
-		return err
-	}
+
 	mc.logger.WithField("t", time.Since(start)).WithField("size", len(b)).
Debug("Pushed the collected metrics to the Cloud service") return nil diff --git a/output/cloud/expv2/metrics_client_test.go b/output/cloud/expv2/metrics_client_test.go index ce7aef1386a0..4addd61a474d 100644 --- a/output/cloud/expv2/metrics_client_test.go +++ b/output/cloud/expv2/metrics_client_test.go @@ -1,28 +1,18 @@ package expv2 import ( - "bytes" - "context" - "fmt" "io" "net/http" "net/http/httptest" - "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/output/cloud/expv2/pbcloud" ) -type httpDoerFunc func(*http.Request) (*http.Response, error) - -func (fn httpDoerFunc) Do(r *http.Request) (*http.Response, error) { - return fn(r) -} - func TestMetricsClientPush(t *testing.T) { t.Parallel() @@ -47,12 +37,11 @@ func TestMetricsClientPush(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(h)) defer ts.Close() - mc, err := newMetricsClient(testutils.NewLogger(t), ts.URL, "fake-token") + mc, err := newMetricsClient(testutils.NewLogger(t), ts.URL, "fake-token", 1*time.Second) require.NoError(t, err) - mc.httpClient = ts.Client() mset := pbcloud.MetricSet{} - err = mc.push(context.TODO(), "test-ref-id", &mset) + err = mc.push("test-ref-id", &mset) <-done require.NoError(t, err) assert.Equal(t, 1, reqs) @@ -67,58 +56,9 @@ func TestMetricsClientPushUnexpectedStatus(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(h)) defer ts.Close() - mc, err := newMetricsClient(nil, ts.URL, "fake-token") + mc, err := newMetricsClient(nil, ts.URL, "fake-token", 10*time.Millisecond) require.NoError(t, err) - mc.httpClient = ts.Client() - err = mc.push(context.TODO(), "test-ref-id", nil) + err = mc.push("test-ref-id", nil) assert.ErrorContains(t, err, "500 Internal Server Error") } - -func TestMetricsClientPushError(t *testing.T) { - t.Parallel() - - httpClientMock := func(_ *http.Request) (*http.Response, error) { - return nil, fmt.Errorf("fake generated error") - } - - mc := metricsClient{ - httpClient: httpDoerFunc(httpClientMock), - pushBufferPool: sync.Pool{ - New: func() interface{} { - return &bytes.Buffer{} - }, - }, - } - - err := mc.push(context.TODO(), "test-ref-id", nil) - assert.ErrorContains(t, err, "fake generated error") -} - -func TestMetricsClientPushStructuredError(t *testing.T) { - t.Parallel() - - exp := cloudapi.ErrorResponse{Code: 1, Message: "test message"} - - httpClientMock := func(_ *http.Request) (*http.Response, error) { - b := `{"error":{"code":1,"message":"test message"}}` - r := &http.Response{ - StatusCode: http.StatusBadRequest, - Body: io.NopCloser(bytes.NewBuffer([]byte(b))), - } - exp.Response = r - return r, nil - } - - mc := metricsClient{ - httpClient: httpDoerFunc(httpClientMock), - pushBufferPool: sync.Pool{ - New: func() interface{} { - return &bytes.Buffer{} - }, - }, - } - - err := mc.push(context.TODO(), "test-ref-id", nil) - assert.Equal(t, exp, err) -} diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index 1593deb1d84a..87f3e7b0391f 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -3,7 +3,6 @@ package expv2 import ( - "context" "errors" "fmt" "net/http" @@ -19,6 +18,10 @@ import ( "github.com/sirupsen/logrus" ) +type flusher interface { + flush() error +} + // Output sends result data to the k6 Cloud service. 
 type Output struct {
 	output.SampleBuffer
@@ -28,7 +31,7 @@ type Output struct {
 	referenceID string
 
 	collector *collector
-	flushing  *metricsFlusher
+	flushing  flusher
 
 	// wg tracks background goroutines
 	wg sync.WaitGroup
@@ -46,7 +49,7 @@ func New(logger logrus.FieldLogger, conf cloudapi.Config) (*Output, error) {
 	return &Output{
 		config: conf,
-		logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}),
+		logger: logger.WithField("output", "cloudv2"),
 		abort:  make(chan struct{}),
 		stop:   make(chan struct{}),
 	}, nil
 }
@@ -77,7 +80,8 @@ func (o *Output) Start() error {
 		return fmt.Errorf("failed to initialize the samples collector: %w", err)
 	}
 
-	mc, err := newMetricsClient(o.logger, o.config.Host.String, o.config.Token.String)
+	mc, err := newMetricsClient(
+		o.logger, o.config.Host.String, o.config.Token.String, o.config.Timeout.TimeDuration())
 	if err != nil {
 		return fmt.Errorf("failed to initialize the http metrics flush client: %w", err)
 	}
@@ -91,10 +95,10 @@ func (o *Output) Start() error {
 		maxSeriesInSingleBatch: int(o.config.MaxMetricSamplesPerPackage.Int64),
 	}
 
-	o.periodicInvoke(o.config.MetricPushInterval.TimeDuration(), o.flushMetrics)
+	o.runFlushWorkers()
 	o.periodicInvoke(o.config.AggregationPeriod.TimeDuration(), o.collectSamples)
 
-	o.logger.Debug("Started!")
+	o.logger.WithField("config", printableConfig(o.config)).Debug("Started!")
 	return nil
 }
 
@@ -122,6 +126,31 @@ func (o *Output) StopWithTestError(testErr error) error {
 	return nil
 }
 
+func (o *Output) runFlushWorkers() {
+	t := time.NewTicker(o.config.MetricPushInterval.TimeDuration())
+
+	for i := int64(0); i < o.config.MetricPushConcurrency.Int64; i++ {
+		o.wg.Add(1)
+		go func() {
+			defer func() {
+				t.Stop()
+				o.wg.Done()
+			}()
+
+			for {
+				select {
+				case <-t.C:
+					o.flushMetrics()
+				case <-o.stop:
+					return
+				case <-o.abort:
+					return
+				}
+			}
+		}()
+	}
+}
+
 // AddMetricSamples receives the samples streaming.
 func (o *Output) AddMetricSamples(s []metrics.SampleContainer) {
 	// TODO: this and the next operation are two locking operations,
@@ -178,10 +207,7 @@ func (o *Output) collectSamples() {
 func (o *Output) flushMetrics() {
 	start := time.Now()
-	ctx, cancel := context.WithTimeout(context.Background(), o.config.MetricPushInterval.TimeDuration())
-	defer cancel()
-
-	err := o.flushing.Flush(ctx)
+	err := o.flushing.flush()
 	if err != nil {
 		o.handleFlushError(err)
 		return
 	}
@@ -231,3 +257,29 @@ func (o *Output) handleFlushError(err error) {
 		}
 	})
 }
+
+func printableConfig(c cloudapi.Config) map[string]any {
+	m := map[string]any{
+		"host":                       c.Host.String,
+		"name":                       c.Name.String,
+		"timeout":                    c.Timeout.String(),
+		"webAppURL":                  c.WebAppURL.String,
+		"projectID":                  c.ProjectID.Int64,
+		"pushRefID":                  c.PushRefID.String,
+		"stopOnError":                c.StopOnError.Bool,
+		"testRunDetails":             c.TestRunDetails.String,
+		"aggregationPeriod":          c.AggregationPeriod.String(),
+		"aggregationWaitPeriod":      c.AggregationWaitPeriod.String(),
+		"maxMetricSamplesPerPackage": c.MaxMetricSamplesPerPackage.Int64,
+		"metricPushConcurrency":      c.MetricPushConcurrency.Int64,
+		"metricPushInterval":         c.MetricPushInterval.String(),
+	}
+
+	tokenValue := "***"
+	if !c.Token.Valid {
+		tokenValue = ""
+	}
+	m["token"] = tokenValue
+
+	return m
+}
diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go
index 9fdf22d78acc..ec4a4e8730fb 100644
--- a/output/cloud/expv2/output_test.go
+++ b/output/cloud/expv2/output_test.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -278,3 +279,132 @@ func TestOutputStopWithTestError(t *testing.T) {
 	require.NoError(t, o.Start())
 	require.NoError(t, o.StopWithTestError(errors.New("an error")))
 }
+
+func TestOutputFlushMetricsConcurrently(t *testing.T) {
+	t.Parallel()
+
+	done := make(chan struct{})
+
+	// It blocks on the first request so it asserts that the flush
+	// operation continues concurrently if one more tick is sent in the meantime.
+	//
+	// The second request unblocks.
+	var requestsCount int64
+	flusherMock := func() {
+		updated := atomic.AddInt64(&requestsCount, 1)
+		if updated == 2 {
+			close(done)
+			return
+		}
+		<-done
+	}
+
+	o := Output{logger: testutils.NewLogger(t)}
+	o.config.MetricPushConcurrency = null.IntFrom(2)
+	o.config.MetricPushInterval = types.NullDurationFrom(1) // loop
+	o.flushing = flusherFunc(flusherMock)
+	o.runFlushWorkers()
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Error("timed out")
+	case <-done:
+		assert.NotZero(t, atomic.LoadInt64(&requestsCount))
+	}
+}
+
+func TestOutputFlushWorkersStop(t *testing.T) {
+	t.Parallel()
+
+	o := Output{
+		logger: testutils.NewLogger(t),
+		stop:   make(chan struct{}),
+	}
+	o.config.MetricPushInterval = types.NullDurationFrom(1 * time.Millisecond)
+
+	once := sync.Once{}
+	flusherMock := func() {
+		// it asserts that flushers are set and the flush is invoked
+		once.Do(func() { close(o.stop) })
+	}
+
+	o.flushing = flusherFunc(flusherMock)
+	o.runFlushWorkers()
+
+	// it asserts that all flushers exit
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		o.wg.Wait()
+	}()
+	select {
+	case <-time.After(time.Second):
+		t.Error("timed out")
+	case <-done:
+	}
+}
+
+func TestOutputFlushWorkersAbort(t *testing.T) {
+	t.Parallel()
+
+	o := Output{
+		logger: testutils.NewLogger(t),
+		abort:  make(chan struct{}),
+	}
+	o.config.MetricPushInterval = types.NullDurationFrom(1 * time.Millisecond)
+
+	once := sync.Once{}
+	flusherMock := func() {
+		// it asserts that flushers are set and the flush func is invoked
+		once.Do(func() { close(o.abort) })
+	}
+
+	o.flushing = flusherFunc(flusherMock)
+	o.runFlushWorkers()
+
+	// it asserts that all flushers exit
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		o.wg.Wait()
+	}()
+	select {
+	case <-time.After(time.Second):
+		t.Error("timed out")
+	case <-done:
+	}
+}
+
+type flusherFunc func()
+
+func (ff flusherFunc) flush() error {
+	ff()
+	return nil
+}
+
+func TestPrintableConfig(t *testing.T) {
+	t.Parallel()
+
+	c := cloudapi.NewConfig()
+	c.Host = null.NewString("http://test.host", false)
+	c.Name = null.NewString("test-name", false)
+	c.PushRefID = null.NewString("test-id-123", false)
+	c.StopOnError = null.NewBool(true, false)
+	c.MetricPushConcurrency = null.NewInt(5, false)
+	c.AggregationPeriod = types.NewNullDuration(10*time.Second, false)
+	c.ProjectID = null.NewInt(123, false)
+	c.Token = null.StringFrom("my personal token")
+
+	exp := map[string]any{
+		"host":                  "http://test.host",
+		"name":                  "test-name",
+		"pushRefID":             "test-id-123",
+		"projectID":             int64(123),
+		"token":                 "***",
+		"stopOnError":           true,
+		"aggregationPeriod":     "10s",
+		"metricPushConcurrency": int64(5),
+	}
+
+	assert.Subset(t, printableConfig(c), exp)
+}
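
Editor's note (not part of the diff): the runFlushWorkers hunk above replaces the single periodicInvoke push loop with a pool of workers that share one ticker and exit when the stop or abort channel is closed. The following standalone sketch reduces that pattern to plain standard-library types; the names (runWorkers, the demo in main) are illustrative and not taken from the k6 codebase.

```go
// Sketch of the shared-ticker worker pool used by runFlushWorkers: every
// worker waits on the same ticker channel, runs flush() on a tick, and
// returns when either signal channel is closed. Stopping the ticker in the
// deferred func and calling wg.Done mirrors the diff.
package main

import (
	"fmt"
	"sync"
	"time"
)

func runWorkers(concurrency int, interval time.Duration, flush func(), stop, abort <-chan struct{}) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	t := time.NewTicker(interval)

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer func() {
				t.Stop()
				wg.Done()
			}()
			for {
				select {
				case <-t.C:
					flush()
				case <-stop:
					return
				case <-abort:
					return
				}
			}
		}()
	}
	return wg
}

func main() {
	stop := make(chan struct{})
	abort := make(chan struct{})

	var mu sync.Mutex
	count := 0
	wg := runWorkers(2, 10*time.Millisecond, func() {
		mu.Lock()
		count++
		mu.Unlock()
	}, stop, abort)

	time.Sleep(100 * time.Millisecond)
	close(stop) // all workers observe the closed channel and exit
	wg.Wait()
	fmt.Println("flushes:", count)
}
```

Because a tick is dropped if no worker is free to receive it, a slow flush never queues up extra work; the concurrency knob (MetricPushConcurrency in the PR) only bounds how many flushes can be in flight at once.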
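Editor's note (not part of the diff): in the metrics_client.go hunk the payload is wrapped in io.NopCloser and req.GetBody is set, so the wrapped cloudapi.Client can obtain a fresh copy of the snappy-compressed body if the request has to be replayed. Below is a minimal sketch of that net/http mechanism with a plain request instead of the k6 client; newReplayablePost is a hypothetical helper name, and the URL is a placeholder.

```go
// Sketch: Request.GetBody lets any retrying client or redirect-following
// transport re-read a POST body that has already been consumed.
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func newReplayablePost(url string, payload []byte) (*http.Request, error) {
	// Wrapping the reader hides its concrete type, so net/http cannot set
	// GetBody automatically; we set it ourselves, as the diff does.
	req, err := http.NewRequest(http.MethodPost, url, io.NopCloser(bytes.NewReader(payload)))
	if err != nil {
		return nil, err
	}
	req.GetBody = func() (io.ReadCloser, error) {
		// Each call returns an independent reader over the same bytes.
		return io.NopCloser(bytes.NewReader(payload)), nil
	}
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	return req, nil
}

func main() {
	req, err := newReplayablePost("https://example.invalid/v2/metrics/ref-id", []byte("payload"))
	if err != nil {
		panic(err)
	}
	// Demonstrate that the body can be re-materialized after the original
	// reader has been drained.
	first, _ := req.GetBody()
	_, _ = io.ReadAll(first)
	second, _ := req.GetBody()
	b, _ := io.ReadAll(second)
	fmt.Println(string(b))
}
```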