Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idle: decrement active call count for streaming RPCs only when the call completes #6610

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.OnCallEnd()

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
Expand Down
170 changes: 102 additions & 68 deletions internal/idle/idle_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -179,80 +180,113 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-blockCh
return &testpb.Empty{}, nil
tests := []struct {
name string
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
}{
{
name: "unary",
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
return fmt.Errorf("EmptyCall RPC failed: %v", err)
}
return nil
},
},
{
name: "streaming",
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
stream, err := client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("FullDuplexCall RPC failed: %v", err)
}
if _, err := stream.Recv(); err != nil && err != io.EOF {
t.Fatalf("stream.Recv() failed: %v", err)
}
return nil
dfawley marked this conversation as resolved.
Show resolved Hide resolved
},
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
// the server RPC handler and the unary call below.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)

// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
errCh <- err
return
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-blockCh
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
<-blockCh
return nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)

// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
if cc.WaitForStateChange(sCtx, connectivity.Ready) {
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
return
}
Comment on lines +261 to +264
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't you simplify this test a good amount, remove the errCh, and go back to the old testutils.AwaitNoStateChange, by performing the RPC in a goroutine instead of doing this block in a goroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we move the RPC to a goroutine and perform the rest of the checks here, there seems to be a race between how the RPC terminates:

  • the blockCh getting closes, and thereby a normal termination
  • the context getting cancelled

So, I was having to do way too many checks in different places. The way it currently exists seems easier for error handling.


// Unblock the unary RPC on the server.
errCh <- nil
}()
// Verify that there are no idleness related channelz events.
//
// TODO: Improve the checks here. If these log strings are
// changed in the code, these checks will continue to pass.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
}
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
}()

// Make a unary RPC that blocks on the server, thereby ensuring that the
// count of active RPCs on the client is non-zero.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall RPC failed: %v", err)
}
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
t.Fatalf("%s rpc failed: %v", test.name, err)
}

select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
}
})
}
}

Expand Down
15 changes: 10 additions & 5 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ type ClientStream interface {
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return nil, err
}
defer cc.idlenessMgr.OnCallEnd()

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
Expand All @@ -179,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
// Start tracking the RPC for idleness purposes. This is where a stream is
// created for both streaming and unary RPCs, and hence is a good place to
// track active RPC count.
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return nil, err
}
// Add a calloption, to decrement the active call count, that gets executed
// when the RPC completes.
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)

if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
// validate md
if err := imetadata.Validate(md); err != nil {
Expand Down