Skip to content

Commit

Permalink
output/cloud: Versioning (#3041)
Browse files Browse the repository at this point in the history
The output acts now as a gateway starting the required versioned output based on the newly defined Version config option.
  • Loading branch information
codebien authored May 16, 2023
1 parent 311d94f commit bc11542
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 5 deletions.
6 changes: 5 additions & 1 deletion cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
TestRunDetails null.String `json:"testRunDetails" envconfig:"K6_CLOUD_TEST_RUN_DETAILS"`
NoCompress null.Bool `json:"noCompress" envconfig:"K6_CLOUD_NO_COMPRESS"`
StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"`
APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"`

MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"`

Expand Down Expand Up @@ -149,6 +150,7 @@ func NewConfig() Config {
MetricPushConcurrency: null.NewInt(1, false),
MaxMetricSamplesPerPackage: null.NewInt(100000, false),
Timeout: types.NewNullDuration(1*time.Minute, false),
APIVersion: null.NewInt(1, false),
// Aggregation is disabled by default, since AggregationPeriod has no default value
// but if it's enabled manually or from the cloud service, those are the default values it will use:
AggregationCalcInterval: types.NewNullDuration(3*time.Second, false),
Expand Down Expand Up @@ -200,6 +202,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.Timeout.Valid {
c.Timeout = cfg.Timeout
}
if cfg.APIVersion.Valid {
c.APIVersion = cfg.APIVersion
}
if cfg.MaxMetricSamplesPerPackage.Valid {
c.MaxMetricSamplesPerPackage = cfg.MaxMetricSamplesPerPackage
}
Expand All @@ -209,7 +214,6 @@ func (c Config) Apply(cfg Config) Config {
if cfg.MetricPushConcurrency.Valid {
c.MetricPushConcurrency = cfg.MetricPushConcurrency
}

if cfg.AggregationPeriod.Valid {
c.AggregationPeriod = cfg.AggregationPeriod
}
Expand Down
1 change: 1 addition & 0 deletions cloudapi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestConfigApply(t *testing.T) {
NoCompress: null.NewBool(true, true),
StopOnError: null.NewBool(true, true),
Timeout: types.NewNullDuration(5*time.Second, true),
APIVersion: null.NewInt(2, true),
MaxMetricSamplesPerPackage: null.NewInt(2, true),
MetricPushInterval: types.NewNullDuration(1*time.Second, true),
MetricPushConcurrency: null.NewInt(3, true),
Expand Down
20 changes: 16 additions & 4 deletions output/cloud/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ type versionedOutput interface {
AddMetricSamples(samples []metrics.SampleContainer)
}

type apiVersion int64

const (
apiVersionUndefined apiVersion = iota
apiVersion1
// apiVersion2 // TODO: add version 2
)

// Output sends result data to the k6 Cloud service.
type Output struct {
versionedOutput
Expand Down Expand Up @@ -159,12 +167,11 @@ func (out *Output) Start() error {
thresholds[name] = append(thresholds[name], threshold.Source)
}
}
maxVUs := lib.GetMaxPossibleVUs(out.executionPlan)

testRun := &cloudapi.TestRun{
Name: out.config.Name.String,
ProjectID: out.config.ProjectID.Int64,
VUsMax: int64(maxVUs),
VUsMax: int64(lib.GetMaxPossibleVUs(out.executionPlan)),
Thresholds: thresholds,
Duration: out.duration,
}
Expand Down Expand Up @@ -323,9 +330,14 @@ func (out *Output) startVersionedOutput() error {
if out.referenceID == "" {
return errors.New("ReferenceID is required")
}

var err error
out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client)
switch out.config.APIVersion.Int64 {
case int64(apiVersion1):
out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client)
default:
err = fmt.Errorf("v%d is an unexpected version", out.config.APIVersion.Int64)
}

if err != nil {
return err
}
Expand Down
214 changes: 214 additions & 0 deletions output/cloud/output_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package cloud

import (
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
Expand All @@ -10,11 +12,15 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/errext"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
cloudv1 "go.k6.io/k6/output/cloud/v1"
"gopkg.in/guregu/null.v3"
)

func TestNewOutputNameResolution(t *testing.T) {
Expand Down Expand Up @@ -127,3 +133,211 @@ func TestOutputCreateTestWithConfigOverwrite(t *testing.T) {

require.NoError(t, out.StopWithTestError(nil))
}

func TestOutputStartVersionError(t *testing.T) {
t.Parallel()
o, err := newOutput(output.Params{
Logger: testutils.NewLogger(t),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &metrics.DefaultSystemTagSet,
},
Environment: map[string]string{
"K6_CLOUD_API_VERSION": "99",
},
ScriptPath: &url.URL{Path: "/script.js"},
})
require.NoError(t, err)

o.referenceID = "123"
err = o.startVersionedOutput()
require.ErrorContains(t, err, "v99 is an unexpected version")
}

func TestOutputStartVersionedOutputV1(t *testing.T) {
t.Parallel()

o := Output{
referenceID: "123",
config: cloudapi.Config{
APIVersion: null.IntFrom(1),
// Here, we are mostly silencing the flushing op
MetricPushInterval: types.NullDurationFrom(1 * time.Hour),
},
}

err := o.startVersionedOutput()
require.NoError(t, err)

_, ok := o.versionedOutput.(*cloudv1.Output)
assert.True(t, ok)
}

func TestOutputStartWithReferenceID(t *testing.T) {
t.Parallel()

handler := func(w http.ResponseWriter, r *http.Request) {
// no calls are expected to the cloud service when
// the reference ID is passed
t.Error("got unexpected call")
}
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

out, err := newOutput(output.Params{
Logger: testutils.NewLogger(t),
Environment: map[string]string{
"K6_CLOUD_HOST": ts.URL,
"K6_CLOUD_PUSH_REF_ID": "my-passed-id",
},
ScriptOptions: lib.Options{
SystemTags: &metrics.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
require.NoError(t, err)
require.NoError(t, out.Start())
require.NoError(t, out.Stop())
}

func TestCloudOutputDescription(t *testing.T) {
t.Parallel()

t.Run("WithTestRunDetails", func(t *testing.T) {
t.Parallel()
o := Output{referenceID: "74"}
o.config.TestRunDetails = null.StringFrom("my-custom-string")
assert.Equal(t, "cloud (my-custom-string)", o.Description())
})
t.Run("WithWebAppURL", func(t *testing.T) {
t.Parallel()
o := Output{referenceID: "74"}
o.config.WebAppURL = null.StringFrom("mywebappurl.com")
assert.Equal(t, "cloud (mywebappurl.com/runs/74)", o.Description())
})
}

func TestOutputStopWithTestError(t *testing.T) {
t.Parallel()

done := make(chan struct{})

handler := func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v1/tests/test-ref-id-1234":
b, err := io.ReadAll(r.Body)
require.NoError(t, err)

// aborted by system status
expB := `{"result_status":0, "run_status":6, "thresholds":{}}`
require.JSONEq(t, expB, string(b))

w.WriteHeader(http.StatusOK)
close(done)
default:
http.Error(w, "not expected path", http.StatusInternalServerError)
}
}
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

out, err := newOutput(output.Params{
Logger: testutils.NewLogger(t),
Environment: map[string]string{
"K6_CLOUD_HOST": ts.URL,
},
ScriptOptions: lib.Options{
SystemTags: &metrics.DefaultSystemTagSet,
},
ScriptPath: &url.URL{Path: "/script.js"},
})
require.NoError(t, err)

calledStopFn := false
out.referenceID = "test-ref-id-1234"
out.versionedOutput = versionedOutputMock{
callback: func(fn string) {
if fn == "StopWithTestError" {
calledStopFn = true
}
},
}

fakeErr := errors.New("this is my error")
require.NoError(t, out.StopWithTestError(fakeErr))
assert.True(t, calledStopFn)

select {
case <-time.After(1 * time.Second):
t.Error("timed out")
case <-done:
}
}

func TestOutputGetStatusRun(t *testing.T) {
t.Parallel()

t.Run("Success", func(t *testing.T) {
t.Parallel()
o := Output{}
assert.Equal(t, cloudapi.RunStatusFinished, o.getRunStatus(nil))
})
t.Run("WithErrorNoAbortReason", func(t *testing.T) {
t.Parallel()
o := Output{logger: testutils.NewLogger(t)}
assert.Equal(t, cloudapi.RunStatusAbortedSystem, o.getRunStatus(errors.New("my-error")))
})
t.Run("WithAbortReason", func(t *testing.T) {
t.Parallel()
o := Output{}
errWithReason := errext.WithAbortReasonIfNone(
errors.New("my-original-error"),
errext.AbortedByOutput,
)
assert.Equal(t, cloudapi.RunStatusAbortedSystem, o.getRunStatus(errWithReason))
})
}

func TestOutputProxyAddMetricSamples(t *testing.T) {
t.Parallel()

called := false
o := &Output{
versionedOutput: versionedOutputMock{
callback: func(fn string) {
if fn != "AddMetricSamples" {
return
}
called = true
},
},
}
o.AddMetricSamples([]metrics.SampleContainer{})
assert.True(t, called)
}

type versionedOutputMock struct {
callback func(name string)
}

func (o versionedOutputMock) Start() error {
o.callback("Start")
return nil
}

func (o versionedOutputMock) StopWithTestError(testRunErr error) error {
o.callback("StopWithTestError")
return nil
}

func (o versionedOutputMock) SetTestRunStopCallback(_ func(error)) {
o.callback("SetTestRunStopCallback")
}

func (o versionedOutputMock) SetReferenceID(id string) {
o.callback("SetReferenceID")
}

func (o versionedOutputMock) AddMetricSamples(samples []metrics.SampleContainer) {
o.callback("AddMetricSamples")
}

0 comments on commit bc11542

Please sign in to comment.