Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.37.x: backport PRs #4411

Merged
merged 3 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
if len(addrs) <= 0 {
acbw.ac.tearDown(errConnDrain)
acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
return
}
if !acbw.ac.tryUpdateAddrs(addrs) {
Expand All @@ -220,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac.acbw = nil
acbw.ac.mu.Unlock()
acState := acbw.ac.getState()
acbw.ac.tearDown(errConnDrain)
acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)

if acState == connectivity.Shutdown {
return
Expand Down
8 changes: 4 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())

for _, opt := range opts {
Expand Down Expand Up @@ -1446,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
}

// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
//
// If the route name did change, then we must wait until the first RDS
// update before reporting this LDS config.
w.serviceCb(w.lastUpdate, nil)
if w.lastUpdate.virtualHost != nil {
// We want to send an update with the new fields from the new LDS
// (e.g. max stream duration), and old fields from the the previous
// RDS.
//
// But note that this should only happen when virtual host is set,
// which means an RDS was received.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
Expand Down
42 changes: 42 additions & 0 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,48 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
}
}

// TestXDSResolverMultipleLDSUpdates tests the case where two LDS updates with
// the same RDS name to watch are received without an RDS in between. Those LDS
// updates shouldn't trigger service config update.
//
// This test case also makes sure the resolver doesn't panic.
func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
defer replaceRandNumGenerator(0)()

// Send a new LDS update, with the same fields.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
// Should NOT trigger a state update.
gotState, err := tcc.stateCh.Receive(ctx)
if err == nil {
t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState)
}

// Send a new LDS update, with the same RDS name, but different fields.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil)
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
gotState, err = tcc.stateCh.Receive(ctx)
if err == nil {
t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState)
}
}

type filterBuilder struct {
httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test.
path *[]string
Expand Down