[FIXED] Duplicate shadow durable queue sub due to race #951

Merged · 1 commit · Oct 14, 2019
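
The race being fixed: a durable queue subscription close and a new subscription on the same group could interleave, each re-creating the group's shadow durable sub, so two shadow subscriptions ended up persisted. The diff below closes that window by having `processSub` hold the subStore lock across the whole lookup-and-create sequence (and by logging a warning if a duplicate shadow is ever detected). A minimal, self-contained sketch of that locking pattern — illustrative, simplified names, not the server's actual types:

```go
package main

import "sync"

// Illustrative stand-ins only; the real server uses subStore/queueState/subState.
type sub struct{ durable string }

type queueGroup struct {
	shadow  *sub   // shadow durable sub kept while the group has no members
	members []*sub // active members of the queue group
}

type subStore struct {
	sync.Mutex
	qsubs map[string]*queueGroup
}

// createQueueSub holds the lock across the lookup, the shadow check and the
// insertion, so a concurrent close (which re-creates the shadow) cannot slip
// in between and leave two shadow durable subscriptions behind.
func (ss *subStore) createQueueSub(group string, s *sub) {
	ss.Lock()
	defer ss.Unlock()

	qs := ss.qsubs[group]
	if qs == nil {
		qs = &queueGroup{}
		ss.qsubs[group] = qs
	}
	if qs.shadow != nil && len(qs.members) == 0 {
		// The new member takes over from the shadow; clear it so it is not
		// duplicated when the channel is later recovered.
		qs.shadow = nil
	}
	qs.members = append(qs.members, s)
}

func main() {
	ss := &subStore{qsubs: make(map[string]*queueGroup)}
	ss.createQueueSub("dur:bar", &sub{durable: "dur"})
}
```

The actual change below achieves the same effect by moving `ss.Lock()`/`ss.Unlock()` out of `Store()` and `updateDurable()` and into `processSub()` and the recovery paths, so the lock is held for the full sequence.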
37 changes: 21 additions & 16 deletions server/server.go
@@ -907,9 +907,7 @@ func (ss *subStore) Store(sub *subState) error {
return err
}

ss.Lock()
ss.updateState(sub)
ss.Unlock()

return nil
}
@@ -938,6 +936,9 @@ func (ss *subStore) updateState(sub *subState) {
// maybe due to upgrades from much older releases that had bugs?).
// So don't panic and use as the shadow the one with the highest LastSent
// value.
if qs.shadow != nil {
ss.stan.log.Warnf("Duplicate shadow durable queue consumer (subid=%v) for group %q", sub.ID, sub.QGroup)
}
if qs.shadow == nil || sub.LastSent > qs.lastSent {
qs.shadow = sub
}
@@ -2361,17 +2362,21 @@ func (s *StanServer) processRecoveredChannels(channels map[string]*stores.Recove
return nil, err
}
if !s.isClustered {
ss := channel.ss
ss.Lock()
// Get the recovered subscriptions for this channel.
for _, recSub := range recoveredChannel.Subscriptions {
sub := s.recoverOneSub(channel, recSub.Sub, recSub.Pending, nil)
if sub != nil {
// Subscribe to subscription ACKs
if err := sub.startAckSub(s.nca, s.processAckMsg); err != nil {
ss.Unlock()
return nil, err
}
allSubs = append(allSubs, sub)
}
}
ss.Unlock()
// Now that we have recovered possible subscriptions for this channel,
// check if we should start the delete timer.
if channel.activity != nil {
@@ -3013,17 +3018,17 @@ func (s *StanServer) checkClientHealth(clientID string) {
// close the client (connection). This locks the
// client object internally so unlock here.
client.Unlock()
// If clustered, thread operations through Raft.
if s.isClustered {
s.barrier(func() {
s.barrier(func() {
// If clustered, thread operations through Raft.
if s.isClustered {
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}); err != nil {
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
clientID, err)
}
})
} else {
s.closeClient(clientID)
}
} else {
s.closeClient(clientID)
}
})
return
}
} else {
@@ -4605,15 +4610,13 @@ func (s *StanServer) updateDurable(ss *subStore, sub *subState, clientID string)
if err != nil {
return err
}
ss.Lock()
// Do this only for durable subscribers (not durable queue subscribers).
if sub.isDurableSubscriber() {
// Add back into plain subscribers
ss.psubs = append(ss.psubs, sub)
}
// And in ackInbox lookup map.
ss.acks[sub.AckInbox] = sub
ss.Unlock()

return nil
}
@@ -4633,6 +4636,9 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
sub *subState
ss = c.ss
)

ss.Lock()

// Will be true for durable queue subscribers and durable subscribers alike.
isDurable := false
// Will be set to false for an existing durable subscriber or existing
@@ -4646,6 +4652,7 @@
if strings.Contains(sr.DurableName, ":") {
s.log.Errorf("[Client:%s] Invalid DurableName (%q) for queue subscriber from %s",
sr.ClientID, sr.DurableName, sr.Subject)
ss.Unlock()
return nil, ErrInvalidDurName
}
isDurable = true
@@ -4656,7 +4663,6 @@
}
// Lookup for an existing group. Only interested in situation where
// the group exist, but is empty and had a shadow subscriber.
ss.RLock()
qs := ss.qsubs[sr.QGroup]
if qs != nil {
qs.Lock()
@@ -4668,15 +4674,15 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
qs.Unlock()
setStartPos = false
}
ss.RUnlock()
} else if sr.DurableName != "" {
// Check for DurableSubscriber status
if sub = ss.LookupByDurable(durableKey(sr)); sub != nil {
if sub = ss.durables[durableKey(sr)]; sub != nil {
sub.RLock()
clientID := sub.ClientID
sub.RUnlock()
if clientID != "" {
s.log.Errorf("[Client:%s] Duplicate durable subscription registration", sr.ClientID)
ss.Unlock()
return nil, ErrDupDurable
}
setStartPos = false
@@ -4766,14 +4772,13 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
sub.ID = subID
err = s.addSubscription(ss, sub)
if err == nil && subID > 0 {
ss.Lock()
if subID >= c.nextSubID {
c.nextSubID = subID + 1
}
ss.Unlock()
}
}
}
ss.Unlock()
if err == nil && (!s.isClustered || s.isLeader()) {
err = sub.startAckSub(s.nca, s.processAckMsg)
if err == nil {
108 changes: 108 additions & 0 deletions server/server_queue_test.go
@@ -1161,3 +1161,111 @@ func TestQueueNoRedeliveryDuringSubClose(t *testing.T) {
default:
}
}

func TestPersistentStoreDurableQueueSubRaceBetweenCreateAndClose(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)

opts := getTestDefaultOptsForPersistentStore()
s := runServerWithOpts(t, opts, nil)
defer shutdownRestartedServerOnTestExit(&s)

sc := NewDefaultConnection(t)
defer sc.Close()

qsub1, err := sc.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

cs := channelsGet(t, s.channels, "foo")
ss := &mockedSubStore{SubStore: cs.store.Subs}
cs.store.Subs = ss

// Will make the store's CreateSub() pause so that the addition of the new
// queue sub in the go routine will be delayed.
ss.Lock()
ss.ch = make(chan bool, 1)
ss.Unlock()
closeErrCh := make(chan error)
createErrCh := make(chan error)

sc2, err := stan.Connect(clusterName, clientName+"2")
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc2.Close()

// Since the close operation uses a barrier that ensures that
// other operations complete, we need to start processing
// qsub1.Close() before we start the new queue sub, but
// ensure that it stops at one point. We will use the
// closeMu mutex to artificially block it.
s.closeMu.Lock()
go func() {
closeErrCh <- qsub1.Close()
}()
// Let the Close() proceed a bit...
time.Sleep(100 * time.Millisecond)
go func() {
_, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {}, stan.DurableName("dur"))
createErrCh <- err
}()

// Let the QueueSubscribe() proceed a bit...
time.Sleep(100 * time.Millisecond)

// Now release the qsub1 Close()
s.closeMu.Unlock()

// Let the Close() complete, but we have to release the CreateSub()
// blockage from a different go routine, otherwise (with the fix)
// the Close() would still be blocked.
go func() {
time.Sleep(100 * time.Millisecond)
// Now release the addition to the subStore of the second queue sub
ss.ch <- true
}()

// Wait for the close to complete
if err := <-closeErrCh; err != nil {
t.Fatalf("Error on close: %v", err)
}
// Wait for create to complete
if err := <-createErrCh; err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

// Now close 2nd queue sub (connection close will do that for us)
sc2.Close()
// We don't need that connection either.
sc.Close()

// Shutdown and restart the server and make sure that we did not
// introduce a 2nd shadow subscription.
s.Shutdown()
l := &captureWarnLogger{}
opts.CustomLogger = l
s = runServerWithOpts(t, opts, nil)

cs = channelsGet(t, s.channels, "foo")
qs := cs.ss.qsubs["dur:bar"]
qs.RLock()
hasShadow := qs.shadow != nil
qs.RUnlock()
if !hasShadow {
t.Fatalf("Should have a shadow subscription")
}
var hasDuplicate bool
l.Lock()
for _, w := range l.warnings {
if strings.Contains(w, "Duplicate shadow durable") {
hasDuplicate = true
break
}
}
l.Unlock()
if hasDuplicate {
t.Fatalf("Duplicate shadow subscription found!")
}
}
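
For reference, the test above relies on a store wrapper whose `CreateSub()` can be paused on a channel (the `mockedSubStore` change in the next file). A minimal, self-contained sketch of that pause-hook pattern, with illustrative names rather than the actual test types:

```go
package main

import (
	"fmt"
	"sync"
)

// pausableStore mimics the mockedSubStore hook: when ch is set, CreateSub
// blocks until the test sends on the channel.
type pausableStore struct {
	sync.RWMutex
	ch chan bool
}

func (ps *pausableStore) CreateSub() error {
	ps.RLock()
	ch := ps.ch
	ps.RUnlock()
	if ch != nil {
		// Wait for the test to allow the store operation to proceed.
		<-ch
	}
	return nil
}

func main() {
	ps := &pausableStore{}

	// Arm the pause hook.
	ps.Lock()
	ps.ch = make(chan bool, 1)
	ps.Unlock()

	done := make(chan error, 1)
	go func() { done <- ps.CreateSub() }()

	// ... exercise the race here, then release the paused CreateSub.
	ps.ch <- true
	fmt.Println("CreateSub returned:", <-done)
}
```

In the real test, the hook is armed before the second QueueSubscribe() and released from a separate goroutine so that the pending Close() can complete first.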
6 changes: 6 additions & 0 deletions server/server_storefailures_test.go
@@ -41,6 +41,7 @@ type mockedSubStore struct {
sync.RWMutex
fail bool
failFlushOnce bool
ch chan bool
}

func (ms *mockedStore) CreateChannel(name string) (*stores.Channel, error) {
@@ -253,7 +254,12 @@ func TestMsgLookupFailures(t *testing.T) {
func (ss *mockedSubStore) CreateSub(sub *spb.SubState) error {
ss.RLock()
fail := ss.fail
ch := ss.ch
ss.RUnlock()
if ch != nil {
// Wait for notification that we can continue
<-ch
}
if fail {
return fmt.Errorf("On purpose")
}
2 changes: 1 addition & 1 deletion server/snapshot.go
@@ -429,8 +429,8 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe
c.lSeqChecked = false

for _, ss := range sc.Subscriptions {
s.recoverOneSub(c, ss.State, nil, ss.AcksPending)
c.ss.Lock()
s.recoverOneSub(c, ss.State, nil, ss.AcksPending)
if ss.State.ID >= c.nextSubID {
c.nextSubID = ss.State.ID + 1
}