diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index e175f3db526..3fbffdf0ed9 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -363,8 +363,8 @@ func TestSchedulerSystemTags(t *testing.T) { expTrailPVUTags := expCommonTrailTags.With("scenario", "per_vu_test") expTrailSITags := expCommonTrailTags.With("scenario", "shared_test") - expNetTrailPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test") - expNetTrailSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test") + expDataSentPVUTags := testRunState.RunTags.With("group", "").With("scenario", "per_vu_test") + expDataSentSITags := testRunState.RunTags.With("group", "").With("scenario", "shared_test") var gotCorrectTags int for { @@ -375,9 +375,13 @@ func TestSchedulerSystemTags(t *testing.T) { if s.Tags == expTrailPVUTags || s.Tags == expTrailSITags { gotCorrectTags++ } - case *netext.NetTrail: - if s.Tags == expNetTrailPVUTags || s.Tags == expNetTrailSITags { - gotCorrectTags++ + default: + for _, sample := range s.GetSamples() { + if sample.Metric.Name == metrics.DataSentName { + if sample.Tags == expDataSentPVUTags || sample.Tags == expDataSentSITags { + gotCorrectTags++ + } + } } } @@ -487,7 +491,7 @@ func TestSchedulerRunCustomTags(t *testing.T) { defer stopEmission() require.NoError(t, execScheduler.Run(ctx, ctx, samples)) }() - var gotTrailTag, gotNetTrailTag bool + var gotTrailTag, gotDataSentTag bool for { select { case sample := <-samples: @@ -497,14 +501,16 @@ func TestSchedulerRunCustomTags(t *testing.T) { gotTrailTag = true } } - if netTrail, ok := sample.(*netext.NetTrail); ok && !gotNetTrailTag { - tags := netTrail.Tags.Map() - if v, ok := tags["customTag"]; ok && v == "value" { - gotNetTrailTag = true + for _, s := range sample.GetSamples() { + if s.Metric.Name == metrics.DataSentName && !gotDataSentTag { + tags := s.Tags.Map() + if v, ok := tags["customTag"]; ok && v == "value" { + gotDataSentTag = true + } } } case <-done: - if !gotTrailTag || !gotNetTrailTag { + if !gotTrailTag || !gotDataSentTag { assert.FailNow(t, "a sample with expected tag wasn't received") } return @@ -692,11 +698,15 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { gotSampleTags++ } } - case *netext.NetTrail: - tags := s.Tags.Map() - for _, expTags := range expectedNetTrailTags { - if reflect.DeepEqual(expTags, tags) { - gotSampleTags++ + case metrics.Samples: + for _, sample := range s.GetSamples() { + if sample.Metric.Name == metrics.DataSentName { + tags := sample.Tags.Map() + for _, expTags := range expectedNetTrailTags { + if reflect.DeepEqual(expTags, tags) { + gotSampleTags++ + } + } } } case metrics.ConnectedSamples: @@ -1275,7 +1285,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { Value: expValue, } } - getDummyTrail := func(group string, emitIterations bool, addExpTags ...string) metrics.SampleContainer { + getNetworkSamples := func(group string, addExpTags ...string) metrics.SampleContainer { expTags := []string{"group", group} expTags = append(expTags, addExpTags...) dialer := netext.NewDialer( @@ -1284,25 +1294,56 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { ) ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)} - return dialer.GetTrail(time.Now(), time.Now(), true, emitIterations, ctm, piState.BuiltinMetrics) + return dialer.IOSamples(time.Now(), ctm, piState.BuiltinMetrics) + } + + getIterationsSamples := func(group string, addExpTags ...string) metrics.SampleContainer { + expTags := []string{"group", group} + expTags = append(expTags, addExpTags...) + ctm := metrics.TagsAndMeta{Tags: getTags(piState.Registry, expTags...)} + startTime := time.Now() + endTime := time.Now() + + return metrics.Samples([]metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: piState.BuiltinMetrics.IterationDuration, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: metrics.D(endTime.Sub(startTime)), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: piState.BuiltinMetrics.Iterations, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: 1, + }, + }) } // Initially give a long time (5s) for the execScheduler to start expectIn(0, 5000, getSample(1, testCounter, "group", "::setup", "place", "setupBeforeSleep")) expectIn(900, 1100, getSample(2, testCounter, "group", "::setup", "place", "setupAfterSleep")) - expectIn(0, 100, getDummyTrail("::setup", false)) + expectIn(0, 100, getNetworkSamples("::setup")) expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default")) expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default")) - expectIn(0, 100, getDummyTrail("", true, "scenario", "default")) + expectIn(0, 100, getNetworkSamples("", "scenario", "default")) + expectIn(0, 100, getIterationsSamples("", "scenario", "default")) expectIn(0, 100, getSample(5, testCounter, "group", "", "place", "defaultBeforeSleep", "scenario", "default")) expectIn(900, 1100, getSample(6, testCounter, "group", "", "place", "defaultAfterSleep", "scenario", "default")) - expectIn(0, 100, getDummyTrail("", true, "scenario", "default")) + expectIn(0, 100, getNetworkSamples("", "scenario", "default")) + expectIn(0, 100, getIterationsSamples("", "scenario", "default")) expectIn(0, 1000, getSample(3, testCounter, "group", "::teardown", "place", "teardownBeforeSleep")) expectIn(900, 1100, getSample(4, testCounter, "group", "::teardown", "place", "teardownAfterSleep")) - expectIn(0, 100, getDummyTrail("::teardown", false)) + expectIn(0, 100, getNetworkSamples("::teardown")) for { select { diff --git a/js/runner.go b/js/runner.go index a3741fc7f86..56dae12768a 100644 --- a/js/runner.go +++ b/js/runner.go @@ -865,15 +865,44 @@ func (u *VU) runFn( u.Transport.CloseIdleConnections() } - u.state.Samples <- u.Dialer.GetTrail( - startTime, endTime, isFullIteration, - isDefault, u.state.Tags.GetCurrentValues(), u.Runner.preInitState.BuiltinMetrics) + builtinMetrics := u.Runner.preInitState.BuiltinMetrics + ctm := u.state.Tags.GetCurrentValues() + u.state.Samples <- u.Dialer.IOSamples(endTime, ctm, builtinMetrics) + + if isFullIteration && isDefault { + u.state.Samples <- iterationSamples(startTime, endTime, ctm, builtinMetrics) + } v = unPromisify(v) return v, isFullIteration, endTime.Sub(startTime), err } +func iterationSamples( + startTime, endTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, +) metrics.Samples { + return metrics.Samples([]metrics.Sample{ + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.IterationDuration, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: metrics.D(endTime.Sub(startTime)), + }, + { + TimeSeries: metrics.TimeSeries{ + Metric: builtinMetrics.Iterations, + Tags: ctm.Tags, + }, + Time: endTime, + Metadata: ctm.Metadata, + Value: 1, + }, + }) +} + func (u *ActiveVU) incrIteration() { u.iteration++ u.state.Iteration = u.iteration diff --git a/js/runner_test.go b/js/runner_test.go index c0ca818086c..0808547a42f 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -794,10 +794,9 @@ func TestVUIntegrationMetrics(t *testing.T) { require.NoError(t, err) sampleCount := 0 builtinMetrics := r.preInitState.BuiltinMetrics - for i, sampleC := range metrics.GetBufferedSamples(samples) { - for j, s := range sampleC.GetSamples() { - sampleCount++ - switch i + j { + for _, sampleC := range metrics.GetBufferedSamples(samples) { + for _, s := range sampleC.GetSamples() { + switch sampleCount { case 0: assert.Equal(t, 5.0, s.Value) assert.Equal(t, "my_metric", s.Metric.Name) @@ -814,6 +813,7 @@ func TestVUIntegrationMetrics(t *testing.T) { assert.Same(t, builtinMetrics.Iterations, s.Metric, "`iterations` sample is after `iteration_duration`") assert.Equal(t, float64(1), s.Value) } + sampleCount++ } } assert.Equal(t, sampleCount, 5) diff --git a/lib/netext/dialer.go b/lib/netext/dialer.go index e9db26fb2b6..0faaef9fb63 100644 --- a/lib/netext/dialer.go +++ b/lib/netext/dialer.go @@ -69,23 +69,20 @@ func (d *Dialer) DialContext(ctx context.Context, proto, addr string) (net.Conn, return conn, err } -// GetTrail creates a new NetTrail instance with the Dialer -// sent and received data metrics and the supplied times and tags. -// TODO: Refactor this according to -// https://github.com/k6io/k6/pull/1203#discussion_r337938370 -func (d *Dialer) GetTrail( - startTime, endTime time.Time, fullIteration bool, emitIterations bool, ctm metrics.TagsAndMeta, - builtinMetrics *metrics.BuiltinMetrics, -) *NetTrail { +// IOSamples returns samples for data send and received since it last call and zeros out. +// It uses the provided time as the sample time and tags and builtinMetrics to build the samples. +func (d *Dialer) IOSamples( + sampleTime time.Time, ctm metrics.TagsAndMeta, builtinMetrics *metrics.BuiltinMetrics, +) metrics.SampleContainer { bytesWritten := atomic.SwapInt64(&d.BytesWritten, 0) bytesRead := atomic.SwapInt64(&d.BytesRead, 0) - samples := []metrics.Sample{ + return metrics.Samples([]metrics.Sample{ { TimeSeries: metrics.TimeSeries{ Metric: builtinMetrics.DataSent, Tags: ctm.Tags, }, - Time: endTime, + Time: sampleTime, Metadata: ctm.Metadata, Value: float64(bytesWritten), }, @@ -94,43 +91,11 @@ func (d *Dialer) GetTrail( Metric: builtinMetrics.DataReceived, Tags: ctm.Tags, }, - Time: endTime, + Time: sampleTime, Metadata: ctm.Metadata, Value: float64(bytesRead), }, - } - if fullIteration { - samples = append(samples, metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.IterationDuration, - Tags: ctm.Tags, - }, - Time: endTime, - Metadata: ctm.Metadata, - Value: metrics.D(endTime.Sub(startTime)), - }) - if emitIterations { - samples = append(samples, metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.Iterations, - Tags: ctm.Tags, - }, - Time: endTime, - Metadata: ctm.Metadata, - Value: 1, - }) - } - } - - return &NetTrail{ - BytesRead: bytesRead, - BytesWritten: bytesWritten, - FullIteration: fullIteration, - StartTime: startTime, - EndTime: endTime, - Tags: ctm.Tags, - Samples: samples, - } + }) } func (d *Dialer) getDialAddr(addr string) (string, error) { @@ -208,36 +173,6 @@ func (d *Dialer) getConfiguredHost(addr, host, port string) (*types.Host, error) return nil, nil //nolint:nilnil } -// NetTrail contains information about the exchanged data size and length of a -// series of connections from a particular netext.Dialer -type NetTrail struct { - BytesRead int64 - BytesWritten int64 - FullIteration bool - StartTime time.Time - EndTime time.Time - Tags *metrics.TagSet - Samples []metrics.Sample -} - -// Ensure that interfaces are implemented correctly -var _ metrics.ConnectedSampleContainer = &NetTrail{} - -// GetSamples implements the metrics.SampleContainer interface. -func (ntr *NetTrail) GetSamples() []metrics.Sample { - return ntr.Samples -} - -// GetTags implements the metrics.ConnectedSampleContainer interface. -func (ntr *NetTrail) GetTags() *metrics.TagSet { - return ntr.Tags -} - -// GetTime implements the metrics.ConnectedSampleContainer interface. -func (ntr *NetTrail) GetTime() time.Time { - return ntr.EndTime -} - // Conn wraps net.Conn and keeps track of sent and received data size type Conn struct { net.Conn diff --git a/output/cloud/insights/collect_test.go b/output/cloud/insights/collect_test.go index aeca952b9bc..6350dc8eb76 100644 --- a/output/cloud/insights/collect_test.go +++ b/output/cloud/insights/collect_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/cloudapi/insights" - "go.k6.io/k6/lib/netext" "go.k6.io/k6/lib/netext/httpext" "go.k6.io/k6/metrics" ) @@ -51,9 +50,6 @@ func Test_Collector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestM &httpext.Trail{ // HTTP trail without trace ID should be ignored }, - &netext.NetTrail{ - // Net trail should be ignored - }, &httpext.Trail{ EndTime: time.Unix(20, 0), Duration: time.Second,