Skip to content

Commit

Permalink
grpc: skip compression of empty messages (#6842)
Browse files Browse the repository at this point in the history
Fixes #6831.

This avoids compressing messages that are empty, since you can't compress zero
bytes to anything smaller than zero bytes, and most compression algorithms
output headers and trailers which means the resulting message will be non-zero
bytes.
  • Loading branch information
jroper authored Jan 9, 2024
1 parent 7e9d319 commit 3a8270f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
8 changes: 6 additions & 2 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,18 @@ func encode(c baseCodec, msg any) ([]byte, error) {
return b, nil
}

// compress returns the input bytes compressed by compressor or cp. If both
// compressors are nil, returns nil.
// compress returns the input bytes compressed by compressor or cp.
// If both compressors are nil, or if the message has zero length, returns nil,
// indicating no compression was done.
//
// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
if compressor == nil && cp == nil {
return nil, nil
}
if len(in) == 0 {
return nil, nil
}
wrapErr := func(err error) error {
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
}
Expand Down
30 changes: 22 additions & 8 deletions test/compressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,19 +290,29 @@ func (s) TestSetSendCompressorSuccess(t *testing.T) {
for _, tt := range []struct {
name string
desc string
payload *testpb.Payload
dialOpts []grpc.DialOption
resCompressor string
wantCompressInvokes int32
}{
{
name: "identity_request_and_gzip_response",
desc: "request is uncompressed and response is gzip compressed",
payload: &testpb.Payload{Body: []byte("payload")},
resCompressor: "gzip",
wantCompressInvokes: 1,
},
{
name: "identity_request_and_empty_response",
desc: "request is uncompressed and response is gzip compressed",
payload: nil,
resCompressor: "gzip",
wantCompressInvokes: 0,
},
{
name: "gzip_request_and_identity_response",
desc: "request is gzip compressed and response is uncompressed with identity",
payload: &testpb.Payload{Body: []byte("payload")},
resCompressor: "identity",
dialOpts: []grpc.DialOption{
// Use WithCompressor instead of UseCompressor to avoid counting
Expand All @@ -314,24 +324,26 @@ func (s) TestSetSendCompressorSuccess(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
t.Run("unary", func(t *testing.T) {
testUnarySetSendCompressorSuccess(t, tt.resCompressor, tt.wantCompressInvokes, tt.dialOpts)
testUnarySetSendCompressorSuccess(t, tt.payload, tt.resCompressor, tt.wantCompressInvokes, tt.dialOpts)
})

t.Run("stream", func(t *testing.T) {
testStreamSetSendCompressorSuccess(t, tt.resCompressor, tt.wantCompressInvokes, tt.dialOpts)
testStreamSetSendCompressorSuccess(t, tt.payload, tt.resCompressor, tt.wantCompressInvokes, tt.dialOpts)
})
})
}
}

func testUnarySetSendCompressorSuccess(t *testing.T, resCompressor string, wantCompressInvokes int32, dialOpts []grpc.DialOption) {
func testUnarySetSendCompressorSuccess(t *testing.T, payload *testpb.Payload, resCompressor string, wantCompressInvokes int32, dialOpts []grpc.DialOption) {
wc := setupGzipWrapCompressor(t)
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
if err := grpc.SetSendCompressor(ctx, resCompressor); err != nil {
return nil, err
}
return &testpb.Empty{}, nil
return &testpb.SimpleResponse{
Payload: payload,
}, nil
},
}
if err := ss.Start(nil, dialOpts...); err != nil {
Expand All @@ -342,7 +354,7 @@ func testUnarySetSendCompressorSuccess(t *testing.T, resCompressor string, wantC
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("Unexpected unary call error, got: %v, want: nil", err)
}

Expand All @@ -352,7 +364,7 @@ func testUnarySetSendCompressorSuccess(t *testing.T, resCompressor string, wantC
}
}

func testStreamSetSendCompressorSuccess(t *testing.T, resCompressor string, wantCompressInvokes int32, dialOpts []grpc.DialOption) {
func testStreamSetSendCompressorSuccess(t *testing.T, payload *testpb.Payload, resCompressor string, wantCompressInvokes int32, dialOpts []grpc.DialOption) {
wc := setupGzipWrapCompressor(t)
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
Expand All @@ -364,7 +376,9 @@ func testStreamSetSendCompressorSuccess(t *testing.T, resCompressor string, want
return err
}

return stream.Send(&testpb.StreamingOutputCallResponse{})
return stream.Send(&testpb.StreamingOutputCallResponse{
Payload: payload,
})
},
}
if err := ss.Start(nil, dialOpts...); err != nil {
Expand Down

0 comments on commit 3a8270f

Please sign in to comment.