Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: update client state subscriber test to be not flaky and more stressful about rapid updates #6512

Merged
merged 8 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions test/clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -547,3 +550,82 @@ func awaitNoStateChange(ctx context.Context, t *testing.T, cc *grpc.ClientConn,
t.Fatalf("State changed from %q to %q when no state change was expected", currState, cc.GetState())
}
}

type testConnectivityStateSubscriber struct {
onMsgCh chan connectivity.State
}

func (ts *testConnectivityStateSubscriber) OnMessage(msg interface{}) {
ts.onMsgCh <- msg.(connectivity.State)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if msg.(connectivity.State) == connectivity.Shutdown {
close(ts.onMsgCh)
}
}

// TestConnectivityStateSubscriber confirms updates sent by the balancer in
// rapid succession are not missed by the subscriber.
func (s) TestConnectivityStateSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

sendStates := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
connectivity.Idle,
connectivity.Connecting,
connectivity.Ready,
}
wantStates := append(sendStates, connectivity.Shutdown)

const testBalName = "any"
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
// Send the expected states in rapid succession.
for _, s := range sendStates {
t.Logf("Sending state update %s", s)
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
}
return nil
},
}
stub.Register(testBalName, bf)

// Create the ClientConn
const testResName = "any"
rb := manual.NewBuilderWithScheme(testResName)
cc, err := grpc.Dial(testResName+":///",
grpc.WithResolvers(rb),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Unexpected error from grpc.Dial: %v", err)
}

// Subscribe to state updates
s := &testConnectivityStateSubscriber{onMsgCh: make(chan connectivity.State)}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)

// Send an update from the resolver that will trigger the LB policy's UpdateClientConnState.
go rb.UpdateState(resolver.State{})

// Verify the resulting states.
for i, want := range wantStates {
if i == len(sendStates) {
// Trigger Shutdown to be sent by the channel.
cc.Close()
}
select {
case got := <-s.onMsgCh:
if got != want {
t.Errorf("Update %v was %s; want %s", i, got, want)
} else {
t.Logf("Update %v was %s as expected", i, got)
}
case <-ctx.Done():
t.Fatalf("Timed out waiting for state update %v: %s", i, want)
}
}
}
108 changes: 0 additions & 108 deletions test/connectivity_state_updates_test.go

This file was deleted.