From 06a573caa0e34b74ed0c44cbc87d58ba7c0c77f7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Jul 2024 13:52:54 -0500 Subject: [PATCH] Prevent a server from panicking when the server is shutdown while we are initializing eventing. Signed-off-by: Derek Collison --- server/events.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/server/events.go b/server/events.go index 3ab75c56d7..2ebfc5ebac 100644 --- a/server/events.go +++ b/server/events.go @@ -1065,44 +1065,56 @@ func (s *Server) Node() string { // Tradeoff is subscription and interest graph events vs connect and // disconnect events, etc. func (s *Server) initEventTracking() { - if !s.EventsEnabled() { + // Capture sys in case we are shutdown while setting up. + s.mu.RLock() + sys := s.sys + s.mu.RUnlock() + + if sys == nil || sys.client == nil || sys.account == nil { return } // Create a system hash which we use for other servers to target us specifically. - s.sys.shash = getHash(s.info.Name) + sys.shash = getHash(s.info.Name) // This will be for all inbox responses. - subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*") + subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*") if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } - s.sys.inboxPre = subject + sys.inboxPre = subject // This is for remote updates for connection accounting. subject = fmt.Sprintf(accConnsEventSubjOld, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking for %s: %v", subject, err) + return } // This will be for responses for account info that we send out. subject = fmt.Sprintf(connsRespSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for broad requests to respond with number of subscriptions for a given subject. if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for statsz from others. subject = fmt.Sprintf(serverStatsSubj, "*") if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } else { // Keep track of this one. - s.sys.remoteStatsSub = sub + sys.remoteStatsSub = sub } + // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for servers entering lame-duck mode. // NOTE: This currently is handled in the same way as a server shutdown, but has @@ -1110,6 +1122,7 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(lameDuckEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for account claims updates. subscribeToUpdate := true @@ -1120,6 +1133,7 @@ func (s *Server) initEventTracking() { for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} { if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } } @@ -1127,6 +1141,7 @@ func (s *Server) initEventTracking() { // This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } monSrvc := map[string]sysMsgHandler{ "IDZ": s.idzReq, @@ -1180,10 +1195,12 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } subject = fmt.Sprintf(serverPingReqSubj, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } extractAccount := func(subject string) (string, error) { @@ -1276,6 +1293,7 @@ func (s *Server) initEventTracking() { for name, req := range monAccSrvc { if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } @@ -1284,6 +1302,7 @@ func (s *Server) initEventTracking() { // is only one that will answer. This breaks tests since we still forward on remote server connect. if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For now only the STATZ subject has an account specific ping equivalent. @@ -1301,6 +1320,7 @@ func (s *Server) initEventTracking() { }) })); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for updates when leaf nodes connect for a given account. This will @@ -1308,32 +1328,38 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For tracking remote latency measurements. - subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash) + subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil { s.Errorf("Error setting up internal latency tracking: %v", err) + return } // This is for simple debugging of number of subscribers that exist in the system. if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil { s.Errorf("Error setting up internal debug service for subscribers: %v", err) + return } // Listen for requests to reload the server configuration. subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil { s.Errorf("Error setting up server reload handler: %v", err) + return } // Client connection kick subject = fmt.Sprintf(clientKickReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil { s.Errorf("Error setting up client kick service: %v", err) + return } // Client connection LDM subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil { s.Errorf("Error setting up client LDM service: %v", err) + return } }