diff --git a/server/accounts.go b/server/accounts.go index ae7ca04d5c5..6f5a1a7026d 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -16,7 +16,6 @@ package server import ( "bytes" "encoding/hex" - "encoding/json" "errors" "fmt" "io" @@ -978,20 +977,8 @@ func (a *Account) removeLeafNode(c *client) { } } -func (a *Account) LEVDunp() { - a.lmu.RLock() - m := map[string]int{} - for c := range a.clients { - m[c.kindString()+"/"+c.LEVNAME]++ - } - a.lmu.RUnlock() - bb, _ := json.MarshalIndent(m, "", " ") - fmt.Printf("<>/<> Account %q remaining clients:\n%s\n", a.Name, string(bb)) -} - // removeClient keeps our accounting of local active clients updated. func (a *Account) removeClient(c *client) int { - a.LEVDunp() a.mu.Lock() n := len(a.clients) delete(a.clients, c) diff --git a/server/client.go b/server/client.go index 1ba21dbc5f9..5f41b4fb09e 100644 --- a/server/client.go +++ b/server/client.go @@ -229,7 +229,6 @@ type client struct { stats gwReplyMapping kind int - LEVNAME string srv *Server acc *Account perms *permissions @@ -778,7 +777,6 @@ func (c *client) registerWithAccount(acc *Account) error { return ErrBadAccount } acc.mu.RLock() - fmt.Printf("<>/<> registerWithAccount acc:%q kind:%q (%p): %v remaining\n", acc.Name, c.kindString(), c, len(acc.clients)) bad := acc.sl == nil acc.mu.RUnlock() if bad { @@ -1805,7 +1803,6 @@ func (c *client) markConnAsClosed(reason ClosedState) { return } c.flags.set(connMarkedClosed) - // For a websocket client, unless we are told not to flush, enqueue // a websocket CloseMessage based on the reason. if !skipFlush && c.isWebsocket() && !c.ws.closeSent { diff --git a/server/consumer.go b/server/consumer.go index b3e2b4ad993..846cd78829b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -721,8 +721,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri retention := cfg.Retention mset.mu.RUnlock() - fmt.Printf("<>/<> addConsumerWithAssignment: %q (%q,%q) oname %q, action: %v\n", config.Name, config.FilterSubject, config.FilterSubjects, oname, action) - // If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn. // This can happen on startup with restored state where on meta replay we still do not have // the assignment. Running in single server mode this always returns true. diff --git a/server/mqtt.go b/server/mqtt.go index 0774f80a2f0..76b949b561a 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1000,7 +1000,8 @@ func (s *Server) mqttHandleClosedClient(c *client) { // This needs to be done outside of any lock. if doClean { - if err := sess.clear(true); err != nil { + if err := sess.clear(); err != nil { + // if err := sess.clear(true); err != nil { c.Errorf(err.Error()) } } @@ -1475,7 +1476,8 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // Opportunistically delete the old (legacy) consumer, from v2.10.10 and // before. Ignore any errors that might arise. rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id - jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true) + // jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true) + jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) // Create a new, uniquely names consumer for retained messages for this // server. The prior one will expire eventually. @@ -1702,12 +1704,13 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon } // if noWait is specified, does not wait for the JS response, returns nil -func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { +// func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) { +func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) { subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName) - if noWait { - jsa.sendMsg(subj, nil) - return nil, nil - } + // if noWait { + // jsa.sendMsg(subj, nil) + // return nil, nil + // } cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil) if err != nil { @@ -3184,7 +3187,8 @@ func (sess *mqttSession) save() error { // // Runs from the client's readLoop. // Lock not held on entry, but session is in the locked map. -func (sess *mqttSession) clear(noWait bool) error { +func (sess *mqttSession) clear() error { + // func (sess *mqttSession) clear(noWait bool) error { var durs []string var pubRelDur string @@ -3212,19 +3216,19 @@ func (sess *mqttSession) clear(noWait bool) error { sess.mu.Unlock() for _, dur := range durs { - if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) { + if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) { return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err) } } if pubRelDur != _EMPTY_ { - _, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait) + _, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur) if isErrorOtherThan(err, JSConsumerNotFoundErr) { return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err) } } if seq > 0 { - err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait) + err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true) // Ignore the various errors indicating that the message (or sequence) // is already deleted, can happen in a cluster. if isErrorOtherThan(err, JSSequenceNotFoundErrF) { @@ -3829,7 +3833,7 @@ CHECK: // This Session lasts as long as the Network Connection. State data // associated with this Session MUST NOT be reused in any subsequent // Session. - if err := es.clear(false); err != nil { + if err := es.clear(); err != nil { asm.removeSession(es, true) return err } @@ -5189,17 +5193,20 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error { Config: ConsumerConfig{ DeliverSubject: pubRelDeliverySubject, Durable: mqttPubRelConsumerDurablePrefix + idHash, - AckPolicy: AckExplicit, - DeliverPolicy: DeliverNew, - FilterSubject: pubRelSubject, - AckWait: ackWait, - MaxAckPending: maxAckPending, - MemoryStorage: opts.MQTT.ConsumerMemoryStorage, + // Name: mqttPubRelConsumerDurablePrefix + idHash, + AckPolicy: AckExplicit, + DeliverPolicy: DeliverNew, + FilterSubject: pubRelSubject, + AckWait: ackWait, + MaxAckPending: maxAckPending, + MemoryStorage: opts.MQTT.ConsumerMemoryStorage, + // InactiveThreshold: 1 * time.Hour, }, } if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } + // if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil { if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err) return err @@ -5305,9 +5312,12 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string, MemoryStorage: opts.MQTT.ConsumerMemoryStorage, }, } + // Name: durName, + // InactiveThreshold: 1 * time.Hour, if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } + // if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil { if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err) return nil, nil, err diff --git a/server/mqtt_ex_test.go b/server/mqtt_ex_test.go index 7ddcf0aa0a4..21fd65bac73 100644 --- a/server/mqtt_ex_test.go +++ b/server/mqtt_ex_test.go @@ -343,7 +343,7 @@ func mqttBenchWrapForMatrixField( func (m mqttBenchMatrix) runMatrix(b *testing.B, bc mqttBenchContext, f func(*testing.B, *mqttBenchContext)) { b.Helper() f = mqttBenchWrapForMatrixField(&bc.MessageSize, m.MessageSize, f, func(size int) string { - return sizeKB(size) + return sizeKB(uint64(size)) }) f = mqttBenchWrapForMatrixField(&bc.Topics, m.Topics, f, func(n int) string { return fmt.Sprintf("%dtopics", n) @@ -387,7 +387,7 @@ func (m mqttBenchMatrix) QOS1Only() mqttBenchMatrix { return m } -func sizeKB(size int) string { +func sizeKB(size uint64) string { unit := "B" N := size if size >= KB { diff --git a/server/mqtt_test.go b/server/mqtt_test.go index af776b2fdbc..dd3c55aca7f 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -28,6 +28,8 @@ import ( "net" "os" "reflect" + "runtime" + "runtime/pprof" "strconv" "strings" "sync" @@ -3047,14 +3049,30 @@ func TestMQTTCluster(t *testing.T) { } } -func testMQTTConnectDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool) { +func testMQTTConnectSubDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool, sub bool, qos byte) { t.Helper() mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: clean}, o.MQTT.Host, o.MQTT.Port) testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, found) + if sub { + testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: qos}}, []byte{qos}) + } testMQTTDisconnectEx(t, mc, nil, false) mc.Close() } +func captureHeapProfile(filename string) { + f, _ := os.Create(filename) + defer f.Close() + runtime.GC() // Force garbage collection to get a clear picture + pprof.WriteHeapProfile(f) +} + +func printHeapUsage(label string) runtime.MemStats { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + fmt.Printf("%s Heap: allocs=%v, objects=%v\n", label, memStats.HeapAlloc, memStats.HeapObjects) + return memStats +} func TestMQTTClusterConnectDisconnectClean(t *testing.T) { nServers := 3 cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) @@ -3066,7 +3084,38 @@ func TestMQTTClusterConnectDisconnectClean(t *testing.T) { // specified. N := 100 for n := 0; n < N; n++ { - testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false) + testMQTTConnectSubDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false, false, 0) + } +} + +func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) { + nServers := 3 + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) + defer cl.shutdown() + + time.Sleep(1 * time.Second) + + // initialize MQTT assets in the cluster + testMQTTConnectSubDisconnect(t, cl.opts[0], "init", true, false, false, 0) + runtime.GC() // Force garbage collection to get a clear picture + + memStats := printHeapUsage("BEFORE") + baseHeapAlloc := memStats.HeapAlloc + baseHeapObjects := memStats.HeapObjects + + for i := 0; i < 10; i++ { + clientID := nuid.Next() + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2) + runtime.GC() // Force garbage collection to get a clear picture + + memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i)) + if memStats.HeapAlloc > 2*baseHeapAlloc || memStats.HeapObjects > 2*baseHeapObjects { + captureHeapProfile("AFTERLEAK.pprof") + t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)", + i, + sizeKB(baseHeapAlloc), sizeKB(memStats.HeapAlloc), memStats.HeapAlloc*100/baseHeapAlloc, + baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects) + } } } @@ -3083,13 +3132,13 @@ func TestMQTTClusterConnectDisconnectPersist(t *testing.T) { for n := 0; n < N; n++ { // First clean sessions on all servers for i := 0; i < nServers; i++ { - testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false) + testMQTTConnectSubDisconnect(t, cl.opts[i], clientID, true, false, false, 0) } - testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false) - testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true) - testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true) - testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true) + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, false, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[1], clientID, false, true, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[2], clientID, false, true, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, true, false, 0) } } diff --git a/server/raft.go b/server/raft.go index 894a91e4d37..291c3c80c8e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -380,7 +380,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient("RAFT"), + c: s.createInternalSystemClient(), js: s.getJetStream(), sq: sq, quit: make(chan struct{}), diff --git a/server/sendq.go b/server/sendq.go index dfd4fc73884..0287c5548a7 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -44,7 +44,7 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() - c := s.createInternalSystemClient("SENDQ") + c := s.createInternalSystemClient() c.registerWithAccount(s.SystemAccount()) c.noIcb = true diff --git a/server/server.go b/server/server.go index f4316148397..fc8c886381c 100644 --- a/server/server.go +++ b/server/server.go @@ -1734,7 +1734,7 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys = &internal{ account: acc, - client: s.createInternalSystemClient("SERVER"), + client: s.createInternalSystemClient(), seq: 1, sid: 1, servers: make(map[string]*serverUpdate), @@ -1784,10 +1784,8 @@ func (s *Server) setSystemAccount(acc *Account) error { } // Creates an internal system client. -func (s *Server) createInternalSystemClient(levname string) *client { - c := s.createInternalClient(SYSTEM) - c.LEVNAME = levname - return c +func (s *Server) createInternalSystemClient() *client { + return s.createInternalClient(SYSTEM) } // Creates an internal jetstream client.