Skip to content

Commit

Permalink
client, server: update dial/server buffer options to support a "disab…
Browse files Browse the repository at this point in the history
…le" setting (#2147)
  • Loading branch information
MakMukhi authored Jun 27, 2018
1 parent f1ab7ac commit 3ec535a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 28 deletions.
30 changes: 24 additions & 6 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,20 @@ type dialOptions struct {
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)

func defaultDialOptions() dialOptions {
return dialOptions{
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
},
}
}

// RegisterChannelz turns on channelz service.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
Expand All @@ -141,8 +153,11 @@ func WithWaitForHandshake() DialOption {
}
}

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// WithWriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WithWriteBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.WriteBufferSize = s
Expand All @@ -151,6 +166,9 @@ func WithWriteBufferSize(s int) DialOption {

// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for each read syscall.
// The default value for this buffer is 32KB
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func WithReadBufferSize(s int) DialOption {
return func(o *dialOptions) {
o.copts.ReadBufferSize = s
Expand Down Expand Up @@ -458,10 +476,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),

target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
Expand Down
12 changes: 10 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,18 @@ var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Zero will disable the write buffer such that each write will be on underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
return func(o *options) {
o.writeBufferSize = s
Expand All @@ -156,6 +161,9 @@ func WriteBufferSize(s int) ServerOption {

// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
// for one read syscall.
// The default value for this buffer is 32KB.
// Zero will disable read buffer for a connection so data framer can access the underlying
// conn directly.
func ReadBufferSize(s int) ServerOption {
return func(o *options) {
o.readBufferSize = s
Expand Down
84 changes: 84 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6340,3 +6340,87 @@ func testRPCTimeout(t *testing.T, e env) {
cancel()
}
}

func TestDisabledIOBuffers(t *testing.T) {
defer leakcheck.Check(t)

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
if err != nil {
t.Fatalf("Failed to create payload: %v", err)
}
req := &testpb.StreamingOutputCallRequest{
Payload: payload,
}
resp := &testpb.StreamingOutputCallResponse{
Payload: payload,
}

ss := &stubServer{
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
return err
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
return err
}
if err := stream.Send(resp); err != nil {
t.Errorf("stream.Send(_)= %v, want <nil>", err)
return err
}

}
},
}

s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
testpb.RegisterTestServiceServer(s, ss)

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}

done := make(chan struct{})
go func() {
s.Serve(lis)
close(done)
}()
defer s.Stop()
dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer dcancel()
cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
if err != nil {
t.Fatalf("Failed to dial server")
}
defer cc.Close()
c := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
if err != nil {
t.Fatalf("Failed to send test RPC to server")
}
for i := 0; i < 10; i++ {
if err := stream.Send(req); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
in, err := stream.Recv()
if err != nil {
t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
}
if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
}
}
stream.CloseSend()
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
}
}
10 changes: 2 additions & 8 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
writeBufSize = opts.WriteBufferSize
}
readBufSize := defaultReadBufSize
if opts.ReadBufferSize > 0 {
readBufSize = opts.ReadBufferSize
}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
Expand Down
10 changes: 2 additions & 8 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,8 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
writeBufSize := defaultWriteBufSize
if config.WriteBufferSize > 0 {
writeBufSize = config.WriteBufferSize
}
readBufSize := defaultReadBufSize
if config.ReadBufferSize > 0 {
readBufSize = config.ReadBufferSize
}
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
framer := newFramer(conn, writeBufSize, readBufSize)
// Send initial settings as connection preface to client.
var isettings []http2.Setting
Expand Down
15 changes: 11 additions & 4 deletions transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"strconv"
Expand All @@ -43,9 +44,6 @@ const (
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
http2InitHeaderTableSize = 4096
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
Expand Down Expand Up @@ -545,6 +543,9 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
Expand Down Expand Up @@ -578,7 +579,13 @@ type framer struct {
}

func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
r := bufio.NewReaderSize(conn, readBufferSize)
if writeBufferSize < 0 {
writeBufferSize = 0
}
var r io.Reader = conn
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
w := newBufWriter(conn, writeBufferSize)
f := &framer{
writer: w,
Expand Down
5 changes: 5 additions & 0 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,11 @@ func TestMaxConnectionAge(t *testing.T) {
}
}

const (
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
)

// TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{
Expand Down

0 comments on commit 3ec535a

Please sign in to comment.