Skip to content

Commit

Permalink
cloud/expv2: Retry and concurrent flush
Browse files Browse the repository at this point in the history
Use the cloudapi Client for supporting retryable requests. The flush is
now concurrent across different flush workers.
  • Loading branch information
codebien committed Jun 13, 2023
1 parent 8e07e8c commit d1900f6
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 115 deletions.
16 changes: 7 additions & 9 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package expv2

import (
"context"

"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type pusher interface {
push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error
push(referenceID string, samples *pbcloud.MetricSet) error
}

type metricsFlusher struct {
Expand All @@ -19,10 +17,10 @@ type metricsFlusher struct {
maxSeriesInSingleBatch int
}

// Flush flushes the queued buckets sending them to the remote Cloud service.
// If the number of time series collected is bigger than maximum batch size than
// it splits in chunks.
func (f *metricsFlusher) Flush(ctx context.Context) error {
// flush flushes the queued buckets sending them to the remote Cloud service.
// If the number of time series collected is bigger than maximum batch size
// then it splits in chunks.
func (f *metricsFlusher) flush() error {
// drain the buffer
buckets := f.bq.PopAll()
if len(buckets) < 1 {
Expand All @@ -46,7 +44,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
}

// we hit the chunk size, let's flush
err := f.client.push(ctx, f.referenceID, msb.MetricSet)
err := f.client.push(f.referenceID, msb.MetricSet)
if err != nil {
return err
}
Expand All @@ -58,7 +56,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
}

// send the last (or the unique) MetricSet chunk to the remote service
return f.client.push(ctx, f.referenceID, msb.MetricSet)
return f.client.push(f.referenceID, msb.MetricSet)
}

type metricSetBuilder struct {
Expand Down
5 changes: 2 additions & 3 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package expv2

import (
"context"
"strconv"
"testing"

Expand Down Expand Up @@ -82,7 +81,7 @@ func TestMetricsFlusherFlushChunk(t *testing.T) {
}
require.Len(t, bq.buckets, tc.series)

err := mf.Flush(context.TODO())
err := mf.flush()
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
}
Expand All @@ -92,7 +91,7 @@ type pusherMock struct {
pushCalled int
}

func (pm *pusherMock) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error {
func (pm *pusherMock) push(_ string, _ *pbcloud.MetricSet) error {
pm.pushCalled++
return nil
}
45 changes: 17 additions & 28 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/klauspost/compress/snappy"
Expand All @@ -18,47 +18,39 @@ import (
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoer interface {
Do(*http.Request) (*http.Response, error)
}

// metricsClient is a Protobuf over HTTP client for sending
// the collected metrics from the Cloud output
// to the remote service.
type metricsClient struct {
httpClient httpDoer
httpClient *cloudapi.Client
logger logrus.FieldLogger
token string
userAgent string

pushBufferPool sync.Pool
baseURL string
baseURL string
}

// newMetricsClient creates and initializes a new MetricsClient.
func newMetricsClient(logger logrus.FieldLogger, host string, token string) (*metricsClient, error) {
func newMetricsClient(
logger logrus.FieldLogger, host string, token string, timeout time.Duration,
) (*metricsClient, error) {
if host == "" {
return nil, errors.New("host is required")
}
if token == "" {
return nil, errors.New("token is required")
}
return &metricsClient{
httpClient: &http.Client{Timeout: 5 * time.Second},
httpClient: cloudapi.NewClient(logger, token, host, "v"+consts.Version, timeout),
logger: logger,
baseURL: host + "/v2/metrics/",
token: token,
userAgent: "k6cloud/v" + consts.Version,
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}, nil
}

// Push pushes the provided metrics the given test run.
func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error {
// Push pushes the provided metrics the given test run. Inject a timeout context
// for limiting the request duration.
func (mc *metricsClient) push(referenceID string, samples *pbcloud.MetricSet) error {
if referenceID == "" {
return errors.New("TestRunID of the test is required")
}
Expand All @@ -73,28 +65,25 @@ func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *
// we don't expect to share this client across different refID
// with a bit of effort we can find a way to just allocate once
url := mc.baseURL + referenceID
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b))
req, err := http.NewRequestWithContext(
context.Background(), http.MethodPost, url, io.NopCloser(bytes.NewReader(b)))
if err != nil {
return err
}

req.Header.Set("User-Agent", mc.userAgent)
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(b)), nil
}

req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("K6-Metrics-Protocol-Version", "2.0")
req.Header.Set("Authorization", "Token "+mc.token)

resp, err := mc.httpClient.Do(req)
err = mc.httpClient.Do(req, nil)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()

if err := cloudapi.CheckResponse(resp); err != nil {
return err
}
mc.logger.WithField("t", time.Since(start)).WithField("size", len(b)).
Debug("Pushed the collected metrics to the Cloud service")
return nil
Expand Down
70 changes: 5 additions & 65 deletions output/cloud/expv2/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package expv2

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoerFunc func(*http.Request) (*http.Response, error)

func (fn httpDoerFunc) Do(r *http.Request) (*http.Response, error) {
return fn(r)
}

func TestMetricsClientPush(t *testing.T) {
t.Parallel()

Expand All @@ -47,12 +37,11 @@ func TestMetricsClientPush(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc, err := newMetricsClient(testutils.NewLogger(t), ts.URL, "fake-token")
mc, err := newMetricsClient(testutils.NewLogger(t), ts.URL, "fake-token", 1*time.Second)
require.NoError(t, err)
mc.httpClient = ts.Client()

mset := pbcloud.MetricSet{}
err = mc.push(context.TODO(), "test-ref-id", &mset)
err = mc.push("test-ref-id", &mset)
<-done
require.NoError(t, err)
assert.Equal(t, 1, reqs)
Expand All @@ -67,58 +56,9 @@ func TestMetricsClientPushUnexpectedStatus(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc, err := newMetricsClient(nil, ts.URL, "fake-token")
mc, err := newMetricsClient(nil, ts.URL, "fake-token", 10*time.Millisecond)
require.NoError(t, err)
mc.httpClient = ts.Client()

err = mc.push(context.TODO(), "test-ref-id", nil)
err = mc.push("test-ref-id", nil)
assert.ErrorContains(t, err, "500 Internal Server Error")
}

func TestMetricsClientPushError(t *testing.T) {
t.Parallel()

httpClientMock := func(_ *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("fake generated error")
}

mc := metricsClient{
httpClient: httpDoerFunc(httpClientMock),
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}

err := mc.push(context.TODO(), "test-ref-id", nil)
assert.ErrorContains(t, err, "fake generated error")
}

func TestMetricsClientPushStructuredError(t *testing.T) {
t.Parallel()

exp := cloudapi.ErrorResponse{Code: 1, Message: "test message"}

httpClientMock := func(_ *http.Request) (*http.Response, error) {
b := `{"error":{"code":1,"message":"test message"}}`
r := &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(bytes.NewBuffer([]byte(b))),
}
exp.Response = r
return r, nil
}

mc := metricsClient{
httpClient: httpDoerFunc(httpClientMock),
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}

err := mc.push(context.TODO(), "test-ref-id", nil)
assert.Equal(t, exp, err)
}
Loading

0 comments on commit d1900f6

Please sign in to comment.