From cd4f1b615cc0dc027283472ade94527d0e1fbf1b Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 8 Mar 2019 11:17:57 -0800 Subject: [PATCH 1/5] service config: default service config --- clientconn.go | 77 ++++++++++++++++++++++++++++++---------- clientconn_test.go | 63 ++++++++++++++++++++++++++++++++ dialoptions.go | 18 ++++++++++ resolver_conn_wrapper.go | 4 +-- service_config.go | 26 +++++++++----- service_config_test.go | 16 ++++++--- 6 files changed, 169 insertions(+), 35 deletions(-) diff --git a/clientconn.go b/clientconn.go index a1e1a98006a7..bea3efe5435a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -383,7 +383,6 @@ type ClientConn struct { mu sync.RWMutex resolverWrapper *ccResolverWrapper sc ServiceConfig - scRaw string conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters @@ -430,7 +429,8 @@ func (cc *ClientConn) scWatcher() { // TODO: load balance policy runtime change is ignored. // We may revisit this decision in the future. cc.sc = sc - cc.scRaw = "" + s := "" + cc.sc.rawJSONString = &s cc.mu.Unlock() case <-cc.ctx.Done(): return @@ -457,6 +457,29 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { } } +// Apply default service config when default service config is configured and: +// * resolver service config is disabled +// * or, resolver does not return a service config or returns an invalid one. +func (cc *ClientConn) shouldApplyDefaultServiceConfig(sc string) bool { + if cc.dopts.defaultServiceConfig != nil { + if cc.dopts.disableServiceConfig { + return true + } + // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type. + // Right now, we assume that empty service config string means resolver does not return a config. + if sc == "" { + return false + } + // TODO: the logic below is temporary. Once we finish the logic to validate service config + // in resolver, we will replace the logic below. + _, err := parseServiceConfig(sc) + if err != nil { + return true + } + } + return false +} + func (cc *ClientConn) updateResolverState(s resolver.State) error { cc.mu.Lock() defer cc.mu.Unlock() @@ -467,26 +490,18 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { return nil } - if !cc.dopts.disableServiceConfig && cc.scRaw != s.ServiceConfig { - // New service config; apply it. + if cc.shouldApplyDefaultServiceConfig(s.ServiceConfig) { + if cc.sc.rawJSONString == nil { + cc.applyServiceConfig(cc.dopts.defaultServiceConfig) + } + } else { + // TODO: the parsing logic below will be moved inside resolver. sc, err := parseServiceConfig(s.ServiceConfig) - if err != nil { - fmt.Println("error parsing config: ", err) + if err != nil && s.ServiceConfig != "" { // s.ServiceConfig != "" is a temporary special case. return err } - cc.scRaw = s.ServiceConfig - cc.sc = sc - - if cc.sc.retryThrottling != nil { - newThrottler := &retryThrottler{ - tokens: cc.sc.retryThrottling.MaxTokens, - max: cc.sc.retryThrottling.MaxTokens, - thresh: cc.sc.retryThrottling.MaxTokens / 2, - ratio: cc.sc.retryThrottling.TokenRatio, - } - cc.retryThrottler.Store(newThrottler) - } else { - cc.retryThrottler.Store((*retryThrottler)(nil)) + if cc.sc.rawJSONString == nil || *cc.sc.rawJSONString != s.ServiceConfig { + cc.applyServiceConfig(sc) } } @@ -748,6 +763,30 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st return t, done, nil } +// Parse and apply the service config. If sc is passed as a non-nil pointer, which indicates we have +// a parsed service config, we will skip the parsing. It will also skip the whole processing if +// the new service config is the same as the old one. +func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { + if sc == nil { + return fmt.Errorf("got nil pointer for service config") + } + cc.sc = *sc + + if cc.sc.retryThrottling != nil { + newThrottler := &retryThrottler{ + tokens: cc.sc.retryThrottling.MaxTokens, + max: cc.sc.retryThrottling.MaxTokens, + thresh: cc.sc.retryThrottling.MaxTokens / 2, + ratio: cc.sc.retryThrottling.TokenRatio, + } + cc.retryThrottler.Store(newThrottler) + } else { + cc.retryThrottler.Store((*retryThrottler)(nil)) + } + + return nil +} + func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { cc.mu.RLock() r := cc.resolverWrapper diff --git a/clientconn_test.go b/clientconn_test.go index 9b5ac03547b6..84f28f63b3f4 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -1232,3 +1232,66 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs") } } + +func (s) TestDefaultServiceConfig(t *testing.T) { + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + addr := r.Scheme() + ":///non.existent" + js := `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "bar" + } + ], + "waitForReady": true + } + ] +}` + testDefaultServiceConfigWhenResolverServiceConfigDisabled(t, r, addr, js) + testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t, r, addr, js) +} + +func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { + var i int + for i = 0; i < 10; i++ { + mc := cc.GetMethodConfig("/foo/bar") + if mc.WaitForReady != nil && *mc.WaitForReady == true { + break + } + time.Sleep(100 * time.Millisecond) + } + return i != 10 +} + +func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + // Resolver service config gets ignored since resolver service config is disabled. + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + ServiceConfig: "{}", + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} + +func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} diff --git a/dialoptions.go b/dialoptions.go index a0743a9e75fa..51a58a9e0404 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -63,6 +63,7 @@ type dialOptions struct { disableHealthCheck bool healthCheckFunc internal.HealthChecker minConnectTimeout func() time.Duration + defaultServiceConfig *ServiceConfig } // DialOption configures how we set up the connection. @@ -447,6 +448,23 @@ func WithDisableServiceConfig() DialOption { }) } +// WithDefaultServiceConfig returns a DialOption that configures the default service config, which +// will be used in cases where: +// 1. WithDisableServiceConfig is called. +// 2. Resolver does not return service config or if the resolver gets and invalid config. +// It is strongly recommended that caller of this function verifies the validity of the input string +// by using the grpc.ValidateServiceConfig function. +func WithDefaultServiceConfig(s string) DialOption { + return newFuncDialOption(func(o *dialOptions) { + sc, err := parseServiceConfig(s) + if err != nil { + grpclog.Warningf("the provided service config is invalid, err: %v", err) + return + } + o.defaultServiceConfig = sc + }) +} + // WithDisableRetry returns a DialOption that disables retries, even if the // service config enables them. This does not impact transparent retries, which // will happen automatically if no data is written to the wire or if the RPC is diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 176da7bd3c48..e9cef3a92b55 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -118,7 +118,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) { ccr.curState = s } -// NewAddress is called by the resolver implemenetion to send addresses to gRPC. +// NewAddress is called by the resolver implementation to send addresses to gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { if ccr.isDone() { return @@ -131,7 +131,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { ccr.cc.updateResolverState(ccr.curState) } -// NewServiceConfig is called by the resolver implemenetion to send service +// NewServiceConfig is called by the resolver implementation to send service // configs to gRPC. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { if ccr.isDone() { diff --git a/service_config.go b/service_config.go index 982a3bf21e65..931108778a4a 100644 --- a/service_config.go +++ b/service_config.go @@ -99,6 +99,10 @@ type ServiceConfig struct { // healthCheckConfig must be set as one of the requirement to enable LB channel // health check. healthCheckConfig *healthCheckConfig + // rawJSONString stores the pointer to the original service config json string that get parsed into + // this service config struct. Null pointer means this is a service config struct just defined, + // but no json string has been parsed and initialize the struct. + rawJSONString *string } // healthCheckConfig defines the go-native version of the LB channel health check config. @@ -238,24 +242,22 @@ type jsonSC struct { HealthCheckConfig *healthCheckConfig } -func parseServiceConfig(js string) (ServiceConfig, error) { - if len(js) == 0 { - return ServiceConfig{}, fmt.Errorf("no JSON service config provided") - } +func parseServiceConfig(js string) (*ServiceConfig, error) { var rsc jsonSC err := json.Unmarshal([]byte(js), &rsc) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return &ServiceConfig{}, err } sc := ServiceConfig{ LB: rsc.LoadBalancingPolicy, Methods: make(map[string]MethodConfig), retryThrottling: rsc.RetryThrottling, healthCheckConfig: rsc.HealthCheckConfig, + rawJSONString: &js, } if rsc.MethodConfig == nil { - return sc, nil + return &sc, nil } for _, m := range *rsc.MethodConfig { @@ -265,7 +267,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { d, err := parseDuration(m.Timeout) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return &ServiceConfig{}, err } mc := MethodConfig{ @@ -274,7 +276,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { } if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return ServiceConfig{}, err + return &ServiceConfig{}, err } if m.MaxRequestMessageBytes != nil { if *m.MaxRequestMessageBytes > int64(maxInt) { @@ -305,7 +307,7 @@ func parseServiceConfig(js string) (ServiceConfig, error) { sc.retryThrottling = nil } } - return sc, nil + return &sc, nil } func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) { @@ -370,3 +372,9 @@ func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { func newInt(b int) *int { return &b } + +// ValidateServiceConfig validates the input service config json string and returns the error. +func ValidateServiceConfig(js string) error { + _, err := parseServiceConfig(js) + return err +} diff --git a/service_config_test.go b/service_config_test.go index 020b643f89c8..18c81eeb121e 100644 --- a/service_config_test.go +++ b/service_config_test.go @@ -79,8 +79,8 @@ func (s) TestParseLoadBalancer(t *testing.T) { for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { - t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, *sc, err, c.wantSC, c.wantErr) } } } @@ -167,7 +167,7 @@ func (s) TestParseWaitForReady(t *testing.T) { for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -249,7 +249,7 @@ func (s) TestPraseTimeOut(t *testing.T) { for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -318,7 +318,7 @@ func (s) TestPraseMsgSize(t *testing.T) { for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -384,3 +384,9 @@ func newDuration(b time.Duration) *time.Duration { func newString(b string) *string { return &b } + +func scCompareWithRawJSONSkipped(s1, s2 ServiceConfig) bool { + s1.rawJSONString = nil + s2.rawJSONString = nil + return reflect.DeepEqual(s1, s2) +} From 90b260d6e8dc1d44e535ebfeb5569bbb7fcf63c2 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 25 Mar 2019 14:33:16 -0700 Subject: [PATCH 2/5] fix reviews again --- clientconn.go | 51 ++++++++++++++++++++++-------------------- clientconn_test.go | 24 ++++++++++++++++++++ dialoptions.go | 30 +++++++++++-------------- service_config.go | 12 +++------- service_config_test.go | 46 ++++++++++++++++++++----------------- 5 files changed, 93 insertions(+), 70 deletions(-) diff --git a/clientconn.go b/clientconn.go index bea3efe5435a..b9c37e28bd44 100644 --- a/clientconn.go +++ b/clientconn.go @@ -86,6 +86,9 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") + // errInvalidDefaultServiceConfig indicates that grpc.WithDefaultServiceConfig(string) provides + // an invalid service config string. + errInvalidDefaultServiceConfig = errors.New("grpc: the provided default service config is invalid") ) const ( @@ -173,6 +176,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } } + if cc.dopts.defaultServiceConfigRawJSON != nil { + sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON) + if err != nil { + return nil, errInvalidDefaultServiceConfig + } + cc.dopts.defaultServiceConfig = sc + } cc.mkp = cc.dopts.copts.KeepaliveParams if cc.dopts.copts.Dialer == nil { @@ -457,27 +467,22 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { } } -// Apply default service config when default service config is configured and: +// gRPC should resort to default service config when: // * resolver service config is disabled // * or, resolver does not return a service config or returns an invalid one. -func (cc *ClientConn) shouldApplyDefaultServiceConfig(sc string) bool { - if cc.dopts.defaultServiceConfig != nil { - if cc.dopts.disableServiceConfig { - return true - } - // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type. - // Right now, we assume that empty service config string means resolver does not return a config. - if sc == "" { - return false - } - // TODO: the logic below is temporary. Once we finish the logic to validate service config - // in resolver, we will replace the logic below. - _, err := parseServiceConfig(sc) - if err != nil { - return true - } +func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool { + if cc.dopts.disableServiceConfig { + return true } - return false + // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type. + // Right now, we assume that empty service config string means resolver does not return a config. + if sc == "" { + return true + } + // TODO: the logic below is temporary. Once we finish the logic to validate service config + // in resolver, we will replace the logic below. + _, err := parseServiceConfig(sc) + return err != nil } func (cc *ClientConn) updateResolverState(s resolver.State) error { @@ -490,14 +495,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { return nil } - if cc.shouldApplyDefaultServiceConfig(s.ServiceConfig) { - if cc.sc.rawJSONString == nil { + if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) { + if cc.dopts.defaultServiceConfig != nil && cc.sc.rawJSONString == nil { cc.applyServiceConfig(cc.dopts.defaultServiceConfig) } } else { // TODO: the parsing logic below will be moved inside resolver. sc, err := parseServiceConfig(s.ServiceConfig) - if err != nil && s.ServiceConfig != "" { // s.ServiceConfig != "" is a temporary special case. + if err != nil { return err } if cc.sc.rawJSONString == nil || *cc.sc.rawJSONString != s.ServiceConfig { @@ -763,11 +768,9 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st return t, done, nil } -// Parse and apply the service config. If sc is passed as a non-nil pointer, which indicates we have -// a parsed service config, we will skip the parsing. It will also skip the whole processing if -// the new service config is the same as the old one. func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { if sc == nil { + // should never reach here. return fmt.Errorf("got nil pointer for service config") } cc.sc = *sc diff --git a/clientconn_test.go b/clientconn_test.go index 84f28f63b3f4..ba6550fe882b 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -1250,8 +1250,10 @@ func (s) TestDefaultServiceConfig(t *testing.T) { } ] }` + testInvalidDefaultServiceConfig(t) testDefaultServiceConfigWhenResolverServiceConfigDisabled(t, r, addr, js) testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t, r, addr, js) + testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t, r, addr, js) } func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { @@ -1266,6 +1268,13 @@ func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { return i != 10 } +func testInvalidDefaultServiceConfig(t *testing.T) { + _, err := Dial("fake.com", WithInsecure(), WithDefaultServiceConfig("")) + if err != errInvalidDefaultServiceConfig { + t.Fatalf("Dial got err: %v, want: %v", err, errInvalidDefaultServiceConfig) + } +} + func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r resolver.Resolver, addr string, js string) { cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithDefaultServiceConfig(js)) if err != nil { @@ -1295,3 +1304,18 @@ func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T t.Fatal("default service config failed to be applied after 1s") } } + +func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r resolver.Resolver, addr string, js string) { + cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(js)) + if err != nil { + t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) + } + defer cc.Close() + r.(*manual.Resolver).UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr}}, + ServiceConfig: "{something wrong,}", + }) + if !verifyWaitForReadyEqualsTrue(cc) { + t.Fatal("default service config failed to be applied after 1s") + } +} diff --git a/dialoptions.go b/dialoptions.go index 51a58a9e0404..a9ccf496c2ff 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -55,15 +55,16 @@ type dialOptions struct { // balancer, and also by WithBalancerName dial option. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder - reqHandshake envconfig.RequireHandshakeSetting - channelzParentID int64 - disableServiceConfig bool - disableRetry bool - disableHealthCheck bool - healthCheckFunc internal.HealthChecker - minConnectTimeout func() time.Duration - defaultServiceConfig *ServiceConfig + resolverBuilder resolver.Builder + reqHandshake envconfig.RequireHandshakeSetting + channelzParentID int64 + disableServiceConfig bool + disableRetry bool + disableHealthCheck bool + healthCheckFunc internal.HealthChecker + minConnectTimeout func() time.Duration + defaultServiceConfig *ServiceConfig + defaultServiceConfigRawJSON *string } // DialOption configures how we set up the connection. @@ -452,16 +453,11 @@ func WithDisableServiceConfig() DialOption { // will be used in cases where: // 1. WithDisableServiceConfig is called. // 2. Resolver does not return service config or if the resolver gets and invalid config. -// It is strongly recommended that caller of this function verifies the validity of the input string -// by using the grpc.ValidateServiceConfig function. +// +// This API is EXPERIMENTAL. func WithDefaultServiceConfig(s string) DialOption { return newFuncDialOption(func(o *dialOptions) { - sc, err := parseServiceConfig(s) - if err != nil { - grpclog.Warningf("the provided service config is invalid, err: %v", err) - return - } - o.defaultServiceConfig = sc + o.defaultServiceConfigRawJSON = &s }) } diff --git a/service_config.go b/service_config.go index 931108778a4a..ff401e1c4579 100644 --- a/service_config.go +++ b/service_config.go @@ -247,7 +247,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) { err := json.Unmarshal([]byte(js), &rsc) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return &ServiceConfig{}, err + return nil, err } sc := ServiceConfig{ LB: rsc.LoadBalancingPolicy, @@ -267,7 +267,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) { d, err := parseDuration(m.Timeout) if err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return &ServiceConfig{}, err + return nil, err } mc := MethodConfig{ @@ -276,7 +276,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) { } if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) - return &ServiceConfig{}, err + return nil, err } if m.MaxRequestMessageBytes != nil { if *m.MaxRequestMessageBytes > int64(maxInt) { @@ -372,9 +372,3 @@ func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { func newInt(b int) *int { return &b } - -// ValidateServiceConfig validates the input service config json string and returns the error. -func ValidateServiceConfig(js string) error { - _, err := parseServiceConfig(js) - return err -} diff --git a/service_config_test.go b/service_config_test.go index 18c81eeb121e..afa02a990d04 100644 --- a/service_config_test.go +++ b/service_config_test.go @@ -29,7 +29,7 @@ import ( func (s) TestParseLoadBalancer(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -47,7 +47,7 @@ func (s) TestParseLoadBalancer(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ LB: newString("round_robin"), Methods: map[string]MethodConfig{ "/foo/Bar": { @@ -72,15 +72,15 @@ func (s) TestParseLoadBalancer(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { - t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, *sc, err, c.wantSC, c.wantErr) + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } } @@ -88,7 +88,7 @@ func (s) TestParseLoadBalancer(t *testing.T) { func (s) TestParseWaitForReady(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -105,7 +105,7 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { WaitForReady: newBool(true), @@ -128,7 +128,7 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { WaitForReady: newBool(false), @@ -160,14 +160,14 @@ func (s) TestParseWaitForReady(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -176,7 +176,7 @@ func (s) TestParseWaitForReady(t *testing.T) { func (s) TestPraseTimeOut(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -193,7 +193,7 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { Timeout: newDuration(time.Second), @@ -216,7 +216,7 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, { @@ -242,14 +242,14 @@ func (s) TestPraseTimeOut(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -258,7 +258,7 @@ func (s) TestPraseTimeOut(t *testing.T) { func (s) TestPraseMsgSize(t *testing.T) { testcases := []struct { scjs string - wantSC ServiceConfig + wantSC *ServiceConfig wantErr bool }{ { @@ -276,7 +276,7 @@ func (s) TestPraseMsgSize(t *testing.T) { } ] }`, - ServiceConfig{ + &ServiceConfig{ Methods: map[string]MethodConfig{ "/foo/Bar": { MaxReqSize: newInt(1024), @@ -311,14 +311,14 @@ func (s) TestPraseMsgSize(t *testing.T) { } ] }`, - ServiceConfig{}, + nil, true, }, } for _, c := range testcases { sc, err := parseServiceConfig(c.scjs) - if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(*sc, c.wantSC) { + if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) } } @@ -385,7 +385,13 @@ func newString(b string) *string { return &b } -func scCompareWithRawJSONSkipped(s1, s2 ServiceConfig) bool { +func scCompareWithRawJSONSkipped(s1, s2 *ServiceConfig) bool { + if s1 == nil && s2 == nil { + return true + } + if (s1 == nil) != (s2 == nil) { + return false + } s1.rawJSONString = nil s2.rawJSONString = nil return reflect.DeepEqual(s1, s2) From ec65214f68e34b460ffce43608f617a7cb9c1a67 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Tue, 26 Mar 2019 16:09:43 -0700 Subject: [PATCH 3/5] fix fix fix --- clientconn.go | 8 ++++---- clientconn_test.go | 5 +++-- dialoptions.go | 7 +++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/clientconn.go b/clientconn.go index b9c37e28bd44..ea9eaa4cffb4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -68,6 +68,9 @@ var ( errConnClosing = errors.New("grpc: the connection is closing") // errBalancerClosed indicates that the balancer is closed. errBalancerClosed = errors.New("grpc: balancer is closed") + // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default + // service config. + invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" ) // The following errors are returned from Dial and DialContext @@ -86,9 +89,6 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") - // errInvalidDefaultServiceConfig indicates that grpc.WithDefaultServiceConfig(string) provides - // an invalid service config string. - errInvalidDefaultServiceConfig = errors.New("grpc: the provided default service config is invalid") ) const ( @@ -179,7 +179,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if cc.dopts.defaultServiceConfigRawJSON != nil { sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON) if err != nil { - return nil, errInvalidDefaultServiceConfig + return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err) } cc.dopts.defaultServiceConfig = sc } diff --git a/clientconn_test.go b/clientconn_test.go index ba6550fe882b..ab1db5664049 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "net" + "strings" "sync/atomic" "testing" "time" @@ -1270,8 +1271,8 @@ func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { func testInvalidDefaultServiceConfig(t *testing.T) { _, err := Dial("fake.com", WithInsecure(), WithDefaultServiceConfig("")) - if err != errInvalidDefaultServiceConfig { - t.Fatalf("Dial got err: %v, want: %v", err, errInvalidDefaultServiceConfig) + if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) { + t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix) } } diff --git a/dialoptions.go b/dialoptions.go index a9ccf496c2ff..e7491f9c91a8 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -443,14 +443,17 @@ func WithChannelzParentID(id int64) DialOption { // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any // service config provided by the resolver and provides a hint to the resolver // to not fetch service configs. +// +// Note that, this dial option only disables service config from resolver. If +// default service config is provided, grpc will use the default service config. func WithDisableServiceConfig() DialOption { return newFuncDialOption(func(o *dialOptions) { o.disableServiceConfig = true }) } -// WithDefaultServiceConfig returns a DialOption that configures the default service config, which -// will be used in cases where: +// WithDefaultServiceConfig returns a DialOption that configures the default +// service config, which will be used in cases where: // 1. WithDisableServiceConfig is called. // 2. Resolver does not return service config or if the resolver gets and invalid config. // From 6102f9d85721b4ae64454970b4232aeec1dc1823 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 29 Mar 2019 17:32:02 -0700 Subject: [PATCH 4/5] fff --- clientconn.go | 34 +++++++++++++++++++++++++--------- dialoptions.go | 2 +- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/clientconn.go b/clientconn.go index ea9eaa4cffb4..dfb30058f0f7 100644 --- a/clientconn.go +++ b/clientconn.go @@ -224,7 +224,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * select { case sc, ok := <-cc.dopts.scChan: if ok { - cc.sc = sc + cc.sc = &sc + s := "" // for service config API v1, we don't care the raw json string. + cc.sc.rawJSONString = &s scSet = true } default: @@ -270,7 +272,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * select { case sc, ok := <-cc.dopts.scChan: if ok { - cc.sc = sc + cc.sc = &sc + s := "" // for service config API v1, we don't care the raw json string. + cc.sc.rawJSONString = &s } case <-ctx.Done(): return nil, ctx.Err() @@ -392,7 +396,7 @@ type ClientConn struct { mu sync.RWMutex resolverWrapper *ccResolverWrapper - sc ServiceConfig + sc *ServiceConfig conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters @@ -438,8 +442,8 @@ func (cc *ClientConn) scWatcher() { cc.mu.Lock() // TODO: load balance policy runtime change is ignored. // We may revisit this decision in the future. - cc.sc = sc - s := "" + cc.sc = &sc + s := "" // for service config API v1, we don't care the raw json string. cc.sc.rawJSONString = &s cc.mu.Unlock() case <-cc.ctx.Done(): @@ -496,7 +500,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { } if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) { - if cc.dopts.defaultServiceConfig != nil && cc.sc.rawJSONString == nil { + if cc.dopts.defaultServiceConfig != nil && cc.sc == nil { cc.applyServiceConfig(cc.dopts.defaultServiceConfig) } } else { @@ -505,11 +509,17 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { if err != nil { return err } - if cc.sc.rawJSONString == nil || *cc.sc.rawJSONString != s.ServiceConfig { + if cc.sc == nil || *cc.sc.rawJSONString != s.ServiceConfig { cc.applyServiceConfig(sc) } } + // update the service config that will be sent to balancer. + if cc.sc != nil { + fmt.Println(cc.sc) + s.ServiceConfig = *cc.sc.rawJSONString + } + if cc.dopts.balancerBuilder == nil { // Only look at balancer types and switch balancer if balancer dial // option is not set. @@ -524,7 +534,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { // TODO: use new loadBalancerConfig field with appropriate priority. if isGRPCLB { newBalancerName = grpclbName - } else if cc.sc.LB != nil { + } else if cc.sc != nil && cc.sc.LB != nil { newBalancerName = *cc.sc.LB } else { newBalancerName = PickFirstBalancerName @@ -744,6 +754,9 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { // TODO: Avoid the locking here. cc.mu.RLock() defer cc.mu.RUnlock() + if cc.sc == nil { + return MethodConfig{} + } m, ok := cc.sc.Methods[method] if !ok { i := strings.LastIndex(method, "/") @@ -755,6 +768,9 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { cc.mu.RLock() defer cc.mu.RUnlock() + if cc.sc == nil { + return nil + } return cc.sc.healthCheckConfig } @@ -773,7 +789,7 @@ func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { // should never reach here. return fmt.Errorf("got nil pointer for service config") } - cc.sc = *sc + cc.sc = sc if cc.sc.retryThrottling != nil { newThrottler := &retryThrottler{ diff --git a/dialoptions.go b/dialoptions.go index e7491f9c91a8..e114fecbb7b4 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -63,7 +63,7 @@ type dialOptions struct { disableHealthCheck bool healthCheckFunc internal.HealthChecker minConnectTimeout func() time.Duration - defaultServiceConfig *ServiceConfig + defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string } From 54503a7b10258d61b4ce63609549649eecbfb613 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Tue, 2 Apr 2019 18:07:29 -0700 Subject: [PATCH 5/5] remove pointer --- clientconn.go | 11 ++--------- service_config.go | 9 ++++----- service_config_test.go | 4 ++-- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/clientconn.go b/clientconn.go index dfb30058f0f7..93b0aa2c06ad 100644 --- a/clientconn.go +++ b/clientconn.go @@ -225,8 +225,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = &sc - s := "" // for service config API v1, we don't care the raw json string. - cc.sc.rawJSONString = &s scSet = true } default: @@ -273,8 +271,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = &sc - s := "" // for service config API v1, we don't care the raw json string. - cc.sc.rawJSONString = &s } case <-ctx.Done(): return nil, ctx.Err() @@ -443,8 +439,6 @@ func (cc *ClientConn) scWatcher() { // TODO: load balance policy runtime change is ignored. // We may revisit this decision in the future. cc.sc = &sc - s := "" // for service config API v1, we don't care the raw json string. - cc.sc.rawJSONString = &s cc.mu.Unlock() case <-cc.ctx.Done(): return @@ -509,15 +503,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error { if err != nil { return err } - if cc.sc == nil || *cc.sc.rawJSONString != s.ServiceConfig { + if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig { cc.applyServiceConfig(sc) } } // update the service config that will be sent to balancer. if cc.sc != nil { - fmt.Println(cc.sc) - s.ServiceConfig = *cc.sc.rawJSONString + s.ServiceConfig = cc.sc.rawJSONString } if cc.dopts.balancerBuilder == nil { diff --git a/service_config.go b/service_config.go index ff401e1c4579..1c5227426f49 100644 --- a/service_config.go +++ b/service_config.go @@ -99,10 +99,9 @@ type ServiceConfig struct { // healthCheckConfig must be set as one of the requirement to enable LB channel // health check. healthCheckConfig *healthCheckConfig - // rawJSONString stores the pointer to the original service config json string that get parsed into - // this service config struct. Null pointer means this is a service config struct just defined, - // but no json string has been parsed and initialize the struct. - rawJSONString *string + // rawJSONString stores service config json string that get parsed into + // this service config struct. + rawJSONString string } // healthCheckConfig defines the go-native version of the LB channel health check config. @@ -254,7 +253,7 @@ func parseServiceConfig(js string) (*ServiceConfig, error) { Methods: make(map[string]MethodConfig), retryThrottling: rsc.RetryThrottling, healthCheckConfig: rsc.HealthCheckConfig, - rawJSONString: &js, + rawJSONString: js, } if rsc.MethodConfig == nil { return &sc, nil diff --git a/service_config_test.go b/service_config_test.go index afa02a990d04..a21416303c75 100644 --- a/service_config_test.go +++ b/service_config_test.go @@ -392,7 +392,7 @@ func scCompareWithRawJSONSkipped(s1, s2 *ServiceConfig) bool { if (s1 == nil) != (s2 == nil) { return false } - s1.rawJSONString = nil - s2.rawJSONString = nil + s1.rawJSONString = "" + s2.rawJSONString = "" return reflect.DeepEqual(s1, s2) }