Skip to content

Commit

Permalink
Avoid redelivery right when removing a queue member
Browse files Browse the repository at this point in the history
When a queue member was removed and it had unacknowledged messages,
those were reassigned to other queue members. We would then invoke
the redelivery routine to reset the ack timer, but this would have
the consequence of possibly doing redeliveries at that time.
Instead, reset the ack timer of the queue members that received the
reassigned messages so that it fires shortly.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 13, 2019
1 parent 0afbd8e commit 0137945
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
17 changes: 13 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,11 +1195,20 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
ss.Unlock()

if !ss.stan.isClustered || ss.stan.isLeader() {
// Calling this will sort current pending messages and ensure
// that the ackTimer is properly set. It does not necessarily
// mean that messages are going to be redelivered on the spot.
// Go over the list of queue subs to which we have transferred
// messages from the leaving member. We want to have those
// messages redelivered quickly.
for _, qsub := range qsubs {
ss.stan.performAckExpirationRedelivery(qsub, false)
qsub.Lock()
// Make the timer fire soon. performAckExpirationRedelivery
// will then re-order messages, etc..
fireIn := 100 * time.Millisecond
if qsub.ackTimer == nil {
ss.stan.setupAckTimer(qsub, fireIn)
} else {
qsub.ackTimer.Reset(fireIn)
}
qsub.Unlock()
}
}

Expand Down
85 changes: 83 additions & 2 deletions server/server_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package server

import (
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -93,7 +96,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
select {
case <-rch2:
// ok
case <-time.After(100 * time.Millisecond):
case <-time.After(200 * time.Millisecond):
t.Fatal("Message should have been redelivered")
}
// Create 3rd member with higher AckWait than the 2nd
Expand All @@ -113,7 +116,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
select {
case <-rch3:
// ok
case <-time.After(100 * time.Millisecond):
case <-time.After(200 * time.Millisecond):
t.Fatal("Message should have been redelivered")
}
}
Expand Down Expand Up @@ -1080,3 +1083,81 @@ func TestQueueGroupNotStalledOnMemberLeaving(t *testing.T) {
t.Fatalf("Did not receive all msgs")
}
}

// checkStoreLookup is a test wrapper around a message store. It embeds the
// wrapped stores.MsgStore and carries a channel on which its Lookup override
// reports an error when the call stack contains "Remove".
type checkStoreLookup struct {
// Wrapped store; methods not overridden by this type are promoted from it.
stores.MsgStore
// Receives the error reported by Lookup.
errCh chan error
}

// Lookup delegates to the wrapped store's Lookup. Before doing so it captures
// the calling goroutine's stack trace (at most 10000 bytes of it) and, if the
// text "Remove" appears anywhere in that trace, posts an error on errCh.
func (s *checkStoreLookup) Lookup(seq uint64) (*pb.MsgProto, error) {
	trace := make([]byte, 10000)
	// runtime.Stack returns the number of bytes written; keep only those.
	trace = trace[:runtime.Stack(trace, false)]
	if !strings.Contains(string(trace), "Remove") {
		return s.MsgStore.Lookup(seq)
	}
	// Non-blocking send: if the channel cannot accept the error right now,
	// it is dropped rather than stalling the lookup.
	select {
	case s.errCh <- fmt.Errorf("Lookup invoked from Remove"):
	default:
	}
	return s.MsgStore.Lookup(seq)
}

// TestQueueNoRedeliveryDuringSubClose verifies that closing a connection that
// owns a queue member does not trigger a message store Lookup from a call
// stack containing "Remove". The channel's message store is wrapped with
// checkStoreLookup, which reports such a lookup on errCh.
func TestQueueNoRedeliveryDuringSubClose(t *testing.T) {
	s := runServer(t, clusterName)
	defer s.Shutdown()

	// Capacity of 1 lets the store wrapper record one violation without
	// blocking; the wrapper drops any further ones.
	errCh := make(chan error, 1)

	c, err := s.lookupOrCreateChannel("foo")
	if err != nil {
		t.Fatalf("Error creating channel: %v", err)
	}
	// Replace the message store with the checking wrapper.
	c.store.Msgs = &checkStoreLookup{MsgStore: c.store.Msgs, errCh: errCh}

	sc1 := NewDefaultConnection(t)
	defer sc1.Close()

	// ch keeps the first member's callback blocked until it is closed;
	// wg is released once that member has processed sequence 10.
	ch := make(chan struct{})
	wg := sync.WaitGroup{}
	wg.Add(1)
	if _, err := sc1.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
		<-ch
		if m.Sequence == 10 {
			wg.Done()
		}
	}); err != nil {
		t.Fatalf("Error on queue subscribe: %v", err)
	}

	sc2, err := stan.Connect(clusterName, clientName+"2")
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer sc2.Close()

	if _, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {},
		stan.SetManualAckMode()); err != nil {
		t.Fatalf("Error on queue subscribe: %v", err)
	}

	// Send some messages, since sub1 does not ack due to channel
	// being blocked and sub2 does not ack due to manual ack mode,
	// messages should be distributed.
	for i := 0; i < 10; i++ {
		if err := sc2.Publish("foo", []byte("hello")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// Close the second connection while it still has unacknowledged
	// messages; this is the step under test.
	if err := sc2.Close(); err != nil {
		t.Fatalf("Error on close: %v", err)
	}

	// Unblock the first member and wait until it has seen sequence 10.
	close(ch)
	wg.Wait()

	// Non-blocking check: fail if the store wrapper reported a violation.
	select {
	case e := <-errCh:
		t.Fatal(e.Error())
	default:
	}
}

0 comments on commit 0137945

Please sign in to comment.