Skip to content

Commit

Permalink
http2: limit number of PINGs bundled with RST_STREAMs
Browse files Browse the repository at this point in the history
gRPC has an unfortunate behavior of strictly rate limiting
the number of PING frames that it will receive. The default is
two PING frames every two hours when no requests are in flight;
two PING frames every five minutes when a request is in flight;
and the limit resets every time the gRPC endpoint sends a
HEADERS or DATA frame.

When sending a RST_STREAM frame, the Transport can bundle a PING
frame with it to confirm the server is responding. When canceling
several requests in succession, this can result in hitting the
gRPC ping limit.

Work around this gRPC behavior by sending at most one bundled
PING per HEADERS or DATA frame received. We already limit
ourselves to one PING in flight at a time; now, when we receive
a PING response, disable sending additional bundled PINGs
until we read a HEADERS/DATA frame.

This does not affect keep-alive pings.

Fixes golang/go#70575.

Change-Id: I7c4003039bd2dc52106b2806ca31eeeee37b7e09
Reviewed-on: https://go-review.googlesource.com/c/net/+/632995
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Auto-Submit: Damien Neil <dneil@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
  • Loading branch information
neild authored and gopherbot committed Dec 2, 2024
1 parent e9cd716 commit bc37675
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 9 deletions.
46 changes: 37 additions & 9 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,16 @@ type ClientConn struct {
pingTimeout time.Duration
extendedConnectAllowed bool

// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
// gRPC strictly limits the number of PING frames that it will receive.
// The default is two pings per two hours, but the limit resets every time
// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
//
// rstStreamPingsBlocked is set after receiving a response to a PING frame
// bundled with an RST_STREAM (see pendingResets below), and cleared after
// receiving a HEADERS or DATA frame.
rstStreamPingsBlocked bool

// pendingResets is the number of RST_STREAM frames we have sent to the peer,
// without confirming that the peer has received them. When we send a RST_STREAM,
// we bundle it with a PING frame, unless a PING is already in flight. We count
Expand Down Expand Up @@ -1738,10 +1748,14 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
ping := false
if !closeOnIdle {
cc.mu.Lock()
if cc.pendingResets == 0 {
ping = true
// rstStreamPingsBlocked works around a gRPC behavior:
// see comment on the field for details.
if !cc.rstStreamPingsBlocked {
if cc.pendingResets == 0 {
ping = true
}
cc.pendingResets++
}
cc.pendingResets++
cc.mu.Unlock()
}
cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
Expand Down Expand Up @@ -2489,7 +2503,7 @@ func (rl *clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(StreamError); ok {
if cs := rl.streamByID(se.StreamID); cs != nil {
if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
Expand Down Expand Up @@ -2544,7 +2558,7 @@ func (rl *clientConnReadLoop) run() error {
}

func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
cs := rl.streamByID(f.StreamID)
cs := rl.streamByID(f.StreamID, headerOrDataFrame)
if cs == nil {
// We'd get here if we canceled a request while the
// server had its response still in flight. So if this
Expand Down Expand Up @@ -2873,7 +2887,7 @@ func (b transportResponseBody) Close() error {

func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc := rl.cc
cs := rl.streamByID(f.StreamID)
cs := rl.streamByID(f.StreamID, headerOrDataFrame)
data := f.Data()
if cs == nil {
cc.mu.Lock()
Expand Down Expand Up @@ -3008,9 +3022,22 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
cs.abortStream(err)
}

func (rl *clientConnReadLoop) streamByID(id uint32) *clientStream {
// Constants passed to streamByID for documentation purposes.
// They give a readable name to the boolean headerOrData argument at each
// call site.
const (
// headerOrDataFrame is passed when the frame being processed is a HEADERS
// or DATA frame; streamByID then clears ClientConn.rstStreamPingsBlocked.
headerOrDataFrame = true
// notHeaderOrDataFrame is passed by callers handling any other kind of
// frame or error; streamByID leaves rstStreamPingsBlocked unchanged.
notHeaderOrDataFrame = false
)

// streamByID returns the stream with the given id, or nil if no stream has that id.
// If headerOrData is true, it clears rstStreamPingsBlocked.
func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
rl.cc.mu.Lock()
defer rl.cc.mu.Unlock()
if headerOrData {
// Work around an unfortunate gRPC behavior.
// See comment on ClientConn.rstStreamPingsBlocked for details.
rl.cc.rstStreamPingsBlocked = false
}
cs := rl.cc.streams[id]
if cs != nil && !cs.readAborted {
return cs
Expand Down Expand Up @@ -3145,7 +3172,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {

func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
cc := rl.cc
cs := rl.streamByID(f.StreamID)
cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
if f.StreamID != 0 && cs == nil {
return nil
}
Expand Down Expand Up @@ -3174,7 +3201,7 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
}

func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
cs := rl.streamByID(f.StreamID)
cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
if cs == nil {
// TODO: return error if server tries to RST_STREAM an idle stream
return nil
Expand Down Expand Up @@ -3252,6 +3279,7 @@ func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
if cc.pendingResets > 0 {
// See clientStream.cleanupWriteRequest.
cc.pendingResets = 0
cc.rstStreamPingsBlocked = true
cc.cond.Broadcast()
}
return nil
Expand Down
71 changes: 71 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5562,13 +5562,84 @@ func TestTransportSendPingWithReset(t *testing.T) {
tc.wantFrameType(FrameHeaders)
tc.wantIdle()

// Receive a byte of data for the remaining stream, which resets our ability
// to send pings (see comment on ClientConn.rstStreamPingsBlocked).
tc.writeData(rts[2].streamID(), false, []byte{0})

// Cancel the last request. We send another PING, since none are in flight.
rts[2].response().Body.Close()
tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
tc.wantFrameType(FramePing)
tc.wantIdle()
}

// Issue #70575: gRPC gets upset if we send more than 2 pings per HEADERS/DATA frame
// sent by the server.
//
// The test cancels a series of requests and checks that the client bundles a
// PING with an RST_STREAM only when no PING is in flight and a HEADERS or DATA
// frame has been received since the last bundled PING was acknowledged
// (see comment on ClientConn.rstStreamPingsBlocked).
func TestTransportSendNoMoreThanOnePingWithReset(t *testing.T) {
tc := newTestClientConn(t)
tc.greet()

// makeAndResetRequest starts a GET request, waits for the client to write
// its HEADERS frame, cancels the request's context, and then expects the
// client to send an RST_STREAM with ErrCodeCancel for that stream.
// It does not consume any PING frame that may follow the RST_STREAM.
makeAndResetRequest := func() {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
req := must(http.NewRequestWithContext(ctx, "GET", "https://dummy.tld/", nil))
rt := tc.roundTrip(req)
tc.wantFrameType(FrameHeaders)
cancel()
tc.wantRSTStream(rt.streamID(), ErrCodeCancel) // client sends RST_STREAM
}

// Create a request and cancel it.
// The client sends a PING frame along with the reset.
makeAndResetRequest()
pf1 := readFrame[*PingFrame](t, tc) // client sends PING

// Create another request and cancel it.
// We do not send a PING frame along with the reset,
// because we haven't received a HEADERS or DATA frame from the server
// since the last PING we sent.
makeAndResetRequest()

// Server belatedly responds to request 1.
// The server has not responded to our first PING yet.
tc.writeHeaders(HeadersFrameParam{
StreamID: 1,
EndHeaders: true,
EndStream: true,
BlockFragment: tc.makeHeaderBlockFragment(
":status", "200",
),
})

// Create yet another request and cancel it.
// We still do not send a PING frame along with the reset.
// We've received a HEADERS frame, but it came before the response to the PING.
makeAndResetRequest()

// The server responds to our PING.
// The payload is echoed from pf1 so the response matches the PING we sent.
tc.writePing(true, pf1.Data)

// Create yet another request and cancel it.
// Still no PING frame; we got a response to the previous one,
// but no HEADERS or DATA.
makeAndResetRequest()

// Server belatedly responds to the second request.
// This HEADERS frame arrives after the PING response, so it lifts the
// block on bundled PINGs.
tc.writeHeaders(HeadersFrameParam{
StreamID: 3,
EndHeaders: true,
EndStream: true,
BlockFragment: tc.makeHeaderBlockFragment(
":status", "200",
),
})

// One more request.
// This time we send a PING frame.
makeAndResetRequest()
tc.wantFrameType(FramePing)
}

func TestTransportConnBecomesUnresponsive(t *testing.T) {
// We send a number of requests in series to an unresponsive connection.
// Each request is canceled or times out without a response.
Expand Down

0 comments on commit bc37675

Please sign in to comment.