Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: port e2e tests from c-core #7271

Merged
merged 18 commits into from
Jun 11, 2024
56 changes: 56 additions & 0 deletions internal/testutils/blocking_context_dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package testutils

import (
"context"
"net"
)

// BlockingDialer is a dialer that waits for Resume() to be called before
// dialing.
type BlockingDialer struct {
dialer *net.Dialer
blockCh chan struct{}
}

// NewBlockingDialer wraps a net.Dialer and waits for Resume() to be called
// before dialing. Useful to simulate connection delays.
func NewBlockingDialer() *BlockingDialer {
return &BlockingDialer{
dialer: &net.Dialer{},
blockCh: make(chan struct{}),
}
}

// DialContext implements a context dialer for use with grpc.WithContextDialer
// dial option.
func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
select {
case <-d.blockCh:
case <-ctx.Done():
return nil, ctx.Err()
}
return d.dialer.DialContext(ctx, "tcp", addr)
}

// Resume unblocks the dialer. Resume can only be called once.
func (d *BlockingDialer) Resume() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
close(d.blockCh)
}
14 changes: 11 additions & 3 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ type LocalityOptions struct {
Weight uint32
// Backends is a set of backends belonging to this locality.
Backends []BackendOptions
// Priority is the priority of the locality. Defaults to 0.
Priority uint32
}

// BackendOptions contains options to configure individual backends in a
Expand All @@ -686,6 +688,8 @@ type BackendOptions struct {
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
// Weight sets the backend weight. Defaults to 1.
Weight uint32
}

// EndpointOptions contains options to configure an Endpoint (or
Expand All @@ -708,7 +712,7 @@ type EndpointOptions struct {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
bOpts = append(bOpts, BackendOptions{Port: p})
bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1})
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Expand All @@ -729,6 +733,10 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, b := range locality.Backends {
// Weight defaults to 1.
if b.Weight == 0 {
b.Weight = 1
}
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
Expand All @@ -740,7 +748,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}},
}},
HealthStatus: b.HealthStatus,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight},
})
}

Expand All @@ -752,7 +760,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
Priority: 0,
Priority: locality.Priority,
})
}

Expand Down
28 changes: 28 additions & 0 deletions internal/testutils/xds/e2e/setup_management_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

// SetupManagementServer performs the following:
Expand Down Expand Up @@ -110,3 +111,30 @@ func DefaultBootstrapContents(nodeID, serverURI string) ([]byte, error) {
}
return bs, nil
}

// SetupDNS unregisters the DNS resolver and registers a manual resolver for the
easwars marked this conversation as resolved.
Show resolved Hide resolved
// same scheme. This allows the test to mock the DNS resolution by supplying the
// addresses of the test backends.
//
// Returns the following:
// - a channel onto which the DNS target being resolved is written to by the
// mock DNS resolver
// - a channel to notify close of the DNS resolver
// - a channel to notify re-resolution requests to the DNS resolver
// - a manual resolver which is used to mock the actual DNS resolution
// - a cleanup function which re-registers the original DNS resolver
func SetupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
targetCh := make(chan resolver.Target, 1)
closeCh := make(chan struct{}, 1)
resolveNowCh := make(chan resolver.ResolveNowOptions, 1)

mr := manual.NewBuilderWithScheme("dns")
mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }
mr.CloseCallback = func() { closeCh <- struct{}{} }
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }

dnsResolverBuilder := resolver.Get("dns")
resolver.Register(mr)

return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
}
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
// - the nodeID expected by the management server
// - the grpc channel to the test backend service
// - the manual resolver configured on the channel
// - the xDS cient used the grpc channel
// - the xDS client used the grpc channel
easwars marked this conversation as resolved.
Show resolved Hide resolved
// - a channel on which requested cluster resource names are sent
// - a channel used to signal that previously requested cluster resources are
// no longer requested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,6 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
})
}

// setupDNS unregisters the DNS resolver and registers a manual resolver for the
// same scheme. This allows the test to mock the DNS resolution by supplying the
// addresses of the test backends.
//
// Returns the following:
// - a channel onto which the DNS target being resolved is written to by the
// mock DNS resolver
// - a channel to notify close of the DNS resolver
// - a channel to notify re-resolution requests to the DNS resolver
// - a manual resolver which is used to mock the actual DNS resolution
// - a cleanup function which re-registers the original DNS resolver
func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
targetCh := make(chan resolver.Target, 1)
closeCh := make(chan struct{}, 1)
resolveNowCh := make(chan resolver.ResolveNowOptions, 1)

mr := manual.NewBuilderWithScheme("dns")
mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }
mr.CloseCallback = func() { closeCh <- struct{}{} }
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }

dnsResolverBuilder := resolver.Get("dns")
resolver.Register(mr)

return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
}

// TestAggregateCluster_WithTwoEDSClusters tests the case where the top-level
// cluster resource is an aggregate cluster. It verifies that RPCs fail when the
// management server has not responded to all requested EDS resources, and also
Expand Down Expand Up @@ -472,7 +445,7 @@ func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
// cluster. The test verifies that RPCs fail until both clusters are resolved to
// endpoints, and RPCs are routed to the higher priority EDS cluster.
func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
dnsTargetCh, _, _, dnsR, cleanup1 := e2e.SetupDNS()
defer cleanup1()

// Start an xDS management server that pushes the name of the requested EDS
Expand Down Expand Up @@ -662,7 +635,7 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
// still successful. This is the expected behavior because the cluster resolver
// policy eats errors from DNS Resolver after it has returned an error.
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
dnsTargetCh, _, _, dnsR, cleanup1 := e2e.SetupDNS()
defer cleanup1()

// Start an xDS management server.
Expand Down
Loading
Loading