From 31fd271bcf1054d747a57a560ebf226163366170 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 29 Jan 2022 19:30:16 -0700 Subject: [PATCH] [FIXED] Possible panic on message redelivery If the message redelivery callback fired and a client was marked with failed heartbeat and the subscription is closed (likely due to the server closing connection due to failed HBs), there was a race that could cause a panic trying to reset a timer that has been set to nil. Signed-off-by: Ivan Kozlovic --- server/server.go | 7 ++++++- server/server_redelivery_test.go | 33 ++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index be883ca7..80438007 100644 --- a/server/server.go +++ b/server/server.go @@ -3691,8 +3691,13 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo sub.Unlock() return } - // Sort our messages outstanding from acksPending, grab some state and unlock. sub.Lock() + // Subscriber could have been closed + if sub.ackTimer == nil { + sub.Unlock() + return + } + // Sort our messages outstanding from acksPending, grab some state and unlock. sortedPendingMsgs := sub.makeSortedPendingMsgs() if len(sortedPendingMsgs) == 0 { sub.clearAckTimer() diff --git a/server/server_redelivery_test.go b/server/server_redelivery_test.go index d715575c..cb52e196 100644 --- a/server/server_redelivery_test.go +++ b/server/server_redelivery_test.go @@ -16,6 +16,7 @@ package server import ( "encoding/json" "fmt" + "math/rand" "net" "runtime" "sync" @@ -1714,3 +1715,35 @@ func TestRedeliveryRaceWithAck(t *testing.T) { } } + +func TestNoPanicOnSubCloseWhileOnRedelivery(t *testing.T) { + s := runServer(t, clusterName) + defer s.Shutdown() + + sc := NewDefaultConnection(t) + defer sc.Close() + + if err := sc.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + for i := 0; i < 100; i++ { + sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, + stan.AckWait(ackWaitInMs(5)), + stan.SetManualAckMode(), + stan.DeliverAllAvailable()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Artificially pretend that the client had failed hearbeat + srvSub := s.clients.getSubs(clientName)[0] + srvSub.Lock() + srvSub.hasFailedHB = true + srvSub.Unlock() + + time.Sleep(time.Duration(rand.Intn(15)) * time.Millisecond) + sub.Close() + + waitForNumSubs(t, s, clientName, 0) + } +}