Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add discardResponseMessage option for gRPC client #3820

Merged
13 changes: 7 additions & 6 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,13 @@ func (c *Client) buildInvokeRequest(
p.SetSystemTags(state, c.addr, method)

return grpcext.InvokeRequest{
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
DiscardResponseMessage: p.DiscardResponseMessage,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
}, nil
}

Expand Down
69 changes: 69 additions & 0 deletions js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,19 @@ func TestClient(t *testing.T) {
client.invoke("grpc.testing.TestService/EmptyCall", {}, { timeout: 2000 })`,
},
},
{
name: "InvokeDiscardResponseMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true })`,
},
},
{
name: "Invoke",
initString: codeBlock{code: `
Expand All @@ -333,6 +346,32 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "InvokeDiscardResponseMessage",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
var resp = client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true })
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
if (resp.message !== null) {
throw new Error("unexpected message: " + JSON.stringify(resp.message))
}`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "AsyncInvoke",
initString: codeBlock{code: `
Expand Down Expand Up @@ -360,6 +399,36 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncInvokeDiscardResponseMessage",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true }).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
if (resp.message !== null) {
throw new Error("unexpected message: " + JSON.stringify(resp.message))
}
}, (err) => {
throw new Error("unexpected error: " + err)
})
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "InvokeAnyProto",
initString: codeBlock{code: `
Expand Down
9 changes: 6 additions & 3 deletions js/modules/k6/grpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
// callParams is the parameters that can be passed to a gRPC calls
// like invoke or newStream.
type callParams struct {
Metadata metadata.MD
TagsAndMeta metrics.TagsAndMeta
Timeout time.Duration
Metadata metadata.MD
TagsAndMeta metrics.TagsAndMeta
Timeout time.Duration
DiscardResponseMessage bool
}

// newCallParams constructs the call parameters from the input value.
Expand Down Expand Up @@ -58,6 +59,8 @@ func newCallParams(vu modules.VU, input sobek.Value) (*callParams, error) {
if err != nil {
return result, fmt.Errorf("invalid timeout value: %w", err)
}
case "discardResponseMessage":
result.DiscardResponseMessage = params.Get(k).ToBoolean()
default:
return result, fmt.Errorf("unknown param: %q", k)
}
Expand Down
41 changes: 41 additions & 0 deletions js/modules/k6/grpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,47 @@ func TestCallParamsTimeOutParse(t *testing.T) {
}
}

func TestCallParamsDiscardResponseMessageParse(t *testing.T) {
t.Parallel()

testCases := []struct {
Name string
JSON string
DiscardResponseMessage bool
}{
{
Name: "Empty",
JSON: `{}`,
DiscardResponseMessage: false,
},
{
Name: "DiscardResponseMessageFalse",
JSON: `{ discardResponseMessage: false }`,
DiscardResponseMessage: false,
},
{
Name: "DiscardResponseMessageTrue",
JSON: `{ discardResponseMessage: true }`,
DiscardResponseMessage: true,
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.Name, func(t *testing.T) {
t.Parallel()

testRuntime, params := newParamsTestRuntime(t, tc.JSON)

p, err := newCallParams(testRuntime.VU, params)
require.NoError(t, err)

assert.Equal(t, tc.DiscardResponseMessage, p.DiscardResponseMessage)
})
}
}

// newParamsTestRuntime creates a new test runtime
// that could be used to test the params
// it also moves to the VU context and creates the params
Expand Down
26 changes: 18 additions & 8 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/emptypb"
)

var emptyDescriptor = (&emptypb.Empty{}).ProtoReflect().Descriptor() //nolint:gochecknoglobals
lzakharov marked this conversation as resolved.
Show resolved Hide resolved

// InvokeRequest represents a unary gRPC request.
type InvokeRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Message []byte
Metadata metadata.MD
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
DiscardResponseMessage bool
Message []byte
Metadata metadata.MD
}

// InvokeResponse represents a gRPC response.
Expand Down Expand Up @@ -133,7 +137,13 @@ func (c *Conn) Invoke(

ctx = withRPCState(ctx, &rpcState{tagsAndMeta: req.TagsAndMeta})

resp := dynamicpb.NewMessage(req.MethodDescriptor.Output())
var resp *dynamicpb.Message
if req.DiscardResponseMessage {
resp = dynamicpb.NewMessage(emptyDescriptor)
} else {
resp = dynamicpb.NewMessage(req.MethodDescriptor.Output())
}

header, trailer := metadata.New(nil), metadata.New(nil)

copts := make([]grpc.CallOption, 0, len(opts)+2)
Expand Down Expand Up @@ -165,7 +175,7 @@ func (c *Conn) Invoke(
response.Error = errMsg
}

if resp != nil {
if resp != nil && !req.DiscardResponseMessage {
msg, err := convert(marshaler, resp)
if err != nil {
return nil, fmt.Errorf("unable to convert response object to JSON: %w", err)
Expand Down
22 changes: 22 additions & 0 deletions lib/netext/grpcext/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ func TestInvokeWithCallOptions(t *testing.T) {
assert.NotNil(t, res)
}

func TestInvokeWithDiscardResponseMessage(t *testing.T) {
t.Parallel()

reply := func(_, _ *dynamicpb.Message, opts ...grpc.CallOption) error {
assert.Len(t, opts, 3) // two by default plus one injected
return nil
}

c := Conn{raw: invokemock(reply)}
r := InvokeRequest{
Method: "/hello.HelloService/NoOp",
MethodDescriptor: methodFromProto("NoOp"),
DiscardResponseMessage: true,
Message: []byte(`{}`),
Metadata: metadata.New(nil),
}
res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone"))
require.NoError(t, err)
assert.NotNil(t, res)
assert.Nil(t, res.Message)
}

func TestInvokeReturnError(t *testing.T) {
t.Parallel()

Expand Down