Skip to content

Commit

Permalink
[FIXED] Make sure to always remove internal clients from the account …
Browse files Browse the repository at this point in the history
…regardless of kind. (#5566)

Internal clients for NRG (Raft Groups) were of type SYSTEM and were not
being removed from the system's account when they were closed, causing a
leak of closed client connections and memory buildup.

Resolves: #5471 

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored Jun 19, 2024
2 parents c8090e7 + c822017 commit e2752d5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 35 deletions.
73 changes: 38 additions & 35 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5457,48 +5457,51 @@ func (c *client) closeConnection(reason ClosedState) {
// Unregister
srv.removeClient(c)

// Update remote subscriptions.
if acc != nil && (kind == CLIENT || kind == LEAF || kind == JETSTREAM) {
qsubs := map[string]*qsub{}
for _, sub := range subs {
// Call unsubscribe here to cleanup shadow subscriptions and such.
c.unsubscribe(acc, sub, true, false)
// Update route as normal for a normal subscriber.
if sub.queue == nil {
if !spoke {
srv.updateRouteSubscriptionMap(acc, sub, -1)
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
if acc != nil {
// Update remote subscriptions.
if kind == CLIENT || kind == LEAF || kind == JETSTREAM {
qsubs := map[string]*qsub{}
for _, sub := range subs {
// Call unsubscribe here to cleanup shadow subscriptions and such.
c.unsubscribe(acc, sub, true, false)
// Update route as normal for a normal subscriber.
if sub.queue == nil {
if !spoke {
srv.updateRouteSubscriptionMap(acc, sub, -1)
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
}
acc.updateLeafNodes(sub, -1)
} else {
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
// connected routes.
num := int32(1)
if kind == LEAF {
num = sub.qw
}
// TODO(dlc) - Better to use string builder?
key := bytesToString(sub.subject) + " " + bytesToString(sub.queue)
if esub, ok := qsubs[key]; ok {
esub.n += num
acc.updateLeafNodes(sub, -1)
} else {
qsubs[key] = &qsub{sub, num}
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
// connected routes.
num := int32(1)
if kind == LEAF {
num = sub.qw
}
key := keyFromSub(sub)
if esub, ok := qsubs[key]; ok {
esub.n += num
} else {
qsubs[key] = &qsub{sub, num}
}
}
}
}
// Process any qsubs here.
for _, esub := range qsubs {
if !spoke {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
// Process any qsubs here.
for _, esub := range qsubs {
if !spoke {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
}
}
acc.updateLeafNodes(esub.sub, -(esub.n))
}
acc.updateLeafNodes(esub.sub, -(esub.n))
}
// Always remove from the account, otherwise we can leak clients.
// Note that SYSTEM and ACCOUNT types from above cleanup their own subs.
if prev := acc.removeClient(c); prev == 1 {
srv.decActiveAccounts()
}
Expand Down
31 changes: 31 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,34 @@ func TestNRGElectionTimerAfterObserver(t *testing.T) {
require_True(t, etlr.After(before))
}
}

func TestNRGSystemClientCleanupFromAccount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomServer()
sacc := s.SystemAccount()

numClients := func() int {
sacc.mu.RLock()
defer sacc.mu.RUnlock()
return len(sacc.clients)
}

start := numClients()

var all []smGroup
for i := 0; i < 5; i++ {
rgName := fmt.Sprintf("TEST-%d", i)
rg := c.createRaftGroup(rgName, 3, newStateAdder)
all = append(all, rg)
rg.waitOnLeader()
}
for _, rg := range all {
for _, sm := range rg {
sm.node().Stop()
}
}
finish := numClients()
require_Equal(t, start, finish)
}

0 comments on commit e2752d5

Please sign in to comment.