Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/testutils: add a new test type that implements resolver.ClientConn #6668

Merged
merged 11 commits into from
Oct 12, 2023
32 changes: 16 additions & 16 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func init() {
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -329,7 +329,7 @@ func (s) TestWeightedTarget(t *testing.T) {
// have a weighted target balancer will one sub-balancer, and we add and remove
// backends from the subBalancer.
func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -427,7 +427,7 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
// TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a
// weighted target balancer with two sub-balancers, each with one backend.
func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -493,7 +493,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
// a weighted target balancer with two sub-balancers, each with more than one
// backend.
func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -637,7 +637,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
// case where we have a weighted target balancer with two sub-balancers of
// differing weights.
func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -718,7 +718,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test
// have a weighted target balancer with three sub-balancers and we remove one of
// the subBalancers.
func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -879,7 +879,7 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// where we have a weighted target balancer with two sub-balancers, and we
// change the weight of these subBalancers.
func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -997,7 +997,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing
// the picks won't fail with transient_failure, and should instead wait for the
// other sub-balancer.
func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes

// Verify that a SubConn is created with the expected address and hierarchy
// path cleared.
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.TestClientConn, addr resolver.Address) {
func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) {
t.Helper()

gotAddr := <-cc.NewSubConnAddrsCh
Expand All @@ -1163,7 +1163,7 @@ type subConnWithAddr struct {
//
// Returned value is a map from subBalancer (identified by its config) to
// subConns created by it.
func waitForNewSubConns(t *testing.T, cc *testutils.TestClientConn, num int) map[string][]subConnWithAddr {
func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr {
t.Helper()

scs := make(map[string][]subConnWithAddr)
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func init() {
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func (s) TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

Expand Down Expand Up @@ -1274,7 +1274,7 @@ func (s) TestInitialIdle(t *testing.T) {
// TestIgnoreSubBalancerStateTransitions covers the case that if the child reports a
// transition from TF to Connecting, the overall state will still be TF.
func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}

wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()
Expand Down Expand Up @@ -1314,17 +1314,17 @@ func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
*testutils.BalancerClientConn
states []balancer.State
}

func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
t.BalancerClientConn.UpdateState(bs)
}

func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}

balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
Expand Down
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ import (
"google.golang.org/grpc/status"

_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
)

const (
Expand Down
4 changes: 2 additions & 2 deletions internal/balancer/gracefulswitch/gracefulswitch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func setup(t *testing.T) (*testutils.TestClientConn, *Balancer) {
tcc := testutils.NewTestClientConn(t)
func setup(t *testing.T) (*testutils.BalancerClientConn, *Balancer) {
tcc := testutils.NewBalancerClientConn(t)
return tcc, NewBalancer(tcc, balancer.BuildOptions{})
}

Expand Down
14 changes: 7 additions & 7 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Test(t *testing.T) {
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)

cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand All @@ -203,8 +203,8 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewTestClientConn(t)
func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[resolver.Address]*testutils.TestSubConn) {
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) {
return nil
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: bOpts,
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
exitIdleCh <- struct{}{}
},
})
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
bg := New(Options{
CC: cc,
BuildOpts: balancer.BuildOptions{},
Expand Down Expand Up @@ -561,7 +561,7 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
// for the second passed in address and also only picks that created SubConn.
// The new aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(Options{
Expand Down
69 changes: 20 additions & 49 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package dns
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
Expand All @@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/resolver/dns/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand All @@ -47,15 +47,11 @@ var EnableSRVLookups = false

var logger = grpclog.Component("dns")

// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)

func init() {
resolver.Register(NewBuilder())
internal.TimeAfterFunc = time.After
internal.NewNetResolver = newNetResolver
internal.AddressDialer = addressDialer
}

const (
Expand All @@ -70,31 +66,18 @@ const (
txtAttribute = "grpc_config="
)

var (
errMissingAddr = errors.New("dns resolver: missing address")

// Addresses ending with a colon that is supposed to be the separator
// between host and port is not allowed. E.g. "::" is a valid address as
// it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
// a colon as the host and port separator
errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
)

var (
defaultResolver netResolver = net.DefaultResolver
// To prevent excessive re-resolution, we enforce a rate limit on DNS
// resolution requests.
minDNSResRate = 30 * time.Second
)

var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
return dialer.DialContext(ctx, network, address)
}
}

var newNetResolver = func(authority string) (netResolver, error) {
var newNetResolver = func(authority string) (internal.NetResolver, error) {
if authority == "" {
return net.DefaultResolver, nil
}

host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
Expand All @@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {

return &net.Resolver{
PreferGo: true,
Dial: addressDialer(authorityWithPort),
Dial: internal.AddressDialer(authorityWithPort),
}, nil
}

Expand Down Expand Up @@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
disableServiceConfig: opts.DisableServiceConfig,
}

if target.URL.Host == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = newNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
d.resolver, err = internal.NewNetResolver(target.URL.Host)
if err != nil {
return nil, err
}

d.wg.Add(1)
Expand All @@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {
return "dns"
}

type netResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

// deadResolver is a resolver that does nothing.
type deadResolver struct{}

Expand All @@ -178,7 +151,7 @@ func (deadResolver) Close() {}
type dnsResolver struct {
host string
port string
resolver netResolver
resolver internal.NetResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
Expand Down Expand Up @@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {
err = d.cc.UpdateState(*state)
}

var timer *time.Timer
var waitTime time.Duration
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
waitTime = internal.MinResolutionRate
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
waitTime = backoff.DefaultExponential.Backoff(backoffIndex)
backoffIndex++
}
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-timer.C:
case <-internal.TimeAfterFunc(waitTime):
}
}
}
Expand Down Expand Up @@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
func parseTarget(target, defaultPort string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
return "", "", internal.ErrMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
Expand All @@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
if port == "" {
// If the port field is empty (target ends with colon), e.g. "[::1]:",
// this is an error.
return "", "", errEndsWithColon
return "", "", internal.ErrEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
Expand Down
Loading