Skip to content

Commit

Permalink
balancergroup: improve observability around balancer cache behavior (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Aug 31, 2023
1 parent aa6ce35 commit 778e638
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
17 changes: 16 additions & 1 deletion internal/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
// caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
if bg.logger.V(2) {
bg.logger.Infof("Removing and reusing child policy of type %q for locality %q from the balancer cache", balancerName, id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}

sbc, _ = old.(*subBalancerWrapper)
if sbc != nil && sbc.builder != builder {
// If the sub-balancer in cache was built with a different
Expand Down Expand Up @@ -403,7 +408,7 @@ func (bg *BalancerGroup) Remove(id string) {

sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok {
bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
bg.logger.Errorf("Child policy for locality %q does not exist in the balancer group", id)
bg.outgoingMu.Unlock()
return
}
Expand All @@ -418,7 +423,17 @@ func (bg *BalancerGroup) Remove(id string) {
}

if bg.deletedBalancerCache != nil {
if bg.logger.V(2) {
bg.logger.Infof("Adding child policy for locality %q to the balancer cache", id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}

bg.deletedBalancerCache.Add(id, sbToRemove, func() {
if bg.logger.V(2) {
bg.logger.Infof("Removing child policy for locality %q from the balancer cache after timeout", id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}

// A sub-balancer evicted from the timeout cache needs to closed
// and its subConns need to removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby
Expand Down
7 changes: 7 additions & 0 deletions internal/cache/timeoutCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,10 @@ func (c *TimeoutCache) Clear(runCallback bool) {
entry.callback()
}
}

// Len returns the number of entries in the cache.
func (c *TimeoutCache) Len() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.cache)
}
24 changes: 24 additions & 0 deletions internal/cache/timeoutCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (s) TestCacheExpire(t *testing.T) {
if gotV, ok := c.getForTesting(k); !ok || gotV.item != v {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", gotV.item, ok, v, true)
}
if l := c.Len(); l != 1 {
t.Fatalf("%d number of items in the cache, want 1", l)
}

select {
case <-callbackChan:
Expand All @@ -68,6 +71,9 @@ func (s) TestCacheExpire(t *testing.T) {
if _, ok := c.getForTesting(k); ok {
t.Fatalf("After Add(), after timeout, from cache got: _, %v, want _, %v", ok, false)
}
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}
}

// TestCacheRemove attempts to remove an existing entry from the cache and
Expand All @@ -83,6 +89,9 @@ func (s) TestCacheRemove(t *testing.T) {
if got, ok := c.getForTesting(k); !ok || got.item != v {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
}
if l := c.Len(); l != 1 {
t.Fatalf("%d number of items in the cache, want 1", l)
}

time.Sleep(testCacheTimeout / 2)

Expand All @@ -94,6 +103,9 @@ func (s) TestCacheRemove(t *testing.T) {
if _, ok := c.getForTesting(k); ok {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
}
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}

select {
case <-callbackChan:
Expand Down Expand Up @@ -133,6 +145,9 @@ func (s) TestCacheClearWithoutCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
}
}
if l := c.Len(); l != itemCount {
t.Fatalf("%d number of items in the cache, want %d", l, itemCount)
}

time.Sleep(testCacheTimeout / 2)
c.Clear(false)
Expand All @@ -142,6 +157,9 @@ func (s) TestCacheClearWithoutCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
}
}
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}

select {
case <-callbackChan:
Expand Down Expand Up @@ -188,6 +206,9 @@ func (s) TestCacheClearWithCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
}
}
if l := c.Len(); l != itemCount {
t.Fatalf("%d number of items in the cache, want %d", l, itemCount)
}

time.Sleep(testCacheTimeout / 2)
c.Clear(true)
Expand All @@ -197,6 +218,9 @@ func (s) TestCacheClearWithCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
}
}
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}

select {
case <-allGoroutineDone:
Expand Down

0 comments on commit 778e638

Please sign in to comment.