Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: ensure transports are closed when the channel enters IDLE #6620

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.cancel()
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)

// We have to defer here because GracefulClose => Close => onClose, which
// requires locking ac.mu.
// We have to defer here because GracefulClose => onClose, which requires
// locking ac.mu.
if ac.transport != nil {
defer ac.transport.GracefulClose()
ac.transport = nil
Expand Down Expand Up @@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) {
ac.updateConnectivityState(connectivity.Shutdown, nil)
ac.cancel()
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
// ii) there are concurrent name resolver/Balancer triggered
// address removal and GoAway.
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
ac.mu.Unlock()
curTr.GracefulClose()
ac.mu.Lock()
}

channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel deleted",
Severity: channelz.CtInfo,
Expand All @@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) {
// being deleted right away.
channelz.RemoveEntry(ac.channelzID)
ac.mu.Unlock()

// We have to release the lock before the call to GracefulClose/Close here
// because both of them call onClose(), which requires locking ac.mu.
if curTr != nil {
if err == errConnDrain {
// Close the transport gracefully when the subConn is being shutdown.
//
// GracefulClose() may be executed multiple times if:
// - multiple GoAway frames are received from the server
// - there are concurrent name resolver or balancer triggered
// address removal and GoAway
curTr.GracefulClose()
} else {
// Hard close the transport when the channel is entering idle or is
// being shutdown. In the case where the channel is being shutdown,
// closing of transports is also taken care of by cancelation of cc.ctx.
// But in the case where the channel is entering idle, we need to
// explicitly close the transports here. Instead of distinguishing
// between these two cases, it is simpler to close the transport
// unconditionally here.
curTr.Close(err)
}
}
}

func (ac *addrConn) getState() connectivity.State {
Expand Down
18 changes: 16 additions & 2 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
}

// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
// the connection to the backend is closed.
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
Expand All @@ -159,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
t.Cleanup(func() { cc.Close() })

// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

Expand All @@ -168,13 +170,25 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Retrieve the wrapped conn from the listener.
v, err := lis.NewConnCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to retrieve conn from test listener: %v", err)
}
conn := v.(*testutils.ConnWrapper)

// Verify that the ClientConn moves to IDLE as there is no activity.
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Verify idleness related channelz events.
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}

// Verify that the previously open connection is closed.
if _, err := conn.CloseCh.Receive(ctx); err != nil {
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
}
}

// Tests the case where channel idleness is enabled by passing a small value for
Expand Down