Skip to content

Commit

Permalink
server: Support dynamic max recv message size limit
Browse files Browse the repository at this point in the history
This adds alternative to `MaxRecvMsgSize` for max message size varying
in app runtime. New option allows to tune the setting w/o server
recreation.
  • Loading branch information
cthulhu-rider committed Aug 8, 2024
1 parent d715b2e commit 00d000d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 43 deletions.
8 changes: 4 additions & 4 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (s) TestAddGlobalServerOptions(t *testing.T) {

// Ensure the extra server options applies to new servers
s := NewServer()
if s.opts.maxReceiveMessageSize != maxRecvSize {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
if s.opts.maxReceiveMessageSizeFunc() != maxRecvSize {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSizeFunc(), maxRecvSize)
}

internal.ClearGlobalServerOptions()
Expand Down Expand Up @@ -125,8 +125,8 @@ func (s) TestJoinServerOption(t *testing.T) {
const initialWindowSize = 100
jso := newJoinServerOption(Creds(insecure.NewCredentials()), MaxRecvMsgSize(maxRecvSize), InitialWindowSize(initialWindowSize))
s := NewServer(jso)
if s.opts.maxReceiveMessageSize != maxRecvSize {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSize, maxRecvSize)
if s.opts.maxReceiveMessageSizeFunc() != maxRecvSize {
t.Fatalf("Unexpected s.opts.maxReceiveMessageSize: %d != %d", s.opts.maxReceiveMessageSizeFunc(), maxRecvSize)
}
if s.opts.initialWindowSize != initialWindowSize {
t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize)
Expand Down
85 changes: 46 additions & 39 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,44 +145,44 @@ type Server struct {
}

type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
waitForHandlers bool
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSizeFunc func() int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
waitForHandlers bool
}

var defaultServerOptions = serverOptions{
maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
recvBufferPool: nopBufferPool{},
maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSizeFunc: func() int { return defaultServerMaxReceiveMessageSize },
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
recvBufferPool: nopBufferPool{},
}
var globalServerOptions []ServerOption

Expand Down Expand Up @@ -384,10 +384,17 @@ func MaxMsgSize(m int) ServerOption {
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
// If this is not set, gRPC uses the default 4MB. Overlaps [MaxRecvMsgSizeFunc].
func MaxRecvMsgSize(m int) ServerOption {
return MaxRecvMsgSizeFunc(func() int { return m })
}

// MaxRecvMsgSizeFunc returns a ServerOption to set the dynamic max message size
// in bytes the server can receive. If this is not set, gRPC uses the default
// 4MB. Overlaps [MaxRecvMsgSize].
func MaxRecvMsgSizeFunc(f func() int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.maxReceiveMessageSize = m
o.maxReceiveMessageSizeFunc = f
})
}

Expand Down Expand Up @@ -1342,7 +1349,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSizeFunc(), payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
Expand Down Expand Up @@ -1554,7 +1561,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran
s: stream,
p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxReceiveMessageSize: s.opts.maxReceiveMessageSizeFunc(),
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: shs,
Expand Down

0 comments on commit 00d000d

Please sign in to comment.