Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Possible stall of queue member #863

Merged
merged 1 commit into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3238,14 +3238,14 @@ func findBestQueueSub(sl []*subState) *subState {

// Send a message to the queue group
// Assumes qs lock held for write
func (s *StanServer) sendMsgToQueueGroup(qs *queueState, m *pb.MsgProto, force bool) (*subState, bool, bool) {
func (s *StanServer) sendMsgToQueueGroup(qs *queueState, m *pb.MsgProto, force bool) (*subState, bool) {
sub := findBestQueueSub(qs.subs)
if sub == nil {
return nil, false, false
return nil, false
}
sub.Lock()
wasStalled := sub.stalled
didSend, sendMore := s.sendMsgToSub(sub, m, force)
didSend, _ := s.sendMsgToSub(sub, m, force)
// If this is not a redelivery and the sub was not stalled, but now is,
// bump the number of stalled members.
if !force && !wasStalled && sub.stalled {
Expand All @@ -3255,7 +3255,7 @@ func (s *StanServer) sendMsgToQueueGroup(qs *queueState, m *pb.MsgProto, force b
qs.lastSent = sub.LastSent
}
sub.Unlock()
return sub, didSend, sendMore
return sub, didSend
}

// processMsg will process a message, and possibly send to clients, etc.
Expand Down Expand Up @@ -3467,7 +3467,7 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
// otherwise this could cause a message to be redelivered to multiple members.
if !isClustered && qs != nil && !isStartup {
qs.Lock()
pick, sent, _ = s.sendMsgToQueueGroup(qs, m, forceDelivery)
pick, sent = s.sendMsgToQueueGroup(qs, m, forceDelivery)
qs.Unlock()
if pick == nil {
s.log.Errorf("[Client:%s] Unable to find queue subscriber for subid=%d", clientID, subID)
Expand Down Expand Up @@ -5108,7 +5108,7 @@ func (s *StanServer) sendAvailableMessagesToQueue(c *channel, qs *queueState) {
if nextMsg == nil {
break
}
if _, sent, sendMore := s.sendMsgToQueueGroup(qs, nextMsg, honorMaxInFlight); !sent || !sendMore {
if _, sent := s.sendMsgToQueueGroup(qs, nextMsg, honorMaxInFlight); !sent {
break
}
}
Expand Down
50 changes: 50 additions & 0 deletions server/server_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,3 +1030,53 @@ func TestPersistentStoreNewOnHoldClearedAfterRestart(t *testing.T) {
}
}
}

// TestQueueGroupNotStalledOnMemberLeaving checks that a queue group member
// keeps receiving messages after another member's connection is closed.
//
// The test publishes total messages on "foo" before any subscription exists,
// then creates two members of queue group "bar", both with
// DeliverAllAvailable: the first on sc1 with MaxInflight(5), the second on
// sc2. On its first message the second member waits until the first member
// has received at least one message, then closes sc1. The test passes if the
// second member receives a message whose Sequence equals total within
// 5 seconds (presumably the last message published, assuming sequences start
// at 1 -- confirm against the store).
func TestQueueGroupNotStalledOnMemberLeaving(t *testing.T) {
	s := runServer(t, clusterName)
	defer s.Shutdown()

	sc1 := NewDefaultConnection(t)
	defer sc1.Close()

	sc2, err := stan.Connect(clusterName, clientName+"2")
	if err != nil {
		t.Fatalf("Error connecting: %v", err)
	}
	defer sc2.Close()

	total := 1000
	for i := 0; i < total; i++ {
		// Fail right away on a publish error: otherwise the message with
		// Sequence == total may never exist and the test would only report
		// a misleading timeout below.
		if err := sc1.Publish("foo", []byte("hello")); err != nil {
			t.Fatalf("Error on publish: %v", err)
		}
	}

	// Buffered with a non-blocking send so the first member's callback
	// never blocks, no matter how many messages it receives.
	sub1GotIt := make(chan struct{}, 1)
	if _, err := sc1.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
		select {
		case sub1GotIt <- struct{}{}:
		default:
		}
	}, stan.DeliverAllAvailable(), stan.MaxInflight(5)); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	ch := make(chan struct{}, 1)
	// Only read and written from the second member's callback.
	waitForSub1 := true
	if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
		if waitForSub1 {
			waitForSub1 = false
			// Make sure the first member has received something before
			// its connection is closed.
			<-sub1GotIt
			sc1.Close()
		}
		if m.Sequence == uint64(total) {
			ch <- struct{}{}
		}
	}, stan.DeliverAllAvailable()); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}

	select {
	case <-ch:
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive all msgs")
	}
}