Skip to content

Commit

Permalink
v1.37.x: backport PRs (#4411)
Browse files Browse the repository at this point in the history
Co-authored-by: Doug Fawley <dfawley@google.com>
  • Loading branch information
menghanl and dfawley authored May 11, 2021
1 parent 43d7a9f commit ef64e13
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 7 deletions.
4 changes: 2 additions & 2 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
if len(addrs) <= 0 {
acbw.ac.tearDown(errConnDrain)
acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
return
}
if !acbw.ac.tryUpdateAddrs(addrs) {
Expand All @@ -220,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac.acbw = nil
acbw.ac.mu.Unlock()
acState := acbw.ac.getState()
acbw.ac.tearDown(errConnDrain)
acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)

if acState == connectivity.Shutdown {
return
Expand Down
8 changes: 4 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())

for _, opt := range opts {
Expand Down Expand Up @@ -1446,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
}

// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
//
// If the route name did change, then we must wait until the first RDS
// update before reporting this LDS config.
w.serviceCb(w.lastUpdate, nil)
if w.lastUpdate.virtualHost != nil {
// We want to send an update with the new fields from the new LDS
// (e.g. max stream duration), and old fields from the the previous
// RDS.
//
// But note that this should only happen when virtual host is set,
// which means an RDS was received.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
Expand Down
42 changes: 42 additions & 0 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,48 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
}
}

// TestXDSResolverMultipleLDSUpdates tests the case where two LDS updates with
// the same RDS name to watch are received without an RDS in between. Those LDS
// updates shouldn't trigger service config update.
//
// This test case also makes sure the resolver doesn't panic.
func (s) TestXDSResolverMultipleLDSUpdates(t *testing.T) {
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil },
})
defer func() {
cancel()
xdsR.Close()
}()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
defer replaceRandNumGenerator(0)()

// Send a new LDS update, with the same fields.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
// Should NOT trigger a state update.
gotState, err := tcc.stateCh.Receive(ctx)
if err == nil {
t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState)
}

// Send a new LDS update, with the same RDS name, but different fields.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil)
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
gotState, err = tcc.stateCh.Receive(ctx)
if err == nil {
t.Fatalf("ClientConn.UpdateState received %v, want timeout error", gotState)
}
}

type filterBuilder struct {
httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test.
path *[]string
Expand Down

0 comments on commit ef64e13

Please sign in to comment.