Skip to content

Commit

Permalink
transport: fix error handling on Stream deletion (grpc#1275)
Browse files Browse the repository at this point in the history
This patch writes client-side error before closing the active stream
to fix blocking `RecvMsg` issue on `grpc.ClientStream` [1].

Previous gRPC client stream just exits on `ClientTransport.Error` [2].
And latest gRPC added another select case on client connection context
cancel [3]. Now when client stream closes from client connection context
cancel, it calls `CloseStream` with `ErrClientConnClosing` error. And then
the stream gets deleted from `*http2Client.activeStreams`, without processing
the error [4]. Then in-flight `RecvMsg` call on this client will block on
`*parser.Reader.recvMsg` [5].

In short,

1. `ClientConn.Close`.
2. in-flight streams will receive case `<-cc.ctx.Done()`
   https://github.com/grpc/grpc-go/blob/master/stream.go#L253-L255.
3. `cs.closeTransportStream(ErrClientConnClosing)` calls `cs.t.CloseStream(cs.s, err)`.
4. `CloseStream(cs.s, err)` calls `delete(t.activeStreams, s.id)`
   without handling the error.
5. in-flight streams will never receive error, left hanging.

I can reproduce in etcd tests with in-flight `recvMsg` calls to `Observe` RPC.

---
[1] etcd-io/etcd#7896 (comment)
[2] https://github.com/grpc/grpc-go/blob/v1.2.x/stream.go#L235-L238
[3] grpc#1136
[4] https://github.com/grpc/grpc-go/blob/master/transport/http2_client.go#L569
[5] https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L280

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho authored and menghanl committed Jun 14, 2017
1 parent de34174 commit e90ffa3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
4 changes: 4 additions & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
t.mu.Unlock()
return
}
if err != nil {
// notify in-flight streams, before the deletion
s.write(recvMsg{err: err})
}
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
Expand Down
37 changes: 37 additions & 0 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,43 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
return tr
}

// TestInflightStreamClosing ensures that closing in-flight stream
// sends StreamError to concurrent stream reader.
func TestInflightStreamClosing(t *testing.T) {
serverConfig := &ServerConfig{}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()

stream, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Client failed to create RPC request: %v", err)
}

donec := make(chan struct{})
serr := StreamError{Desc: "client connection is closing"}
go func() {
defer close(donec)
if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
t.Errorf("unexpected Stream error %v, expected %v", err, serr)
}
}()

// should unblock concurrent stream.Read
client.CloseStream(stream, serr)

// wait for stream.Read error
timeout := time.NewTimer(5 * time.Second)
select {
case <-donec:
if !timeout.Stop() {
<-timeout.C
}
case <-timeout.C:
t.Fatalf("Test timed out, expected a StreamError.")
}
}

// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
// An idle client is one who doesn't make any RPC calls for a duration of
// MaxConnectionIdle time.
Expand Down

0 comments on commit e90ffa3

Please sign in to comment.