Skip to content

Commit

Permalink
Add proper support for 'identity' encoding type (#1664)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Nov 17, 2017
1 parent c1fc296 commit 816fa5b
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 118 deletions.
80 changes: 80 additions & 0 deletions Documentation/compression.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Compression

The preferred method for configuring message compression on both clients and
servers is to use
[`encoding.RegisterCompressor`](https://godoc.org/google.golang.org/grpc/encoding#RegisterCompressor)
to register an implementation of a compression algorithm. See
`grpc/encoding/gzip/gzip.go` for an example of how to implement one.

Once a compressor has been registered on the client-side, RPCs may be sent using
it via the
[`UseCompressor`](https://godoc.org/google.golang.org/grpc#UseCompressor)
`CallOption`. Remember that `CallOption`s may be turned into defaults for all
calls from a `ClientConn` by using the
[`WithDefaultCallOptions`](https://godoc.org/google.golang.org/grpc#WithDefaultCallOptions)
`DialOption`. If `UseCompressor` is used and the corresponding compressor has
not been installed, an `Internal` error will be returned to the application
before the RPC is sent.

Server-side, registered compressors will be used automatically to decode request
messages and encode the responses. Servers currently always respond using the
same compression method specified by the client. If the corresponding
compressor has not been registered, an `Unimplemented` status will be returned
to the client.

## Deprecated API

There is a deprecated API for setting compression as well. It is not
recommended for use. However, if you were previously using it, the following
section may be helpful in understanding how it works in combination with the new
API.

### Client-Side

There are two legacy functions and one new function to configure compression:

```go
func WithCompressor(grpc.Compressor) DialOption {}
func WithDecompressor(grpc.Decompressor) DialOption {}
func UseCompressor(name) CallOption {}
```

For outgoing requests, the following rules are applied in order:
1. If `UseCompressor` is used, messages will be compressed using the compressor
named.
* If the compressor named is not registered, an Internal error is returned
back to the client before sending the RPC.
* If UseCompressor("identity"), no compressor will be used, but "identity"
will be sent in the header to the server.
1. If `WithCompressor` is used, messages will be compressed using that
compressor implementation.
1. Otherwise, outbound messages will be uncompressed.

For incoming responses, the following rules are applied in order:
1. If `WithDecompressor` is used and it matches the message's encoding, it will
be used.
1. If a registered compressor matches the response's encoding, it will be used.
1. Otherwise, the stream will be closed and an `Unimplemented` status error will
be returned to the application.

### Server-Side

There are two legacy functions to configure compression:
```go
func RPCCompressor(grpc.Compressor) ServerOption {}
func RPCDecompressor(grpc.Decompressor) ServerOption {}
```

For incoming requests, the following rules are applied in order:
1. If `RPCDecompressor` is used and that decompressor matches the request's
encoding: it will be used.
1. If a registered compressor matches the request's encoding, it will be used.
1. Otherwise, an `Unimplemented` status will be returned to the client.

For outgoing responses, the following rules are applied in order:
1. If `RPCCompressor` is used, that compressor will be used to compress all
response messages.
1. If compression was used for the incoming request and a registered compressor
supports it, that same compression method will be used for the outgoing
response.
1. Otherwise, no compression will be used for the outgoing response.
29 changes: 22 additions & 7 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,17 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
if c.maxReceiveMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload, encoding.GetCompressor(c.compressorType)); err != nil {

// Set dc if it exists and matches the message compression type used,
// otherwise set comp if a registered compressor exists for it.
var comp encoding.Compressor
var dc Decompressor
if rc := stream.RecvCompress(); dopts.dc != nil && dopts.dc.Type() == rc {
dc = dopts.dc
} else if rc != "" && rc != encoding.Identity {
comp = encoding.GetCompressor(rc)
}
if err = recv(p, dopts.codec, stream, dc, reply, *c.maxReceiveMessageSize, inPayload, comp); err != nil {
if err == io.EOF {
break
}
Expand Down Expand Up @@ -95,10 +105,18 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
Client: true,
}
}
if c.compressorType != "" && encoding.GetCompressor(c.compressorType) == nil {
return Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", c.compressorType)
// Set comp and clear compressor if a registered compressor matches the type
// specified via UseCompressor. (And error if a matching compressor is not
// registered.)
var comp encoding.Compressor
if ct := c.compressorType; ct != "" && ct != encoding.Identity {
compressor = nil // Disable the legacy compressor.
comp = encoding.GetCompressor(ct)
if comp == nil {
return Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", ct)
}
}
hdr, data, err := encode(dopts.codec, args, compressor, outPayload, encoding.GetCompressor(c.compressorType))
hdr, data, err := encode(dopts.codec, args, compressor, outPayload, comp)
if err != nil {
return err
}
Expand Down Expand Up @@ -211,9 +229,6 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Host: cc.authority,
Method: method,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
if c.creds != nil {
callHdr.Creds = c.creds
}
Expand Down
30 changes: 14 additions & 16 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,6 @@ const (
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)

// UseCompressor returns a CallOption which sets the compressor used when sending the request.
// If WithCompressor is set, UseCompressor has higher priority.
// This API is EXPERIMENTAL.
func UseCompressor(name string) CallOption {
return beforeCall(func(c *callInfo) error {
c.compressorType = name
return nil
})
}

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
func WithWriteBufferSize(s int) DialOption {
Expand Down Expand Up @@ -168,18 +158,26 @@ func WithCodec(c Codec) DialOption {
}
}

// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
// compressor. It has lower priority than the compressor set by RegisterCompressor.
// This function is deprecated.
// WithCompressor returns a DialOption which sets a Compressor to use for
// message compression. It has lower priority than the compressor set by
// the UseCompressor CallOption.
//
// Deprecated: use UseCompressor instead.
func WithCompressor(cp Compressor) DialOption {
return func(o *dialOptions) {
o.cp = cp
}
}

// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
// message decompressor. It has higher priority than the decompressor set by RegisterCompressor.
// This function is deprecated.
// WithDecompressor returns a DialOption which sets a Decompressor to use for
// incoming message decompression. If incoming response messages are encoded
// using the decompressor's Type(), it will be used. Otherwise, the message
// encoding will be used to look up the compressor registered via
// encoding.RegisterCompressor, which will then be used to decompress the
// message. If no compressor is registered for the encoding, an Unimplemented
// status error will be returned.
//
// Deprecated: use encoding.RegisterCompressor instead.
func WithDecompressor(dc Decompressor) DialOption {
return func(o *dialOptions) {
o.dc = dc
Expand Down
4 changes: 4 additions & 0 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,7 @@ func RegisterCompressor(c Compressor) {
func GetCompressor(name string) Compressor {
return registerCompressor[name]
}

// Identity specifies the optional encoding for uncompressed streams.
// It is intended for grpc internal use only.
const Identity = "identity"
36 changes: 27 additions & 9 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,18 @@ func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
})
}

// UseCompressor returns a CallOption which sets the compressor used when
// sending the request. If WithCompressor is also set, UseCompressor has
// higher priority.
//
// This API is EXPERIMENTAL.
func UseCompressor(name string) CallOption {
return beforeCall(func(c *callInfo) error {
c.compressorType = name
return nil
})
}

// The format of the payload: compressed or not?
type payloadFormat uint8

Expand Down Expand Up @@ -359,32 +371,38 @@ func encode(c Codec, msg interface{}, cp Compressor, outPayload *stats.OutPayloa
return bufHeader, b, nil
}

func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
switch pf {
case compressionNone:
case compressionMade:
if (dc == nil || recvCompress != dc.Type()) && encoding.GetCompressor(recvCompress) == nil {
return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
if recvCompress == "" || recvCompress == encoding.Identity {
return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
}
if !haveCompressor {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}

// TODO(ddyihai): eliminate extra Compressor parameter.
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int,
inPayload *stats.InPayload, compressor encoding.Compressor) error {
// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return err
}
if inPayload != nil {
inPayload.WireLength = len(d)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err

if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
return st.Err()
}

if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
Expand Down
Loading

0 comments on commit 816fa5b

Please sign in to comment.