Use sync.Pool to share per-connection transport write buffer. #6309

Merged
merged 19 commits on Jul 20, 2023

Changes from 11 commits

Commits (19)
0f1058d
Release write buffer after Flush() in transport
s-matyukevich May 15, 2023
63f4ba3
Use sync.Pool
s-matyukevich May 15, 2023
4faff38
Use interface{} instead of any
s-matyukevich May 15, 2023
f594b29
use pool map
s-matyukevich May 16, 2023
ec75cb1
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 16, 2023
95bde71
fix flush()
s-matyukevich May 16, 2023
a243c24
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 17, 2023
a7ededc
Fix linter error
s-matyukevich May 17, 2023
16345bb
make variable names more descriptive
s-matyukevich May 23, 2023
c8036d0
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 23, 2023
09407f4
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 24, 2023
0b42642
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jun 28, 2023
e1bdb20
Add options to enable write buffer sharing
s-matyukevich Jun 28, 2023
9b97327
delete unused variable
s-matyukevich Jun 28, 2023
d274f35
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jul 12, 2023
00e8de7
address review comments
s-matyukevich Jul 12, 2023
9aeb148
fix linter error
s-matyukevich Jul 13, 2023
195a467
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jul 20, 2023
82a400e
Add Experimental tag, rename flush to flushKeepBuffer
s-matyukevich Jul 20, 2023
42 changes: 37 additions & 5 deletions internal/transport/http_util.go
@@ -30,6 +30,7 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	"unicode/utf8"

@@ -309,18 +310,19 @@ func decodeGrpcMessageUnchecked(msg string) string {
 }
 
 type bufWriter struct {
+	pool      *sync.Pool
 	buf       []byte
 	offset    int
 	batchSize int
 	conn      net.Conn
 	err       error
 }
 
-func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
+func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
 	return &bufWriter{
-		buf:       make([]byte, batchSize*2),
 		batchSize: batchSize,
 		conn:      conn,
+		pool:      pool,
 	}
 }

@@ -332,19 +334,33 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
 		n, err = w.conn.Write(b)
 		return n, toIOError(err)
 	}
+	if w.buf == nil {
+		b := w.pool.Get().(*[]byte)
+		w.buf = *b
+	}
 	for len(b) > 0 {
-		nn := copy(w.buf[w.offset:], b)
+		nn := copy((w.buf)[w.offset:], b)
 		b = b[nn:]
 		w.offset += nn
 		n += nn
 		if w.offset >= w.batchSize {
-			err = w.Flush()
+			err = w.flush()
 		}
 	}
 	return n, err
 }
 
 func (w *bufWriter) Flush() error {
+	err := w.flush()
+	if w.buf != nil {
+		b := w.buf
+		w.pool.Put(&b)
+		w.buf = nil
+	}
+	return err
+}
+
+func (w *bufWriter) flush() error {
 	if w.err != nil {
 		return w.err
 	}
@@ -381,6 +397,9 @@ type framer struct {
 	fr     *http2.Framer
 }
 
+var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
+var writeBufferMutex sync.Mutex
+
 func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
 	if writeBufferSize < 0 {
 		writeBufferSize = 0
@@ -389,7 +408,20 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
 	if readBufferSize > 0 {
 		r = bufio.NewReaderSize(r, readBufferSize)
 	}
-	w := newBufWriter(conn, writeBufferSize)
+	writeBufferMutex.Lock()
+	size := writeBufferSize * 2
+	pool, ok := writeBufferPoolMap[size]
+	if !ok {
+		pool = &sync.Pool{
+			New: func() interface{} {
+				b := make([]byte, size)
+				return &b
+			},
+		}
+		writeBufferPoolMap[size] = pool
+	}
+	writeBufferMutex.Unlock()
@s-matyukevich (Contributor, Author) commented on May 23, 2023:

In theory, we could get rid of this mutex and pool map if we stored a sync.Pool per ClientConnection and per server. However, that would require modifying a lot of internal interfaces to pass the pool all the way down to the transport layer. It would also prevent us from sharing the pool between different ClientConnections and servers, so I decided to take the mutex approach.

As far as I understand, this mutex shouldn't really impact performance, since it is only taken when a new connection is created, which does not happen very often with HTTP/2.
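
For illustration, here is a minimal, self-contained sketch of the pattern described in the comment above: a process-wide map from buffer size to *sync.Pool, guarded by a mutex that is only taken when a new connection is set up, so connections configured with the same write buffer size share one pool. The helper name getWriteBufferPool and the 32 KiB size are illustrative only, not part of the PR.

package main

import (
	"fmt"
	"sync"
)

var (
	writeBufferPoolMu sync.Mutex
	writeBufferPools  = make(map[int]*sync.Pool)
)

// getWriteBufferPool returns the pool of *[]byte buffers for the given size,
// creating it on first use. The mutex is held only for the map lookup.
func getWriteBufferPool(size int) *sync.Pool {
	writeBufferPoolMu.Lock()
	defer writeBufferPoolMu.Unlock()
	pool, ok := writeBufferPools[size]
	if !ok {
		pool = &sync.Pool{
			New: func() interface{} {
				b := make([]byte, size)
				return &b
			},
		}
		writeBufferPools[size] = pool
	}
	return pool
}

func main() {
	// Two "connections" configured with the same buffer size share one pool.
	p1 := getWriteBufferPool(32 * 1024)
	p2 := getWriteBufferPool(32 * 1024)
	fmt.Println(p1 == p2) // true

	buf := p1.Get().(*[]byte) // borrow a buffer, as bufWriter.Write does
	fmt.Println(len(*buf))    // 32768
	p1.Put(buf)               // return it, as bufWriter.Flush does
}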

A reviewer (Contributor) asked:

When do entries from this map ever get deleted? Can this map grow in an unbounded fashion?

@s-matyukevich (Contributor, Author) replied:

Right now, never. In theory it can grow unbounded if the app creates multiple servers and/or ClientConnections with different buffer sizes. I can't think of a use case where that would be useful, but it is possible. I am not sure it is worth adding more complicated logic to protect against this, because my understanding is that the Go runtime will eventually deallocate all memory held by unused pools, so we would only be wasting memory on pointers to empty pools, which should be negligible in practice.

Still, I can fix this if you prefer:

  • I can add a goroutine that removes unused pools. I would probably need to store something like a lastModifiedTime for each pool to do that. The downsides are some additional complexity, plus the fact that the goroutine would need to take the lock.
  • I can store a pool per Server and ClientConnection and propagate it all the way down to the transport layer. The downsides are that we would need to extend some internal interfaces, and we would not be able to reuse pools between different connections that use the same buffer size (which I assume is a very common use case). But in this case we could get rid of the lock completely.

Do you have any other ideas? Let me know what I should do here.
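
For illustration, a rough sketch of the first alternative above (not adopted in this PR): record a last-use timestamp per pool and periodically drop pools that have been idle. Every name here (sizedPool, getPool, cleanup) is hypothetical, and the cleanup pass takes the same lock as the lookup, which is the extra cost mentioned in the comment.

package main

import (
	"sync"
	"time"
)

// sizedPool pairs a buffer pool with the last time it was requested.
type sizedPool struct {
	pool     *sync.Pool
	lastUsed time.Time
}

var (
	mu    sync.Mutex
	pools = make(map[int]*sizedPool)
)

// getPool returns (and timestamps) the pool for the given buffer size.
func getPool(size int) *sync.Pool {
	mu.Lock()
	defer mu.Unlock()
	p, ok := pools[size]
	if !ok {
		p = &sizedPool{pool: &sync.Pool{New: func() interface{} {
			b := make([]byte, size)
			return &b
		}}}
		pools[size] = p
	}
	p.lastUsed = time.Now()
	return p.pool
}

// cleanup drops pools that have not been requested within maxIdle.
func cleanup(maxIdle time.Duration) {
	mu.Lock()
	defer mu.Unlock()
	for size, p := range pools {
		if time.Since(p.lastUsed) > maxIdle {
			delete(pools, size)
		}
	}
}

func main() {
	go func() {
		for range time.Tick(time.Minute) {
			cleanup(10 * time.Minute)
		}
	}()
	_ = getPool(32 * 1024)
}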

A reviewer (Contributor) replied, quoting the comment above:

> In theory it can grow unbounded if the app creates multiple servers and/or ClientConnections with different buffer sizes. I can't think of a use case where that would be useful, but it is possible.

I agree. I don't expect applications to create multiple grpc.ClientConns or grpc.Servers and pass them different values for the send and receive buffer sizes. But given that we are going to have a dial option and a server option to enable the sync.Pool on the client and server respectively, we could document it there, just so no one complains that a few bytes are leaked.

@dfawley: Thoughts/Objections?
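
For context, a hedged sketch of how an application might opt in once such options exist. The names WithSharedWriteBuffer and SharedWriteBuffer are assumed from the "Add options to enable write buffer sharing" commit in this PR; they are not part of the diff shown above.

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Client side: opt in to the shared write buffer with a dial option
	// (option name assumed; see the note above).
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithSharedWriteBuffer(true),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// Server side: the matching server option (option name assumed).
	_ = grpc.NewServer(grpc.SharedWriteBuffer(true))
}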

+	w := newBufWriter(conn, writeBufferSize, pool)
 	f := &framer{
 		writer: w,
 		fr:     http2.NewFramer(w, r),