Skip to content

Commit

Permalink
Prevent a server from panicking when the server is shutdown while we …
Browse files Browse the repository at this point in the history
…are initializing eventing.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed Jul 16, 2024
1 parent f58991c commit 06a573c
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,51 +1065,64 @@ func (s *Server) Node() string {
// Tradeoff is subscription and interest graph events vs connect and
// disconnect events, etc.
func (s *Server) initEventTracking() {
if !s.EventsEnabled() {
// Capture sys in case we are shutdown while setting up.
s.mu.RLock()
sys := s.sys
s.mu.RUnlock()

if sys == nil || sys.client == nil || sys.account == nil {
return
}
// Create a system hash which we use for other servers to target us specifically.
s.sys.shash = getHash(s.info.Name)
sys.shash = getHash(s.info.Name)

// This will be for all inbox responses.
subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*")
subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*")
if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
s.sys.inboxPre = subject
sys.inboxPre = subject
// This is for remote updates for connection accounting.
subject = fmt.Sprintf(accConnsEventSubjOld, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking for %s: %v", subject, err)
return
}
// This will be for responses for account info that we send out.
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for broad requests to respond with number of subscriptions for a given subject.
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for statsz from others.
subject = fmt.Sprintf(serverStatsSubj, "*")
if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
} else {
// Keep track of this one.
s.sys.remoteStatsSub = sub
sys.remoteStatsSub = sub
}

// Listen for all server shutdowns.
subject = fmt.Sprintf(shutdownEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for servers entering lame-duck mode.
// NOTE: This currently is handled in the same way as a server shutdown, but has
// a different subject in case we need to handle differently in future.
subject = fmt.Sprintf(lameDuckEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for account claims updates.
subscribeToUpdate := true
Expand All @@ -1120,13 +1133,15 @@ func (s *Server) initEventTracking() {
for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} {
if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
}
// Listen for ping messages that will be sent to all servers for statsz.
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
monSrvc := map[string]sysMsgHandler{
"IDZ": s.idzReq,
Expand Down Expand Up @@ -1180,10 +1195,12 @@ func (s *Server) initEventTracking() {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
subject = fmt.Sprintf(serverPingReqSubj, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
extractAccount := func(subject string) (string, error) {
Expand Down Expand Up @@ -1276,6 +1293,7 @@ func (s *Server) initEventTracking() {
for name, req := range monAccSrvc {
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}

Expand All @@ -1284,6 +1302,7 @@ func (s *Server) initEventTracking() {
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// For now only the STATZ subject has an account specific ping equivalent.
Expand All @@ -1301,39 +1320,46 @@ func (s *Server) initEventTracking() {
})
})); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// For tracking remote latency measurements.
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil {
s.Errorf("Error setting up internal latency tracking: %v", err)
return
}
// This is for simple debugging of number of subscribers that exist in the system.
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
return
}

// Listen for requests to reload the server configuration.
subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil {
s.Errorf("Error setting up server reload handler: %v", err)
return
}

// Client connection kick
subject = fmt.Sprintf(clientKickReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil {
s.Errorf("Error setting up client kick service: %v", err)
return
}
// Client connection LDM
subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil {
s.Errorf("Error setting up client LDM service: %v", err)
return
}
}

Expand Down

0 comments on commit 06a573c

Please sign in to comment.