Skip to content

Commit

Permalink
Merge e1b7419 into 9254ce7
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Jun 15, 2023
2 parents 9254ce7 + e1b7419 commit 9b18044
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 149 deletions.
1 change: 1 addition & 0 deletions cloudapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (c *Client) do(req *http.Request, v interface{}, attempt int) (retry bool,

defer func() {
if resp != nil {
_, _ = io.Copy(io.Discard, resp.Body)
if cerr := resp.Body.Close(); cerr != nil && err == nil {
err = cerr
}
Expand Down
14 changes: 7 additions & 7 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type pusher interface {
push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error
push(referenceID string, samples *pbcloud.MetricSet) error
}

type metricsFlusher struct {
Expand All @@ -19,10 +19,10 @@ type metricsFlusher struct {
maxSeriesInSingleBatch int
}

// Flush flushes the queued buckets sending them to the remote Cloud service.
// If the number of time series collected is bigger than maximum batch size than
// it splits in chunks.
func (f *metricsFlusher) Flush(ctx context.Context) error {
// flush flushes the queued buckets sending them to the remote Cloud service.
// If the number of time series collected is bigger than maximum batch size
// then it splits in chunks.
func (f *metricsFlusher) flush(_ context.Context) error {
// drain the buffer
buckets := f.bq.PopAll()
if len(buckets) < 1 {
Expand All @@ -46,7 +46,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
}

// we hit the chunk size, let's flush
err := f.client.push(ctx, f.referenceID, msb.MetricSet)
err := f.client.push(f.referenceID, msb.MetricSet)
if err != nil {
return err
}
Expand All @@ -58,7 +58,7 @@ func (f *metricsFlusher) Flush(ctx context.Context) error {
}

// send the last (or the unique) MetricSet chunk to the remote service
return f.client.push(ctx, f.referenceID, msb.MetricSet)
return f.client.push(f.referenceID, msb.MetricSet)
}

type metricSetBuilder struct {
Expand Down
4 changes: 2 additions & 2 deletions output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestMetricsFlusherFlushChunk(t *testing.T) {
}
require.Len(t, bq.buckets, tc.series)

err := mf.Flush(context.TODO())
err := mf.flush(context.Background())
require.NoError(t, err)
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
}
Expand All @@ -92,7 +92,7 @@ type pusherMock struct {
pushCalled int
}

func (pm *pusherMock) push(_ context.Context, _ string, _ *pbcloud.MetricSet) error {
func (pm *pusherMock) push(_ string, _ *pbcloud.MetricSet) error {
pm.pushCalled++
return nil
}
16 changes: 10 additions & 6 deletions output/cloud/expv2/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ func TestOutputFlush(t *testing.T) {
defer ts.Close()

// init conifg
c := cloudapi.NewConfig()
c.Host = null.StringFrom(ts.URL)
c.Token = null.StringFrom("my-secret-token")
c.AggregationPeriod = types.NullDurationFrom(3 * time.Second)
c.AggregationWaitPeriod = types.NullDurationFrom(1 * time.Second)
conf := cloudapi.NewConfig()
conf.Host = null.StringFrom(ts.URL)
conf.Token = null.StringFrom("my-secret-token")
conf.AggregationPeriod = types.NullDurationFrom(3 * time.Second)
conf.AggregationWaitPeriod = types.NullDurationFrom(1 * time.Second)

logger := testutils.NewLogger(t)
cc := cloudapi.NewClient(logger, conf.Token.String, conf.Host.String,
"expv2/integration", conf.Timeout.TimeDuration())

// init and start the output
o, err := expv2.New(testutils.NewLogger(t), c)
o, err := expv2.New(logger, conf, cc)
require.NoError(t, err)
o.SetReferenceID("my-test-run-id-123")
require.NoError(t, o.Start())
Expand Down
68 changes: 22 additions & 46 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,45 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"
"strings"

"github.com/klauspost/compress/snappy"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoer interface {
Do(*http.Request) (*http.Response, error)
}

// metricsClient is a Protobuf over HTTP client for sending
// the collected metrics from the Cloud output
// to the remote service.
type metricsClient struct {
httpClient httpDoer
logger logrus.FieldLogger
token string
userAgent string

pushBufferPool sync.Pool
baseURL string
httpClient *cloudapi.Client
baseURL string
}

// newMetricsClient creates and initializes a new MetricsClient.
func newMetricsClient(logger logrus.FieldLogger, host string, token string) (*metricsClient, error) {
if host == "" {
return nil, errors.New("host is required")
}
if token == "" {
return nil, errors.New("token is required")
func newMetricsClient(c *cloudapi.Client) (*metricsClient, error) {
// Unfortunately, the cloudapi.Client works across different versions
// of the API, but it has the v1 harcoded so we need to trim the wrong path
// to be able to replace it with the correct one.
u := c.BaseURL()
if !strings.HasSuffix(u, "/v1") {
return nil, errors.New("a /v1 suffix is expected in the Cloud service's BaseURL path")
}
return &metricsClient{
httpClient: &http.Client{Timeout: 5 * time.Second},
logger: logger,
baseURL: host + "/v2/metrics/",
token: token,
userAgent: "k6cloud/v" + consts.Version,
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
httpClient: c,
baseURL: strings.TrimSuffix(u, "/v1") + "/v2/metrics/",
}, nil
}

// Push pushes the provided metrics the given test run.
func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *pbcloud.MetricSet) error {
// Push the provided metrics for the given test run ID.
func (mc *metricsClient) push(referenceID string, samples *pbcloud.MetricSet) error {
if referenceID == "" {
return errors.New("TestRunID of the test is required")
}
start := time.Now()

b, err := newRequestBody(samples)
if err != nil {
Expand All @@ -73,30 +54,25 @@ func (mc *metricsClient) push(ctx context.Context, referenceID string, samples *
// we don't expect to share this client across different refID
// with a bit of effort we can find a way to just allocate once
url := mc.baseURL + referenceID
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b))
req, err := http.NewRequestWithContext(
context.Background(), http.MethodPost, url, io.NopCloser(bytes.NewReader(b)))
if err != nil {
return err
}

req.Header.Set("User-Agent", mc.userAgent)
req.GetBody = func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(b)), nil
}

req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("K6-Metrics-Protocol-Version", "2.0")
req.Header.Set("Authorization", "Token "+mc.token)

resp, err := mc.httpClient.Do(req)
err = mc.httpClient.Do(req, nil)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()

if err := cloudapi.CheckResponse(resp); err != nil {
return err
}
mc.logger.WithField("t", time.Since(start)).WithField("size", len(b)).
Debug("Pushed the collected metrics to the Cloud service")
return nil
}

Expand Down
75 changes: 7 additions & 68 deletions output/cloud/expv2/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
package expv2

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)

type httpDoerFunc func(*http.Request) (*http.Response, error)

func (fn httpDoerFunc) Do(r *http.Request) (*http.Response, error) {
return fn(r)
}

func TestMetricsClientPush(t *testing.T) {
t.Parallel()

done := make(chan struct{}, 1)
reqs := 0
h := func(rw http.ResponseWriter, r *http.Request) {
defer close(done)
reqs++

assert.Equal(t, "/v2/metrics/test-ref-id", r.URL.Path)
Expand All @@ -47,13 +35,12 @@ func TestMetricsClientPush(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc, err := newMetricsClient(testutils.NewLogger(t), ts.URL, "fake-token")
c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c)
require.NoError(t, err)
mc.httpClient = ts.Client()

mset := pbcloud.MetricSet{}
err = mc.push(context.TODO(), "test-ref-id", &mset)
<-done
err = mc.push("test-ref-id", &mset)
require.NoError(t, err)
assert.Equal(t, 1, reqs)
}
Expand All @@ -67,58 +54,10 @@ func TestMetricsClientPushUnexpectedStatus(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(h))
defer ts.Close()

mc, err := newMetricsClient(nil, ts.URL, "fake-token")
c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c)
require.NoError(t, err)
mc.httpClient = ts.Client()

err = mc.push(context.TODO(), "test-ref-id", nil)
err = mc.push("test-ref-id", nil)
assert.ErrorContains(t, err, "500 Internal Server Error")
}

func TestMetricsClientPushError(t *testing.T) {
t.Parallel()

httpClientMock := func(_ *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("fake generated error")
}

mc := metricsClient{
httpClient: httpDoerFunc(httpClientMock),
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}

err := mc.push(context.TODO(), "test-ref-id", nil)
assert.ErrorContains(t, err, "fake generated error")
}

func TestMetricsClientPushStructuredError(t *testing.T) {
t.Parallel()

exp := cloudapi.ErrorResponse{Code: 1, Message: "test message"}

httpClientMock := func(_ *http.Request) (*http.Response, error) {
b := `{"error":{"code":1,"message":"test message"}}`
r := &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(bytes.NewBuffer([]byte(b))),
}
exp.Response = r
return r, nil
}

mc := metricsClient{
httpClient: httpDoerFunc(httpClientMock),
pushBufferPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}

err := mc.push(context.TODO(), "test-ref-id", nil)
assert.Equal(t, exp, err)
}
Loading

0 comments on commit 9b18044

Please sign in to comment.