From c33a63dceef7d96c03ac95ee4b4bf66e979f0159 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 20 Jun 2024 02:41:30 -0700 Subject: [PATCH 1/2] MQTT leak failing test --- server/mqtt_test.go | 64 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index a92607ccc02..fbb4c669191 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -28,6 +28,8 @@ import ( "net" "os" "reflect" + "runtime" + "runtime/pprof" "strings" "sync" "testing" @@ -3045,14 +3047,30 @@ func TestMQTTCluster(t *testing.T) { } } -func testMQTTConnectDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool) { +func testMQTTConnectSubDisconnect(t *testing.T, o *Options, clientID string, clean bool, found bool, sub bool, qos byte) { t.Helper() mc, r := testMQTTConnect(t, &mqttConnInfo{clientID: clientID, cleanSess: clean}, o.MQTT.Host, o.MQTT.Port) testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, found) + if sub { + testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: qos}}, []byte{qos}) + } testMQTTDisconnectEx(t, mc, nil, false) mc.Close() } +func captureHeapProfile(filename string) { + f, _ := os.Create(filename) + defer f.Close() + runtime.GC() // Force garbage collection to get a clear picture + pprof.WriteHeapProfile(f) +} + +func printHeapUsage(label string) runtime.MemStats { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + fmt.Printf("%s Heap: allocs=%v, objects=%v\n", label, memStats.HeapAlloc, memStats.HeapObjects) + return memStats +} func TestMQTTClusterConnectDisconnectClean(t *testing.T) { nServers := 3 cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) @@ -3064,7 +3082,39 @@ func TestMQTTClusterConnectDisconnectClean(t *testing.T) { // specified. N := 100 for n := 0; n < N; n++ { - testMQTTConnectDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false) + testMQTTConnectSubDisconnect(t, cl.opts[rand.Intn(nServers)], clientID, true, false, false, 0) + } +} + +func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) { + nServers := 3 + cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", nServers) + defer cl.shutdown() + + time.Sleep(1 * time.Second) + + // initialize MQTT assets in the cluster + testMQTTConnectSubDisconnect(t, cl.opts[0], "init", true, false, false, 0) + runtime.GC() // Force garbage collection to get a clear picture + + memStats := printHeapUsage("BEFORE") + baseHeapAlloc := memStats.HeapAlloc + baseHeapObjects := memStats.HeapObjects + + N := 100 + for i := 0; i < N; i++ { + clientID := nuid.Next() + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2) + runtime.GC() // Force garbage collection to get a clear picture + + memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i)) + if memStats.HeapAlloc > 2*baseHeapAlloc || memStats.HeapObjects > 2*baseHeapObjects { + captureHeapProfile("AFTERLEAK.pprof") + t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)", + i, + baseHeapAlloc, memStats.HeapAlloc, memStats.HeapAlloc*100/baseHeapAlloc, + baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects) + } } } @@ -3081,13 +3131,13 @@ func TestMQTTClusterConnectDisconnectPersist(t *testing.T) { for n := 0; n < N; n++ { // First clean sessions on all servers for i := 0; i < nServers; i++ { - testMQTTConnectDisconnect(t, cl.opts[i], clientID, true, false) + testMQTTConnectSubDisconnect(t, cl.opts[i], clientID, true, false, false, 0) } - testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, false) - testMQTTConnectDisconnect(t, cl.opts[1], clientID, false, true) - testMQTTConnectDisconnect(t, cl.opts[2], clientID, false, true) - testMQTTConnectDisconnect(t, cl.opts[0], clientID, false, true) + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, false, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[1], clientID, false, true, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[2], clientID, false, true, false, 0) + testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, false, true, false, 0) } } From d1ed54035c4fa572462d5a119d91e99be454319b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 20 Jun 2024 09:59:33 -0700 Subject: [PATCH 2/2] wip --- server/mqtt_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index fbb4c669191..c6ce702b8c6 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -3068,7 +3068,7 @@ func captureHeapProfile(filename string) { func printHeapUsage(label string) runtime.MemStats { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) - fmt.Printf("%s Heap: allocs=%v, objects=%v\n", label, memStats.HeapAlloc, memStats.HeapObjects) + fmt.Printf("%s Heap: inuse=%v, objects=%v\n", label, memStats.HeapInuse, memStats.HeapObjects) return memStats } func TestMQTTClusterConnectDisconnectClean(t *testing.T) { @@ -3098,21 +3098,21 @@ func TestMQTTClusterConnectSubDisconnectClean(t *testing.T) { runtime.GC() // Force garbage collection to get a clear picture memStats := printHeapUsage("BEFORE") - baseHeapAlloc := memStats.HeapAlloc + baseHeapInuse := memStats.HeapInuse baseHeapObjects := memStats.HeapObjects - N := 100 + N := 1000000 for i := 0; i < N; i++ { clientID := nuid.Next() testMQTTConnectSubDisconnect(t, cl.opts[0], clientID, true, false, true, 2) runtime.GC() // Force garbage collection to get a clear picture memStats = printHeapUsage(fmt.Sprintf("AFTER %d", i)) - if memStats.HeapAlloc > 2*baseHeapAlloc || memStats.HeapObjects > 2*baseHeapObjects { + if memStats.HeapInuse > 100*baseHeapInuse || memStats.HeapObjects > 100*baseHeapObjects { captureHeapProfile("AFTERLEAK.pprof") t.Fatalf("after %d iterations heap alloc has grown from %v to %v (%v%%), objects from %v to %v (%v%%)", i, - baseHeapAlloc, memStats.HeapAlloc, memStats.HeapAlloc*100/baseHeapAlloc, + baseHeapInuse, memStats.HeapInuse, memStats.HeapInuse*100/baseHeapInuse, baseHeapObjects, memStats.HeapObjects, memStats.HeapObjects*100/baseHeapObjects) } }