From 0137945bbf94538eef8ff5b3ace6db76afbdd826 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Tue, 13 Aug 2019 15:06:48 -0600
Subject: [PATCH] Avoid redelivery right when removing a queue member

When a queue member was removed and it had unacknowledged messages,
those were reassigned to other queue members. We would then invoke
the redelivery routine to reset the ack timer, but this would have
the consequence of possibly doing redeliveries at that time.
Instead, reset the ack timer of the queue members that received
reassigned messages so that it fires shortly.

Signed-off-by: Ivan Kozlovic
---
 server/server.go            | 17 ++++++--
 server/server_queue_test.go | 85 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 96 insertions(+), 6 deletions(-)

diff --git a/server/server.go b/server/server.go
index 112f3333..b9a4eb2b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1195,11 +1195,20 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 	ss.Unlock()
 
 	if !ss.stan.isClustered || ss.stan.isLeader() {
-		// Calling this will sort current pending messages and ensure
-		// that the ackTimer is properly set. It does not necessarily
-		// mean that messages are going to be redelivered on the spot.
+		// Go over the list of queue subs to which we have transferred
+		// messages from the leaving member. We want to have those
+		// messages redelivered quickly.
 		for _, qsub := range qsubs {
-			ss.stan.performAckExpirationRedelivery(qsub, false)
+			qsub.Lock()
+			// Make the timer fire soon. performAckExpirationRedelivery
+			// will then re-order messages, etc.
+			fireIn := 100 * time.Millisecond
+			if qsub.ackTimer == nil {
+				ss.stan.setupAckTimer(qsub, fireIn)
+			} else {
+				qsub.ackTimer.Reset(fireIn)
+			}
+			qsub.Unlock()
 		}
 	}
 
diff --git a/server/server_queue_test.go b/server/server_queue_test.go
index 140241f9..91076fbf 100644
--- a/server/server_queue_test.go
+++ b/server/server_queue_test.go
@@ -15,6 +15,9 @@ package server
 
 import (
 	"fmt"
+	"runtime"
+	"strings"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -93,7 +96,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
 	select {
 	case <-rch2:
 	// ok
-	case <-time.After(100 * time.Millisecond):
+	case <-time.After(200 * time.Millisecond):
 		t.Fatal("Message should have been redelivered")
 	}
 	// Create 3rd member with higher AckWait than the 2nd
@@ -113,7 +116,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
 	select {
 	case <-rch3:
 	// ok
-	case <-time.After(100 * time.Millisecond):
+	case <-time.After(200 * time.Millisecond):
 		t.Fatal("Message should have been redelivered")
 	}
 }
@@ -1080,3 +1083,81 @@ func TestQueueGroupNotStalledOnMemberLeaving(t *testing.T) {
 		t.Fatalf("Did not receive all msgs")
 	}
 }
+
+type checkStoreLookup struct {
+	stores.MsgStore
+	errCh chan error
+}
+
+func (s *checkStoreLookup) Lookup(seq uint64) (*pb.MsgProto, error) {
+	buf := make([]byte, 10000)
+	n := runtime.Stack(buf, false)
+	if strings.Contains(string(buf[:n]), "Remove") {
+		select {
+		case s.errCh <- fmt.Errorf("Lookup invoked from Remove"):
+		default:
+		}
+	}
+	return s.MsgStore.Lookup(seq)
+}
+
+func TestQueueNoRedeliveryDuringSubClose(t *testing.T) {
+	s := runServer(t, clusterName)
+	defer s.Shutdown()
+
+	errCh := make(chan error, 1)
+
+	c, err := s.lookupOrCreateChannel("foo")
+	if err != nil {
+		t.Fatalf("Error creating channel: %v", err)
+	}
+	c.store.Msgs = &checkStoreLookup{MsgStore: c.store.Msgs, errCh: errCh}
+
+	sc1 := NewDefaultConnection(t)
+	defer sc1.Close()
+
+	ch := make(chan struct{})
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	if _, err := sc1.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
+		<-ch
+		if m.Sequence == 10 {
+			wg.Done()
+		}
+	}); err != nil {
+		t.Fatalf("Error on queue subscribe: %v", err)
+	}
+
+	sc2, err := stan.Connect(clusterName, clientName+"2")
+	if err != nil {
+		t.Fatalf("Error on connect")
+	}
+	defer sc2.Close()
+
+	if _, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {},
+		stan.SetManualAckMode()); err != nil {
+		t.Fatalf("Error on queue subscribe: %v", err)
+	}
+
+	// Send some messages. Since sub1 does not ack (its callback blocks
+	// on the channel) and sub2 does not ack due to manual ack mode,
+	// messages should be distributed between the two members.
+	for i := 0; i < 10; i++ {
+		if err := sc2.Publish("foo", []byte("hello")); err != nil {
+			t.Fatalf("Error on publish")
+		}
+	}
+
+	if err := sc2.Close(); err != nil {
+		t.Fatalf("Error on close: %v", err)
+	}
+
+	close(ch)
+	wg.Wait()
+
+	select {
+	case e := <-errCh:
+		t.Fatal(e.Error())
+	default:
+	}
+}
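
Note for reviewers: the heart of the server.go change is a create-or-reset timer pattern.
Rather than calling performAckExpirationRedelivery inline from Remove (which could
redeliver messages on the spot), the ack timer is armed to fire about 100ms later, so
redelivery happens on the timer's goroutine after Remove has returned. The standalone
Go sketch below illustrates that pattern only; the names (sub, scheduleRedelivery,
redeliver) are illustrative and are not the server's actual API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sub is an illustrative stand-in for a queue member's state: a mutex
// protecting an ack timer, mirroring the shape of the change above.
type sub struct {
	sync.Mutex
	ackTimer *time.Timer
}

// scheduleRedelivery arms the ack timer to fire shortly instead of
// performing redelivery inline: create the timer if it does not exist
// yet, otherwise simply reset it.
func (s *sub) scheduleRedelivery(fireIn time.Duration, redeliver func()) {
	s.Lock()
	defer s.Unlock()
	if s.ackTimer == nil {
		s.ackTimer = time.AfterFunc(fireIn, redeliver)
	} else {
		s.ackTimer.Reset(fireIn)
	}
}

func main() {
	q := &sub{}
	done := make(chan struct{})
	// The callback stands in for the ack-expiration/redelivery routine:
	// it runs on the timer's goroutine shortly after this call returns,
	// not inline in the caller (the equivalent of subStore.Remove).
	q.scheduleRedelivery(100*time.Millisecond, func() {
		fmt.Println("redelivering transferred pending messages")
		close(done)
	})
	<-done
}

In the actual patch the timer callback is performAckExpirationRedelivery, which
re-sorts the transferred pending messages before redelivering them, and the
test above verifies that no message store Lookup (and hence no redelivery)
happens from within the Remove code path.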