Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: remove option to send RPCs before HTTP/2 handshake is completed #2904

Merged
merged 1 commit into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (s) TestOneServerDown(t *testing.T) {
numServers := 2
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
Expand Down Expand Up @@ -654,7 +654,7 @@ func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
numServers := 3
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
Expand Down
27 changes: 12 additions & 15 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -1176,20 +1175,18 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return nil, nil, err
}

if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-time.After(connectDeadline.Sub(time.Now())):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
select {
case <-time.After(connectDeadline.Sub(time.Now())):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
return newTr, reconnect, nil
}
Expand Down
7 changes: 3 additions & 4 deletions clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s

client, err := DialContext(ctx,
"",
WithWaitForHandshake(),
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
WithDialer(pl.Dialer()),
Expand Down Expand Up @@ -236,7 +235,7 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
conn.Close()
}()

client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -322,7 +321,7 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -418,7 +417,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
client, err := DialContext(ctx, "this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), withResolverBuilder(rb))
if err != nil {
t.Fatal(err)
}
Expand Down
166 changes: 0 additions & 166 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/naming"
Expand Down Expand Up @@ -151,72 +150,7 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
}
}

var allReqHSSettings = []envconfig.RequireHandshakeSetting{
envconfig.RequireHandshakeOff,
envconfig.RequireHandshakeOn,
}

func (s) TestDialWaitsForServerSettings(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()

// Test with all environment variable settings, which should not impact the
// test case since WithWaitForHandshake has higher priority.
for _, setting := range allReqHSSettings {
envconfig.RequireHandshake = setting
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
done := make(chan struct{})
sent := make(chan struct{})
dialDone := make(chan struct{})
go func() { // Launch the server.
defer func() {
close(done)
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn.Close()
// Sleep for a little bit to make sure that Dial on client
// side blocks until settings are received.
time.Sleep(100 * time.Millisecond)
framer := http2.NewFramer(conn, conn)
close(sent)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings. Err: %v", err)
return
}
<-dialDone // Close conn only after dial returns.
}()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
close(dialDone)
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}
defer client.Close()
select {
case <-sent:
default:
t.Fatalf("Dial returned before server settings were sent")
}
<-done
}
}

func (s) TestDialWaitsForServerSettingsViaEnv(t *testing.T) {
// Set default behavior and restore current setting after test.
old := envconfig.RequireHandshake
envconfig.RequireHandshake = envconfig.RequireHandshakeOn
defer func() { envconfig.RequireHandshake = old }()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -263,61 +197,6 @@ func (s) TestDialWaitsForServerSettingsViaEnv(t *testing.T) {
}

func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()

for _, setting := range allReqHSSettings {
envconfig.RequireHandshake = setting
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
done := make(chan struct{})
numConns := 0
go func() { // Launch the server.
defer func() {
close(done)
}()
for {
conn, err := lis.Accept()
if err != nil {
break
}
numConns++
defer conn.Close()
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
client, err := DialContext(ctx,
lis.Addr().String(),
WithInsecure(),
WithWaitForHandshake(),
WithBlock(),
withBackoff(noBackoff{}),
withMinConnectDeadline(func() time.Duration { return time.Second / 4 }))
lis.Close()
if err == nil {
client.Close()
t.Fatalf("Unexpected success (err=nil) while dialing")
}
if err != context.DeadlineExceeded {
t.Fatalf("DialContext(_) = %v; want context.DeadlineExceeded", err)
}
if numConns < 2 {
t.Fatalf("dial attempts: %v; want > 1", numConns)
}
<-done
}
}

func (s) TestDialWaitsForServerSettingsViaEnvAndFails(t *testing.T) {
// Set default behavior and restore current setting after test.
old := envconfig.RequireHandshake
envconfig.RequireHandshake = envconfig.RequireHandshakeOn
defer func() { envconfig.RequireHandshake = old }()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -359,55 +238,11 @@ func (s) TestDialWaitsForServerSettingsViaEnvAndFails(t *testing.T) {
<-done
}

func (s) TestDialDoesNotWaitForServerSettings(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()
envconfig.RequireHandshake = envconfig.RequireHandshakeOff

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
done := make(chan struct{})
dialDone := make(chan struct{})
go func() { // Launch the server.
defer func() {
close(done)
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn.Close()
<-dialDone // Close conn only after dial returns.
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
if err != nil {
t.Fatalf("DialContext returned err =%v; want nil", err)
}
defer client.Close()

if state := client.GetState(); state != connectivity.Ready {
t.Fatalf("client.GetState() = %v; want connectivity.Ready", state)
}
close(dialDone)
}

// 1. Client connects to a server that doesn't send preface.
// 2. After minConnectTimeout(500 ms here), client disconnects and retries.
// 3. The new server sends its preface.
// 4. Client doesn't kill the connection this time.
func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()
envconfig.RequireHandshake = envconfig.RequireHandshakeOn

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -1224,7 +1059,6 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {

client, err := Dial("this-gets-overwritten",
WithInsecure(),
WithWaitForHandshake(),
withResolverBuilder(rb),
withBackoff(noBackoff{}),
WithBalancerName(stateRecordingBalancerName),
Expand Down
13 changes: 0 additions & 13 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type dialOptions struct {
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
reqHandshake envconfig.RequireHandshakeSetting
channelzParentID int64
disableServiceConfig bool
disableRetry bool
Expand Down Expand Up @@ -100,17 +99,6 @@ func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
}
}

// WithWaitForHandshake blocks until the initial settings frame is received from
// the server before assigning RPCs to the connection.
//
// Deprecated: this is the default behavior, and this option will be removed
// after the 1.18 release.
func WithWaitForHandshake() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.reqHandshake = envconfig.RequireHandshakeOn
})
}

// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is
Expand Down Expand Up @@ -546,7 +534,6 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption {
func defaultDialOptions() dialOptions {
return dialOptions{
disableRetry: !envconfig.Retry,
reqHandshake: envconfig.RequireHandshake,
healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
Expand Down
33 changes: 2 additions & 31 deletions internal/envconfig/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,11 @@ import (
)

const (
prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
)

// RequireHandshakeSetting describes the settings for handshaking.
type RequireHandshakeSetting int

const (
// RequireHandshakeOn indicates to wait for handshake before considering a
// connection ready/successful.
RequireHandshakeOn RequireHandshakeSetting = iota
// RequireHandshakeOff indicates to not wait for handshake before
// considering a connection ready/successful.
RequireHandshakeOff
prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
)

var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
// RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
// environment variable.
//
// Will be removed after the 1.18 release.
RequireHandshake = RequireHandshakeOn
)

func init() {
switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
case "on":
fallthrough
default:
RequireHandshake = RequireHandshakeOn
case "off":
RequireHandshake = RequireHandshakeOff
}
}
Loading