Skip to content

Commit

Permalink
Merge pull request #910 from nats-io/no_redelivery_from_remove
Browse files Browse the repository at this point in the history
Avoid redelivery right when removing a queue member
  • Loading branch information
kozlovic authored Aug 14, 2019
2 parents 0afbd8e + 0137945 commit b81b026
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
17 changes: 13 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,11 +1195,20 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
ss.Unlock()

if !ss.stan.isClustered || ss.stan.isLeader() {
// Calling this will sort current pending messages and ensure
// that the ackTimer is properly set. It does not necessarily
// mean that messages are going to be redelivered on the spot.
// Go over the list of queue subs to which we have transferred
// messages from the leaving member. We want to have those
// messages redelivered quickly.
for _, qsub := range qsubs {
ss.stan.performAckExpirationRedelivery(qsub, false)
qsub.Lock()
// Make the timer fire soon. performAckExpirationRedelivery
// will then re-order messages, etc..
fireIn := 100 * time.Millisecond
if qsub.ackTimer == nil {
ss.stan.setupAckTimer(qsub, fireIn)
} else {
qsub.ackTimer.Reset(fireIn)
}
qsub.Unlock()
}
}

Expand Down
85 changes: 83 additions & 2 deletions server/server_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package server

import (
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -93,7 +96,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
select {
case <-rch2:
// ok
case <-time.After(100 * time.Millisecond):
case <-time.After(200 * time.Millisecond):
t.Fatal("Message should have been redelivered")
}
// Create 3rd member with higher AckWait than the 2nd
Expand All @@ -113,7 +116,7 @@ func TestQueueSubsWithDifferentAckWait(t *testing.T) {
select {
case <-rch3:
// ok
case <-time.After(100 * time.Millisecond):
case <-time.After(200 * time.Millisecond):
t.Fatal("Message should have been redelivered")
}
}
Expand Down Expand Up @@ -1080,3 +1083,81 @@ func TestQueueGroupNotStalledOnMemberLeaving(t *testing.T) {
t.Fatalf("Did not receive all msgs")
}
}

type checkStoreLookup struct {
stores.MsgStore
errCh chan error
}

func (s *checkStoreLookup) Lookup(seq uint64) (*pb.MsgProto, error) {
buf := make([]byte, 10000)
n := runtime.Stack(buf, false)
if strings.Contains(string(buf[:n]), "Remove") {
select {
case s.errCh <- fmt.Errorf("Lookup invoked from Remove"):
default:
}
}
return s.MsgStore.Lookup(seq)
}

func TestQueueNoRedeliveryDuringSubClose(t *testing.T) {
s := runServer(t, clusterName)
defer s.Shutdown()

errCh := make(chan error, 1)

c, err := s.lookupOrCreateChannel("foo")
if err != nil {
t.Fatalf("Error creating channel: %v", err)
}
c.store.Msgs = &checkStoreLookup{MsgStore: c.store.Msgs, errCh: errCh}

sc1 := NewDefaultConnection(t)
defer sc1.Close()

ch := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
if _, err := sc1.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
<-ch
if m.Sequence == 10 {
wg.Done()
}
}); err != nil {
t.Fatalf("Error on queue subscribe: %v", err)
}

sc2, err := stan.Connect(clusterName, clientName+"2")
if err != nil {
t.Fatalf("Error on connect")
}
defer sc2.Close()

if _, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {},
stan.SetManualAckMode()); err != nil {
t.Fatalf("Error on queue subscribe: %v", err)
}

// Send some messages, since sub1 does not ack due to channel
// being blocked and sub2 does not ack due to manual ack mode,
// messages should be distributed.
for i := 0; i < 10; i++ {
if err := sc2.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish")
}
}

if err := sc2.Close(); err != nil {
t.Fatalf("Error on publish")
}

close(ch)
wg.Wait()

select {
case e := <-errCh:
t.Fatal(e.Error())
default:
}
}

0 comments on commit b81b026

Please sign in to comment.