Skip to content

Commit

Permalink
k6 Insights (2/2): Integrate request metadata output to cloud output …
Browse files Browse the repository at this point in the history
…v1 (#3202)

* Integrate request metadata output to cloud output v1

* Implement `NewDefaultClientConfigForTestRun(...)`

* Add godoc comment to `NewDefaultClientConfigForTestRun(...)`

* Add early return in `flushRequestMetadatas()`

* Add `Timeout` to `ConnectConfig`
  • Loading branch information
Blinkuu authored Jul 19, 2023
1 parent dccdf16 commit 703970e
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 55 deletions.
44 changes: 39 additions & 5 deletions cloudapi/insights/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.k6.io/k6/lib/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -37,7 +36,7 @@ var (
// ClientConfig is the configuration for the client.
type ClientConfig struct {
IngesterHost string
Timeout types.NullDuration
Timeout time.Duration
ConnectConfig ClientConnectConfig
AuthConfig ClientAuthConfig
TLSConfig ClientTLSConfig
Expand All @@ -48,6 +47,7 @@ type ClientConfig struct {
type ClientConnectConfig struct {
Block bool
FailOnNonTempDialError bool
Timeout time.Duration
Dialer func(context.Context, string) (net.Conn, error)
}

Expand Down Expand Up @@ -88,6 +88,39 @@ type Client struct {
connMu *sync.RWMutex
}

// NewDefaultClientConfigForTestRun creates a new default client config for a test run.
func NewDefaultClientConfigForTestRun(ingesterHost, authToken string, testRunID int64) ClientConfig {
return ClientConfig{
IngesterHost: ingesterHost,
Timeout: 90 * time.Second,
ConnectConfig: ClientConnectConfig{
Block: false,
FailOnNonTempDialError: false,
Timeout: 10 * time.Second,
Dialer: nil,
},
AuthConfig: ClientAuthConfig{
Enabled: true,
TestRunID: testRunID,
Token: authToken,
RequireTransportSecurity: true,
},
TLSConfig: ClientTLSConfig{
Insecure: false,
},
RetryConfig: ClientRetryConfig{
RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`,
MaxAttempts: 3,
PerRetryTimeout: 30 * time.Second,
BackoffConfig: ClientBackoffConfig{
Enabled: true,
JitterFraction: 0.1,
WaitBetween: 1 * time.Second,
},
},
}
}

// NewClient creates a new client.
func NewClient(cfg ClientConfig) *Client {
return &Client{
Expand All @@ -112,6 +145,8 @@ func (c *Client) Dial(ctx context.Context) error {
return fmt.Errorf("failed to create dial options: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.ConnectConfig.Timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, c.cfg.IngesterHost, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
Expand All @@ -132,9 +167,6 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return ErrClientClosed
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout.TimeDuration())
defer cancel()

if len(requestMetadatas) < 1 {
return nil
}
Expand All @@ -144,6 +176,8 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return fmt.Errorf("failed to create request from request metadatas: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
_, err = c.client.BatchCreateRequestMetadatas(ctx, req)
if err != nil {
st := status.Convert(err)
Expand Down
44 changes: 22 additions & 22 deletions cloudapi/insights/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.k6.io/k6/lib/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -92,8 +91,8 @@ func TestClient_Dial_ReturnsNoErrorWithWorkingDialer(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -114,8 +113,8 @@ func TestClient_Dial_ReturnsErrorWhenCalledTwice(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -138,6 +137,7 @@ func TestClient_Dial_ReturnsNoErrorWithFailingDialer(t *testing.T) {
ConnectConfig: ClientConnectConfig{
Block: true,
FailOnNonTempDialError: true,
Timeout: 1 * time.Second,
Dialer: func(ctx context.Context, s string) (net.Conn, error) {
return nil, &fatalError{}
},
Expand All @@ -163,7 +163,7 @@ func TestClient_Dial_ReturnsErrorWithoutRetryableStatusCodes(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
}
Expand All @@ -184,7 +184,7 @@ func TestClient_Dial_ReturnsErrorWithInvalidRetryableStatusCodes(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: "RANDOM,INTERNAL"},
Expand All @@ -206,8 +206,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -231,8 +231,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -272,8 +272,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithWorkingServerAndCanc
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithUninitializedClient(
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
}
Expand Down Expand Up @@ -341,8 +341,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithFailingServerAndNonC
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -373,8 +373,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorAfterRetrySeveralTimes
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{
MaxAttempts: 20,
Expand Down Expand Up @@ -422,8 +422,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorAfterExhaustingMaxRetryA
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{
BackoffConfig: ClientBackoffConfig{
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedOnce(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -491,7 +491,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedTwice(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down
35 changes: 8 additions & 27 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
insightsOutput "go.k6.io/k6/output/cloud/insights"
Expand Down Expand Up @@ -125,34 +124,14 @@ func (o *Output) Start() error {
}
o.requestMetadatasCollector = insightsOutput.NewCollector(testRunID)

insightsClientConfig := insights.ClientConfig{
IngesterHost: o.config.TracesHost.String,
Timeout: types.NewNullDuration(90*time.Second, false),
AuthConfig: insights.ClientAuthConfig{
Enabled: true,
TestRunID: testRunID,
Token: o.config.Token.String,
RequireTransportSecurity: true,
},
TLSConfig: insights.ClientTLSConfig{
Insecure: false,
},
RetryConfig: insights.ClientRetryConfig{
RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`,
MaxAttempts: 3,
PerRetryTimeout: 30 * time.Second,
BackoffConfig: insights.ClientBackoffConfig{
Enabled: true,
JitterFraction: 0.1,
WaitBetween: 1 * time.Second,
},
},
}
insightsClientConfig := insights.NewDefaultClientConfigForTestRun(
o.config.TracesHost.String,
o.config.Token.String,
testRunID,
)
insightsClient := insights.NewClient(insightsClientConfig)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := insightsClient.Dial(ctx); err != nil {
if err := insightsClient.Dial(context.Background()); err != nil {
return err
}

Expand Down Expand Up @@ -319,6 +298,8 @@ func (o *Output) flushRequestMetadatas() {
err := o.requestMetadatasFlusher.Flush()
if err != nil {
o.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud")

return
}

o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud")
Expand Down
Loading

0 comments on commit 703970e

Please sign in to comment.