Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Jun 20, 2024
1 parent 336431b commit 90dde67
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 52 deletions.
13 changes: 0 additions & 13 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -978,20 +977,8 @@ func (a *Account) removeLeafNode(c *client) {
}
}

func (a *Account) LEVDunp() {
a.lmu.RLock()
m := map[string]int{}
for c := range a.clients {
m[c.kindString()+"/"+c.LEVNAME]++
}
a.lmu.RUnlock()
bb, _ := json.MarshalIndent(m, "", " ")
fmt.Printf("<>/<> Account %q remaining clients:\n%s\n", a.Name, string(bb))
}

// removeClient keeps our accounting of local active clients updated.
func (a *Account) removeClient(c *client) int {
a.LEVDunp()
a.mu.Lock()
n := len(a.clients)
delete(a.clients, c)
Expand Down
3 changes: 0 additions & 3 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ type client struct {
stats
gwReplyMapping
kind int
LEVNAME string
srv *Server
acc *Account
perms *permissions
Expand Down Expand Up @@ -778,7 +777,6 @@ func (c *client) registerWithAccount(acc *Account) error {
return ErrBadAccount
}
acc.mu.RLock()
fmt.Printf("<>/<> registerWithAccount acc:%q kind:%q (%p): %v remaining\n", acc.Name, c.kindString(), c, len(acc.clients))
bad := acc.sl == nil
acc.mu.RUnlock()
if bad {
Expand Down Expand Up @@ -1805,7 +1803,6 @@ func (c *client) markConnAsClosed(reason ClosedState) {
return
}
c.flags.set(connMarkedClosed)

// For a websocket client, unless we are told not to flush, enqueue
// a websocket CloseMessage based on the reason.
if !skipFlush && c.isWebsocket() && !c.ws.closeSent {
Expand Down
2 changes: 0 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
retention := cfg.Retention
mset.mu.RUnlock()

fmt.Printf("<>/<> addConsumerWithAssignment: %q (%q,%q) oname %q, action: %v\n", config.Name, config.FilterSubject, config.FilterSubjects, oname, action)

// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
// This can happen on startup with restored state where on meta replay we still do not have
// the assignment. Running in single server mode this always returns true.
Expand Down
46 changes: 28 additions & 18 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,8 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(true); err != nil {
if err := sess.clear(); err != nil {
// if err := sess.clear(true); err != nil {
c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1475,7 +1476,8 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)
// jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)

// Create a new, uniquely names consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1702,12 +1704,13 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
}

// if noWait is specified, does not wait for the JS response, returns nil
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
// func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
if noWait {
jsa.sendMsg(subj, nil)
return nil, nil
}
// if noWait {
// jsa.sendMsg(subj, nil)
// return nil, nil
// }

cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)
if err != nil {
Expand Down Expand Up @@ -3184,7 +3187,8 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear(noWait bool) error {
func (sess *mqttSession) clear() error {
// func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -3212,19 +3216,19 @@ func (sess *mqttSession) clear(noWait bool) error {
sess.mu.Unlock()

for _, dur := range durs {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
}

if seq > 0 {
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
Expand Down Expand Up @@ -3829,7 +3833,7 @@ CHECK:
// This Session lasts as long as the Network Connection. State data
// associated with this Session MUST NOT be reused in any subsequent
// Session.
if err := es.clear(false); err != nil {
if err := es.clear(); err != nil {
asm.removeSession(es, true)
return err
}
Expand Down Expand Up @@ -5189,17 +5193,20 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
Config: ConsumerConfig{
DeliverSubject: pubRelDeliverySubject,
Durable: mqttPubRelConsumerDurablePrefix + idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: pubRelSubject,
AckWait: ackWait,
MaxAckPending: maxAckPending,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
// Name: mqttPubRelConsumerDurablePrefix + idHash,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: pubRelSubject,
AckWait: ackWait,
MaxAckPending: maxAckPending,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
// InactiveThreshold: 1 * time.Hour,
},
}
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
// if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err)
return err
Expand Down Expand Up @@ -5305,9 +5312,12 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
MemoryStorage: opts.MQTT.ConsumerMemoryStorage,
},
}
// Name: durName,
// InactiveThreshold: 1 * time.Hour,
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
// if _, err := sess.jsa.createEphemeralConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err)
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions server/mqtt_ex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func mqttBenchWrapForMatrixField(
func (m mqttBenchMatrix) runMatrix(b *testing.B, bc mqttBenchContext, f func(*testing.B, *mqttBenchContext)) {
b.Helper()
f = mqttBenchWrapForMatrixField(&bc.MessageSize, m.MessageSize, f, func(size int) string {
return sizeKB(size)
return sizeKB(uint64(size))
})
f = mqttBenchWrapForMatrixField(&bc.Topics, m.Topics, f, func(n int) string {
return fmt.Sprintf("%dtopics", n)
Expand Down Expand Up @@ -387,7 +387,7 @@ func (m mqttBenchMatrix) QOS1Only() mqttBenchMatrix {
return m
}

func sizeKB(size int) string {
func sizeKB(size uint64) string {
unit := "B"
N := size
if size >= KB {
Expand Down
63 changes: 56 additions & 7 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"net"
"os"
"reflect"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -3047,14 +3049,30 @@ func TestMQTTCluster(t *testing.T) {
}
}

func testMQTTConnectDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool) {
func testMQTTConnectSubDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool, sub bool, qos byte) {
t.Helper()
mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: clean}, o.MQTT.Host, o.MQTT.Port)
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, found)
if sub {
testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: qos}}, []byte{qos})
}
testMQTTDisconnectEx(t, mc, nil, false)
mc.Close()
}

func captureHeapProfile(filename string) {
f, _ := os.Create(filename)
defer f.Close()
runtime.GC() // Force garbage collection to get a clear picture
pprof.WriteHeapProfile(f)
}

func printHeapUsage(label string) runtime.MemStats {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
fmt.Printf("%s Heap: allocs=%v, objects=%v\n", label, memStats.HeapAlloc, memStats.HeapObjects)
return memStats
}
func TestMQTTClusterConnectDisconnectClean(t *testing.T) {
nServers := 3
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers)
Expand All @@ -3066,7 +3084,38 @@ func TestMQTTClusterConnectDisconnectClean(t *testing.T) {
// specified.
N := 100
for n := 0; n < N; n++ {
testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false)
testMQTTConnectSubDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false, false, 0)
}
}

func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) {
nServers := 3
cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers)
defer cl.shutdown()

time.Sleep(1 * time.Second)

// initialize MQTT assets in the cluster
testMQTTConnectSubDisconnect(t, cl.opts[0], "init", true, false, false, 0)
runtime.GC() // Force garbage collection to get a clear picture

memStats := printHeapUsage("BEFORE")
baseHeapAlloc := memStats.HeapAlloc
baseHeapObjects := memStats.HeapObjects

for i := 0; i < 10; i++ {
clientID := nuid.Next()
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2)
runtime.GC() // Force garbage collection to get a clear picture

memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i))
if memStats.HeapAlloc > 2*baseHeapAlloc || memStats.HeapObjects > 2*baseHeapObjects {
captureHeapProfile("AFTERLEAK.pprof")
t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)",
i,
sizeKB(baseHeapAlloc), sizeKB(memStats.HeapAlloc), memStats.HeapAlloc*100/baseHeapAlloc,
baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects)
}
}
}

Expand All @@ -3083,13 +3132,13 @@ func TestMQTTClusterConnectDisconnectPersist(t *testing.T) {
for n := 0; n < N; n++ {
// First clean sessions on all servers
for i := 0; i < nServers; i++ {
testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false)
testMQTTConnectSubDisconnect(t, cl.opts[i], clientID, true, false, false, 0)
}

testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false)
testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true)
testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true)
testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true)
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, false, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[1], clientID, false, true, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[2], clientID, false, true, false, 0)
testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, true, false, 0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
acks: make(map[uint64]map[string]struct{}),
pae: make(map[uint64]*appendEntry),
s: s,
c: s.createInternalSystemClient("RAFT"),
c: s.createInternalSystemClient(),
js: s.getJetStream(),
sq: sq,
quit: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion server/sendq.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (sq *sendq) internalLoop() {

defer s.grWG.Done()

c := s.createInternalSystemClient("SENDQ")
c := s.createInternalSystemClient()
c.registerWithAccount(s.SystemAccount())
c.noIcb = true

Expand Down
8 changes: 3 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ func (s *Server) setSystemAccount(acc *Account) error {

s.sys = &internal{
account: acc,
client: s.createInternalSystemClient("SERVER"),
client: s.createInternalSystemClient(),
seq: 1,
sid: 1,
servers: make(map[string]*serverUpdate),
Expand Down Expand Up @@ -1784,10 +1784,8 @@ func (s *Server) setSystemAccount(acc *Account) error {
}

// Creates an internal system client.
func (s *Server) createInternalSystemClient(levname string) *client {
c := s.createInternalClient(SYSTEM)
c.LEVNAME = levname
return c
func (s *Server) createInternalSystemClient() *client {
return s.createInternalClient(SYSTEM)
}

// Creates an internal jetstream client.
Expand Down

0 comments on commit 90dde67

Please sign in to comment.