[FIXED] Clustering: possible wrong pending count
This is a regression introduced by PR #1213, which is in v0.23.0
(the latest release).

When a queue subscription left the group and its last_sent was the
same as the group's last_sent, the remaining queue member should
have had its last_sent updated. To do that, a "sent" event for this
sequence was replicated, which was wrong since it could add a
pending message on the remaining queue sub. Moreover, this did not
account for last_sent being 0: in that case a "sent" of sequence 0
would be replicated, causing the remaining queue sub to show a
pending_count of 1.

The fix for both the wrong pending_count and the original last_sent
issue is to have all nodes (leader and followers) detect, when a
member is removed, whether that member's last_sent was the queue
group's, and if so update the runtime state of the remaining queue
member directly.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Nov 10, 2021
1 parent fcf3b54 commit 3dc9bea
Showing 2 changed files with 185 additions and 112 deletions.
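
The following is a minimal sketch of the idea behind the fix, not code from the repository: the member, queueGroup, and removeMember names are hypothetical simplifications of the server's subState/queueState handling. When a member leaves and its last_sent matches the group's, each node updates the remaining members' last_sent locally instead of replicating a "sent" event, so nothing is ever added to acksPending and pending_count stays at 0 (including when last_sent is 0).

package main

import "fmt"

// Hypothetical, simplified stand-ins for the server's subState/queueState.
type member struct {
	lastSent    uint64
	acksPending map[uint64]struct{}
}

type queueGroup struct {
	lastSent uint64
	members  []*member
}

// removeMember drops one member and, if that member held the group's
// last_sent, bumps the remaining members' last_sent in place. This runs
// on every node (leader and followers), so no "sent" event is replicated
// and no pending entry is created.
func (qs *queueGroup) removeMember(idx int) {
	leaving := qs.members[idx]
	qs.members = append(qs.members[:idx], qs.members[idx+1:]...)

	if len(qs.members) == 0 || leaving.lastSent != qs.lastSent {
		return
	}
	for _, m := range qs.members {
		if m.lastSent < qs.lastSent {
			m.lastSent = qs.lastSent // runtime update only
		}
	}
}

func main() {
	qs := &queueGroup{
		lastSent: 3,
		members: []*member{
			{lastSent: 3, acksPending: map[uint64]struct{}{}},
			{lastSent: 0, acksPending: map[uint64]struct{}{}},
		},
	}
	qs.removeMember(0)
	fmt.Println(qs.members[0].lastSent, len(qs.members[0].acksPending)) // 3 0
}

Printing "3 0" mirrors what the new test asserts through the monitoring endpoint: the surviving member reports last_sent 3 and pending_count 0.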
280 changes: 180 additions & 100 deletions server/clustering_test.go
@@ -8384,123 +8384,140 @@ func TestClusteringSnapshotQSubLastSent(t *testing.T) {
}
}

-func TestClusteringMonitorQueueLastSentAfterHBTimeout(t *testing.T) {
-resetPreviousHTTPConnections()
-cleanupDatastore(t)
-defer cleanupDatastore(t)
-cleanupRaftLog(t)
-defer cleanupRaftLog(t)
+func TestClusteringMonitorQueueLastSentAndPendingAfterLeavingGroup(t *testing.T) {
+for _, test := range []struct {
+name string
+hbtimeout bool
+}{
+{"connection close", false},
+{"hb timeout", true},
+} {
+t.Run(test.name, func(t *testing.T) {
+resetPreviousHTTPConnections()
+cleanupDatastore(t)
+defer cleanupDatastore(t)
+cleanupRaftLog(t)
+defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.ClientHBInterval = 100 * time.Millisecond
s1sOpts.ClientHBTimeout = 100 * time.Millisecond
s1sOpts.ClientHBFailCount = 5
s1nOpts := defaultMonitorOptions
s1 := runServerWithOpts(t, s1sOpts, &s1nOpts)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.ClientHBInterval = 100 * time.Millisecond
s2sOpts.ClientHBTimeout = 100 * time.Millisecond
s2sOpts.ClientHBFailCount = 5
s2nOpts := defaultMonitorOptions
s2nOpts.HTTPPort = monitorPort + 1
s2 := runServerWithOpts(t, s2sOpts, &s2nOpts)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.ClientHBInterval = 100 * time.Millisecond
s3sOpts.ClientHBTimeout = 100 * time.Millisecond
s3sOpts.ClientHBFailCount = 5
s3nOpts := defaultMonitorOptions
s3nOpts.HTTPPort = monitorPort + 2
s3 := runServerWithOpts(t, s3sOpts, &s3nOpts)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s1, s2, s3)

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unexpected error on connect: %v", err)
}
defer nc.Close()
sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
if err != nil {
t.Fatalf("Expected to connect correctly, got err %v", err)
}
defer sc.Close()

ch := make(chan bool, 1)
count := 0
if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
count++
if count == 3 {
ch <- true
}
}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

for i := 0; i < 3; i++ {
sc.Publish("foo", []byte("msg"))
}

if err := Wait(ch); err != nil {
t.Fatalf("Did not get all messages: %v", err)
}

-checkLastSent := func() {
-t.Helper()
-waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
-for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
-resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
-defer resp.Body.Close()
-
-cz := Channelz{}
-if err := json.Unmarshal(body, &cz); err != nil {
-return fmt.Errorf("Got an error unmarshalling the body: %v", err)
-}
-resp.Body.Close()
-
-sub := cz.Subscriptions[0]
-if sub.LastSent != 3 {
-return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
-}
-}
-return nil
-})
-}
+checkMonitor := func() {
+t.Helper()
+waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
+resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
+defer resp.Body.Close()
+
+cz := Channelz{}
+if err := json.Unmarshal(body, &cz); err != nil {
+return fmt.Errorf("Got an error unmarshalling the body: %v", err)
+}
+resp.Body.Close()
+
+sub := cz.Subscriptions[0]
+if sub.LastSent != 3 {
+return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
+}
+if sub.PendingCount != 0 {
+return fmt.Errorf("Unexpected pending_count: %v", sub.PendingCount)
+}
+}
+return nil
+})
+}

-// Check that all see last_sent == 3
-checkLastSent()
+// Check that all see last_sent == 3 and pending_count == 0
+checkMonitor()

// Start a new connection and create the same queue durable
sc2 := NewDefaultConnection(t)
defer sc2.Close()
if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

-// Now close the underlying NATS connection to simulate loss of HB
-// and for the server to close the connection.
-nc.Close()
+// Now either close the STAN connection or the underlying NATS connection
+// to simulate loss of HB and for the server to close the connection.
+if test.hbtimeout {
+nc.Close()
+} else {
+sc.Close()
+}

// Wait for the server to close the old client
waitForNumClients(t, s1, 1)
waitForNumClients(t, s2, 1)
waitForNumClients(t, s3, 1)

-// Now make sure that all servers reflect that the sole queue member's
-// last_sent is set to 3.
-checkLastSent()
+// Now make sure that all servers reflect that the sole queue member's
+// last_sent is set to 3 and pending_count is 0.
+checkMonitor()
+})
+}
}

func TestClusteringQueueRedelivery(t *testing.T) {
@@ -8684,3 +8701,66 @@ func TestClusteringQueueRedeliverySentAndAck(t *testing.T) {
return nil
})
}

func TestClusteringQueueNoPendingCountIfNoMsg(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

// Create two queue subs
qsub1, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if _, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
stan.DurableName("dur")); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

// Now close the first
qsub1.Close()

// We wait for more than the replication interval
time.Sleep(2 * testLazyReplicationInterval)

// Make sure that the remaining queue sub on both servers does not show
// a pending count of 1.
waitFor(t, time.Second, 15*time.Millisecond, func() error {
for _, srv := range servers {
subs := srv.clients.getSubs(clientName)
if len(subs) != 1 {
return fmt.Errorf("2 queue subs still present")
}
qsub := subs[0]
qsub.RLock()
pending := len(qsub.acksPending)
qsub.RUnlock()
if pending != 0 {
return fmt.Errorf("Pending count should be 0, got %v", pending)
}
}
return nil
})
}
17 changes: 5 additions & 12 deletions server/server.go
@@ -1175,17 +1175,17 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
if sub.stalled && qs.stalledSubCount > 0 {
qs.stalledSubCount--
}
+sub.RLock()
+// Need to update if this member was the one with the last
+// message of the group.
+storageUpdate = sub.LastSent == qs.lastSent
if standaloneOrLeader {
// Set expiration in the past to force redelivery
expirationTime := time.Now().UnixNano() - int64(time.Second)
// If there are pending messages in this sub, they need to be
// transferred to remaining queue subscribers.
numQSubs := len(qs.subs)
idx := 0
-sub.RLock()
-// Need to update if this member was the one with the last
-// message of the group.
-storageUpdate = sub.LastSent == qs.lastSent
sortedPendingMsgs := sub.makeSortedPendingMsgs()
for _, pm := range sortedPendingMsgs {
// Get one of the remaning queue subscribers.
@@ -1226,8 +1226,8 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
idx = 0
}
}
-sub.RUnlock()
}
+sub.RUnlock()
// Even for durable queue subscribers, if this is not the last
// member, we need to delete from storage (we did that higher in
// that function for non durable case). Issue #215.
@@ -1247,13 +1247,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
qsub.Lock()
qsub.LastSent = qs.lastSent
qsub.store.UpdateSub(&qsub.SubState)
-// In cluster mode, let send a "sent" event for this queue sub so that
-// followers can have an updated version of the last sent, which otherwise
-// may stay at 0 until new messages are delivered in some cases.
-// See https://github.com/nats-io/nats-streaming-server/issues/1189
-if s.isClustered {
-s.collectSentOrAck(qsub, true, qs.lastSent)
-}
qsub.Unlock()
}
qs.Unlock()
