From 3dc9bea5a4d6031b530df63ecddd47ba2bfefe9e Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Wed, 10 Nov 2021 12:01:31 -0700
Subject: [PATCH] [FIXED] Clustering: possible wrong pending count

This is a regression introduced by PR #1213, which is in v0.23.0 (the
latest release). When a queue subscription left the group and its
last_sent was the same as the group's last_sent, the remaining queue
member should have had its last_sent updated. To do that, a "sent"
event for this sequence was replicated, which was wrong since it could
add a pending message on the remaining queue sub. Moreover, this did
not account for last_sent being 0, in which case a "sent" event for
sequence 0 would be replicated, causing the remaining queue sub to
show a pending_count of 1.

The fix for both the wrong pending_count and the original last_sent
issue is to have all nodes (leader and followers) detect, when a
member is removed, whether that member's last_sent value was the queue
group's, and if so update the runtime version of the remaining queue
member.

Signed-off-by: Ivan Kozlovic
---
 server/clustering_test.go | 280 ++++++++++++++++++++++++--------------
 server/server.go          |  17 +--
 2 files changed, 185 insertions(+), 112 deletions(-)

diff --git a/server/clustering_test.go b/server/clustering_test.go
index a2864e74..0d2711f4 100644
--- a/server/clustering_test.go
+++ b/server/clustering_test.go
@@ -8384,123 +8384,140 @@ func TestClusteringSnapshotQSubLastSent(t *testing.T) {
 	}
 }
 
-func TestClusteringMonitorQueueLastSentAfterHBTimeout(t *testing.T) {
-	resetPreviousHTTPConnections()
-	cleanupDatastore(t)
-	defer cleanupDatastore(t)
-	cleanupRaftLog(t)
-	defer cleanupRaftLog(t)
+func TestClusteringMonitorQueueLastSentAndPendingAfterLeavingGroup(t *testing.T) {
+	for _, test := range []struct {
+		name      string
+		hbtimeout bool
+	}{
+		{"connection close", false},
+		{"hb timeout", true},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			resetPreviousHTTPConnections()
+			cleanupDatastore(t)
+			defer cleanupDatastore(t)
+			cleanupRaftLog(t)
+			defer cleanupRaftLog(t)
 
-	// For this test, use a central NATS server.
-	ns := natsdTest.RunDefaultServer()
-	defer ns.Shutdown()
+			// For this test, use a central NATS server.
+			ns := natsdTest.RunDefaultServer()
+			defer ns.Shutdown()
 
-	// Configure first server
-	s1sOpts := getTestDefaultOptsForClustering("a", true)
-	s1sOpts.ClientHBInterval = 100 * time.Millisecond
-	s1sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s1sOpts.ClientHBFailCount = 5
-	s1nOpts := defaultMonitorOptions
-	s1 := runServerWithOpts(t, s1sOpts, &s1nOpts)
-	defer s1.Shutdown()
+			// Configure first server
+			s1sOpts := getTestDefaultOptsForClustering("a", true)
+			s1sOpts.ClientHBInterval = 100 * time.Millisecond
+			s1sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s1sOpts.ClientHBFailCount = 5
+			s1nOpts := defaultMonitorOptions
+			s1 := runServerWithOpts(t, s1sOpts, &s1nOpts)
+			defer s1.Shutdown()
 
-	// Configure second server.
-	s2sOpts := getTestDefaultOptsForClustering("b", false)
-	s2sOpts.ClientHBInterval = 100 * time.Millisecond
-	s2sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s2sOpts.ClientHBFailCount = 5
-	s2nOpts := defaultMonitorOptions
-	s2nOpts.HTTPPort = monitorPort + 1
-	s2 := runServerWithOpts(t, s2sOpts, &s2nOpts)
-	defer s2.Shutdown()
+			// Configure second server.
+			s2sOpts := getTestDefaultOptsForClustering("b", false)
+			s2sOpts.ClientHBInterval = 100 * time.Millisecond
+			s2sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s2sOpts.ClientHBFailCount = 5
+			s2nOpts := defaultMonitorOptions
+			s2nOpts.HTTPPort = monitorPort + 1
+			s2 := runServerWithOpts(t, s2sOpts, &s2nOpts)
+			defer s2.Shutdown()
 
-	// Configure third server.
-	s3sOpts := getTestDefaultOptsForClustering("c", false)
-	s3sOpts.ClientHBInterval = 100 * time.Millisecond
-	s3sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s3sOpts.ClientHBFailCount = 5
-	s3nOpts := defaultMonitorOptions
-	s3nOpts.HTTPPort = monitorPort + 2
-	s3 := runServerWithOpts(t, s3sOpts, &s3nOpts)
-	defer s3.Shutdown()
+			// Configure third server.
+			s3sOpts := getTestDefaultOptsForClustering("c", false)
+			s3sOpts.ClientHBInterval = 100 * time.Millisecond
+			s3sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s3sOpts.ClientHBFailCount = 5
+			s3nOpts := defaultMonitorOptions
+			s3nOpts.HTTPPort = monitorPort + 2
+			s3 := runServerWithOpts(t, s3sOpts, &s3nOpts)
+			defer s3.Shutdown()
 
-	getLeader(t, 10*time.Second, s1, s2, s3)
+			getLeader(t, 10*time.Second, s1, s2, s3)
 
-	nc, err := nats.Connect(nats.DefaultURL)
-	if err != nil {
-		t.Fatalf("Unexpected error on connect: %v", err)
-	}
-	defer nc.Close()
-	sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
-	if err != nil {
-		t.Fatalf("Expected to connect correctly, got err %v", err)
-	}
-	defer sc.Close()
+			nc, err := nats.Connect(nats.DefaultURL)
+			if err != nil {
+				t.Fatalf("Unexpected error on connect: %v", err)
+			}
+			defer nc.Close()
+			sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
+			if err != nil {
+				t.Fatalf("Expected to connect correctly, got err %v", err)
+			}
+			defer sc.Close()
 
-	ch := make(chan bool, 1)
-	count := 0
-	if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
-		count++
-		if count == 3 {
-			ch <- true
-		}
-	}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
-		t.Fatalf("Error on subscribe: %v", err)
-	}
+			ch := make(chan bool, 1)
+			count := 0
+			if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
+				count++
+				if count == 3 {
+					ch <- true
+				}
+			}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
+				t.Fatalf("Error on subscribe: %v", err)
+			}
 
-	for i := 0; i < 3; i++ {
-		sc.Publish("foo", []byte("msg"))
-	}
+			for i := 0; i < 3; i++ {
+				sc.Publish("foo", []byte("msg"))
+			}
 
-	if err := Wait(ch); err != nil {
-		t.Fatalf("Did not get all messages: %v", err)
-	}
+			if err := Wait(ch); err != nil {
+				t.Fatalf("Did not get all messages: %v", err)
+			}
 
-	checkLastSent := func() {
-		t.Helper()
-		waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
-			for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
-				resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
-				defer resp.Body.Close()
-
-				cz := Channelz{}
-				if err := json.Unmarshal(body, &cz); err != nil {
-					return fmt.Errorf("Got an error unmarshalling the body: %v", err)
-				}
-				resp.Body.Close()
+			checkMonitor := func() {
+				t.Helper()
+				waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+					for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
+						resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
+						defer resp.Body.Close()
 
-				sub := cz.Subscriptions[0]
-				if sub.LastSent != 3 {
-					return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
-				}
-			}
+						cz := Channelz{}
+						if err := json.Unmarshal(body, &cz); err != nil {
+							return fmt.Errorf("Got an error unmarshalling the body: %v", err)
+						}
+						resp.Body.Close()
+
+						sub := cz.Subscriptions[0]
+						if sub.LastSent != 3 {
+							return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
+						}
+						if sub.PendingCount != 0 {
+							return fmt.Errorf("Unexpected pending_count: %v", sub.PendingCount)
+						}
+					}
+					return nil
+				})
+			}
-			return nil
-		})
-	}
 
-	// Check that all see last_sent == 3
-	checkLastSent()
+			// Check that all see last_sent == 3 and pending_count == 0
+			checkMonitor()
 
-	// Start a new connection and create the same queue durable
-	sc2 := NewDefaultConnection(t)
-	defer sc2.Close()
-	if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
-		stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
-		t.Fatalf("Error on subscribe: %v", err)
-	}
+			// Start a new connection and create the same queue durable
+			sc2 := NewDefaultConnection(t)
+			defer sc2.Close()
+			if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
+				stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
+				t.Fatalf("Error on subscribe: %v", err)
+			}
 
-	// Now close the underlying NATS connection to simulate loss of HB
-	// and for the server to close the connection.
-	nc.Close()
+			// Now either close the STAN connection or the underlying NATS connection
+			// to simulate loss of HB and for the server to close the connection.
+			if test.hbtimeout {
+				nc.Close()
+			} else {
+				sc.Close()
+			}
 
-	// Wait for the server to close the old client
-	waitForNumClients(t, s1, 1)
-	waitForNumClients(t, s2, 1)
-	waitForNumClients(t, s3, 1)
+			// Wait for the server to close the old client
+			waitForNumClients(t, s1, 1)
+			waitForNumClients(t, s2, 1)
+			waitForNumClients(t, s3, 1)
 
-	// Now make sure that all servers reflect that the sole queue member's
-	// last_sent is set to 3.
-	checkLastSent()
+			// Now make sure that all servers reflect that the sole queue member's
+			// last_sent is set to 3 and pending_count is 0.
+			checkMonitor()
+		})
+	}
 }
 
 func TestClusteringQueueRedelivery(t *testing.T) {
@@ -8684,3 +8701,66 @@ func TestClusteringQueueRedeliverySentAndAck(t *testing.T) {
 		return nil
 	})
 }
+
+func TestClusteringQueueNoPendingCountIfNoMsg(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// For this test, use a central NATS server.
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	// Configure first server
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	// Configure second server.
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	servers := []*StanServer{s1, s2}
+	getLeader(t, 10*time.Second, servers...)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	// Create two queue subs
+	qsub1, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
+		stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	if _, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
+		stan.DurableName("dur")); err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+
+	// Now close the first
+	qsub1.Close()
+
+	// We wait for more than the replication interval
+	time.Sleep(2 * testLazyReplicationInterval)
+
+	// Make sure that the remaining queue sub on both servers does not show
+	// a pending count of 1.
+	waitFor(t, time.Second, 15*time.Millisecond, func() error {
+		for _, srv := range servers {
+			subs := srv.clients.getSubs(clientName)
+			if len(subs) != 1 {
+				return fmt.Errorf("2 queue subs still present")
+			}
+			qsub := subs[0]
+			qsub.RLock()
+			pending := len(qsub.acksPending)
+			qsub.RUnlock()
+			if pending != 0 {
+				return fmt.Errorf("Pending count should be 0, got %v", pending)
+			}
+		}
+		return nil
+	})
+}
diff --git a/server/server.go b/server/server.go
index 9cd34c00..2ee3750c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1175,6 +1175,10 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 		if sub.stalled && qs.stalledSubCount > 0 {
 			qs.stalledSubCount--
 		}
+		sub.RLock()
+		// Need to update if this member was the one with the last
+		// message of the group.
+		storageUpdate = sub.LastSent == qs.lastSent
 		if standaloneOrLeader {
 			// Set expiration in the past to force redelivery
 			expirationTime := time.Now().UnixNano() - int64(time.Second)
@@ -1182,10 +1186,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 			// transferred to remaining queue subscribers.
 			numQSubs := len(qs.subs)
 			idx := 0
-			sub.RLock()
-			// Need to update if this member was the one with the last
-			// message of the group.
-			storageUpdate = sub.LastSent == qs.lastSent
 			sortedPendingMsgs := sub.makeSortedPendingMsgs()
 			for _, pm := range sortedPendingMsgs {
 				// Get one of the remaning queue subscribers.
@@ -1226,8 +1226,8 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 					idx = 0
 				}
 			}
-			sub.RUnlock()
 		}
+		sub.RUnlock()
 		// Even for durable queue subscribers, if this is not the last
 		// member, we need to delete from storage (we did that higher in
 		// that function for non durable case). Issue #215.
@@ -1247,13 +1247,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 		qsub.Lock()
 		qsub.LastSent = qs.lastSent
 		qsub.store.UpdateSub(&qsub.SubState)
-		// In cluster mode, let send a "sent" event for this queue sub so that
-		// followers can have an updated version of the last sent, which otherwise
-		// may stay at 0 until new messages are delivered in some cases.
-		// See https://github.com/nats-io/nats-streaming-server/issues/1189
-		if s.isClustered {
-			s.collectSentOrAck(qsub, true, qs.lastSent)
-		}
 		qsub.Unlock()
 	}
 	qs.Unlock()
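
Note on the fix, as a minimal runnable sketch. The types below are
simplified, hypothetical stand-ins for the server's internal subState and
queueState structures (the real ones carry many more fields plus storage
and replication hooks); only the last_sent hand-off performed in
subStore.Remove is modeled, under those assumptions.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the server's subState and queueState types
// (hypothetical; not the actual definitions).
type subState struct {
	sync.RWMutex
	LastSent uint64
}

type queueState struct {
	sync.Mutex
	lastSent uint64
	subs     []*subState
}

// removeMember mirrors the fixed behavior: every node (leader or follower)
// checks, while removing a member, whether that member held the group's
// last_sent. If so, a remaining member's in-memory LastSent is raised to
// the group's value directly, instead of replicating a "sent" event, which
// could wrongly create a pending entry or replicate sequence 0 when
// nothing had been sent yet.
func removeMember(qs *queueState, leaving *subState) {
	qs.Lock()
	defer qs.Unlock()

	leaving.RLock()
	storageUpdate := leaving.LastSent == qs.lastSent
	leaving.RUnlock()

	// Drop the leaving member from the group.
	remaining := qs.subs[:0]
	for _, s := range qs.subs {
		if s != leaving {
			remaining = append(remaining, s)
		}
	}
	qs.subs = remaining

	// Hand the group's last_sent to a remaining member so monitoring
	// reports the right value without any extra replicated event.
	if storageUpdate && len(qs.subs) > 0 {
		qsub := qs.subs[0]
		qsub.Lock()
		qsub.LastSent = qs.lastSent
		qsub.Unlock()
	}
}

func main() {
	a := &subState{LastSent: 3}
	b := &subState{} // joined later, never got a message
	qs := &queueState{lastSent: 3, subs: []*subState{a, b}}

	removeMember(qs, a)
	fmt.Println(qs.subs[0].LastSent) // 3: inherited, with no pending added
}

Because each node applies this adjustment deterministically when the member
is removed, nothing needs to be replicated, so the remaining member can
neither gain a spurious pending entry nor see a "sent" for sequence 0.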