[FIXED] Clustering: possible wrong pending count #1220

Merged · merged 1 commit on Nov 10, 2021
server/clustering_test.go (280 changes: 180 additions & 100 deletions)
@@ -8384,123 +8384,140 @@ func TestClusteringSnapshotQSubLastSent(t *testing.T) {
 	}
 }
 
-func TestClusteringMonitorQueueLastSentAfterHBTimeout(t *testing.T) {
-	resetPreviousHTTPConnections()
-	cleanupDatastore(t)
-	defer cleanupDatastore(t)
-	cleanupRaftLog(t)
-	defer cleanupRaftLog(t)
-
-	// For this test, use a central NATS server.
-	ns := natsdTest.RunDefaultServer()
-	defer ns.Shutdown()
-
-	// Configure first server
-	s1sOpts := getTestDefaultOptsForClustering("a", true)
-	s1sOpts.ClientHBInterval = 100 * time.Millisecond
-	s1sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s1sOpts.ClientHBFailCount = 5
-	s1nOpts := defaultMonitorOptions
-	s1 := runServerWithOpts(t, s1sOpts, &s1nOpts)
-	defer s1.Shutdown()
-
-	// Configure second server.
-	s2sOpts := getTestDefaultOptsForClustering("b", false)
-	s2sOpts.ClientHBInterval = 100 * time.Millisecond
-	s2sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s2sOpts.ClientHBFailCount = 5
-	s2nOpts := defaultMonitorOptions
-	s2nOpts.HTTPPort = monitorPort + 1
-	s2 := runServerWithOpts(t, s2sOpts, &s2nOpts)
-	defer s2.Shutdown()
-
-	// Configure third server.
-	s3sOpts := getTestDefaultOptsForClustering("c", false)
-	s3sOpts.ClientHBInterval = 100 * time.Millisecond
-	s3sOpts.ClientHBTimeout = 100 * time.Millisecond
-	s3sOpts.ClientHBFailCount = 5
-	s3nOpts := defaultMonitorOptions
-	s3nOpts.HTTPPort = monitorPort + 2
-	s3 := runServerWithOpts(t, s3sOpts, &s3nOpts)
-	defer s3.Shutdown()
-
-	getLeader(t, 10*time.Second, s1, s2, s3)
-
-	nc, err := nats.Connect(nats.DefaultURL)
-	if err != nil {
-		t.Fatalf("Unexpected error on connect: %v", err)
-	}
-	defer nc.Close()
-	sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
-	if err != nil {
-		t.Fatalf("Expected to connect correctly, got err %v", err)
-	}
-	defer sc.Close()
-
-	ch := make(chan bool, 1)
-	count := 0
-	if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
-		count++
-		if count == 3 {
-			ch <- true
-		}
-	}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
-		t.Fatalf("Error on subscribe: %v", err)
-	}
-
-	for i := 0; i < 3; i++ {
-		sc.Publish("foo", []byte("msg"))
-	}
-
-	if err := Wait(ch); err != nil {
-		t.Fatalf("Did not get all messages: %v", err)
-	}
-
-	checkLastSent := func() {
-		t.Helper()
-		waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
-			for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
-				resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
-				defer resp.Body.Close()
-
-				cz := Channelz{}
-				if err := json.Unmarshal(body, &cz); err != nil {
-					return fmt.Errorf("Got an error unmarshalling the body: %v", err)
-				}
-				resp.Body.Close()
-
-				sub := cz.Subscriptions[0]
-				if sub.LastSent != 3 {
-					return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
-				}
-			}
-			return nil
-		})
-	}
-
-	// Check that all see last_sent == 3
-	checkLastSent()
-
-	// Start a new connection and create the same queue durable
-	sc2 := NewDefaultConnection(t)
-	defer sc2.Close()
-	if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
-		stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
-		t.Fatalf("Error on subscribe: %v", err)
-	}
-
-	// Now close the underlying NATS connection to simulate loss of HB
-	// and for the server to close the connection.
-	nc.Close()
-
-	// Wait for the server to close the old client
-	waitForNumClients(t, s1, 1)
-	waitForNumClients(t, s2, 1)
-	waitForNumClients(t, s3, 1)
-
-	// Now make sure that all servers reflect that the sole queue member's
-	// last_sent is set to 3.
-	checkLastSent()
+func TestClusteringMonitorQueueLastSentAndPendingAfterLeavingGroup(t *testing.T) {
+	for _, test := range []struct {
+		name      string
+		hbtimeout bool
+	}{
+		{"connection close", false},
+		{"hb timeout", true},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			resetPreviousHTTPConnections()
+			cleanupDatastore(t)
+			defer cleanupDatastore(t)
+			cleanupRaftLog(t)
+			defer cleanupRaftLog(t)
+
+			// For this test, use a central NATS server.
+			ns := natsdTest.RunDefaultServer()
+			defer ns.Shutdown()
+
+			// Configure first server
+			s1sOpts := getTestDefaultOptsForClustering("a", true)
+			s1sOpts.ClientHBInterval = 100 * time.Millisecond
+			s1sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s1sOpts.ClientHBFailCount = 5
+			s1nOpts := defaultMonitorOptions
+			s1 := runServerWithOpts(t, s1sOpts, &s1nOpts)
+			defer s1.Shutdown()
+
+			// Configure second server.
+			s2sOpts := getTestDefaultOptsForClustering("b", false)
+			s2sOpts.ClientHBInterval = 100 * time.Millisecond
+			s2sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s2sOpts.ClientHBFailCount = 5
+			s2nOpts := defaultMonitorOptions
+			s2nOpts.HTTPPort = monitorPort + 1
+			s2 := runServerWithOpts(t, s2sOpts, &s2nOpts)
+			defer s2.Shutdown()
+
+			// Configure third server.
+			s3sOpts := getTestDefaultOptsForClustering("c", false)
+			s3sOpts.ClientHBInterval = 100 * time.Millisecond
+			s3sOpts.ClientHBTimeout = 100 * time.Millisecond
+			s3sOpts.ClientHBFailCount = 5
+			s3nOpts := defaultMonitorOptions
+			s3nOpts.HTTPPort = monitorPort + 2
+			s3 := runServerWithOpts(t, s3sOpts, &s3nOpts)
+			defer s3.Shutdown()
+
+			getLeader(t, 10*time.Second, s1, s2, s3)
+
+			nc, err := nats.Connect(nats.DefaultURL)
+			if err != nil {
+				t.Fatalf("Unexpected error on connect: %v", err)
+			}
+			defer nc.Close()
+			sc, err := stan.Connect(clusterName, "instance1", stan.NatsConn(nc))
+			if err != nil {
+				t.Fatalf("Expected to connect correctly, got err %v", err)
+			}
+			defer sc.Close()
+
+			ch := make(chan bool, 1)
+			count := 0
+			if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
+				count++
+				if count == 3 {
+					ch <- true
+				}
+			}, stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
+				t.Fatalf("Error on subscribe: %v", err)
+			}
+
+			for i := 0; i < 3; i++ {
+				sc.Publish("foo", []byte("msg"))
+			}
+
+			if err := Wait(ch); err != nil {
+				t.Fatalf("Did not get all messages: %v", err)
+			}
+
+			checkMonitor := func() {
+				t.Helper()
+				waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+					for _, port := range []int{monitorPort, monitorPort + 1, monitorPort + 2} {
+						resp, body := getBodyEx(t, http.DefaultClient, "http", ChannelsPath+"?channel=foo&subs=1", port, http.StatusOK, expectedJSON)
+						defer resp.Body.Close()
+
+						cz := Channelz{}
+						if err := json.Unmarshal(body, &cz); err != nil {
+							return fmt.Errorf("Got an error unmarshalling the body: %v", err)
+						}
+						resp.Body.Close()
+
+						sub := cz.Subscriptions[0]
+						if sub.LastSent != 3 {
+							return fmt.Errorf("Unexpected last_sent: %v", sub.LastSent)
+						}
+						if sub.PendingCount != 0 {
+							return fmt.Errorf("Unexpected pending_count: %v", sub.PendingCount)
+						}
+					}
+					return nil
+				})
+			}
+
+			// Check that all see last_sent == 3 and pending_count == 0
+			checkMonitor()
+
+			// Start a new connection and create the same queue durable
+			sc2 := NewDefaultConnection(t)
+			defer sc2.Close()
+			if _, err := sc2.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
+				stan.DurableName("dur"), stan.DeliverAllAvailable()); err != nil {
+				t.Fatalf("Error on subscribe: %v", err)
+			}
+
+			// Now either close the STAN connection or the underlying NATS connection
+			// to simulate loss of HB and for the server to close the connection.
+			if test.hbtimeout {
+				nc.Close()
+			} else {
+				sc.Close()
+			}
+
+			// Wait for the server to close the old client
+			waitForNumClients(t, s1, 1)
+			waitForNumClients(t, s2, 1)
+			waitForNumClients(t, s3, 1)
+
+			// Now make sure that all servers reflect that the sole queue member's
+			// last_sent is set to 3 and pending_count is 0.
+			checkMonitor()
+		})
+	}
 }
 
 func TestClusteringQueueRedelivery(t *testing.T) {
@@ -8684,3 +8701,66 @@ func TestClusteringQueueRedeliverySentAndAck(t *testing.T) {
 		return nil
 	})
 }
+
+func TestClusteringQueueNoPendingCountIfNoMsg(t *testing.T) {
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// For this test, use a central NATS server.
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	// Configure first server
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	// Configure second server.
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	servers := []*StanServer{s1, s2}
+	getLeader(t, 10*time.Second, servers...)
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	// Create two queue subs
+	qsub1, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
+		stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	if _, err := sc.QueueSubscribe("foo", "queue", func(m *stan.Msg) {},
+		stan.DurableName("dur")); err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+
+	// Now close the first
+	qsub1.Close()
+
+	// We wait for more than the replication interval
+	time.Sleep(2 * testLazyReplicationInterval)
+
+	// Make sure that the remaining queue sub on both servers does not show
+	// a pending count of 1.
+	waitFor(t, time.Second, 15*time.Millisecond, func() error {
+		for _, srv := range servers {
+			subs := srv.clients.getSubs(clientName)
+			if len(subs) != 1 {
+				return fmt.Errorf("2 queue subs still present")
+			}
+			qsub := subs[0]
+			qsub.RLock()
+			pending := len(qsub.acksPending)
+			qsub.RUnlock()
+			if pending != 0 {
+				return fmt.Errorf("Pending count should be 0, got %v", pending)
+			}
+		}
+		return nil
+	})
+}
server/server.go (17 changes: 5 additions & 12 deletions)
@@ -1175,17 +1175,17 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 		if sub.stalled && qs.stalledSubCount > 0 {
 			qs.stalledSubCount--
 		}
-		sub.RLock()
-		// Need to update if this member was the one with the last
-		// message of the group.
-		storageUpdate = sub.LastSent == qs.lastSent
 		if standaloneOrLeader {
 			// Set expiration in the past to force redelivery
 			expirationTime := time.Now().UnixNano() - int64(time.Second)
 			// If there are pending messages in this sub, they need to be
 			// transferred to remaining queue subscribers.
 			numQSubs := len(qs.subs)
 			idx := 0
+			sub.RLock()
+			// Need to update if this member was the one with the last
+			// message of the group.
+			storageUpdate = sub.LastSent == qs.lastSent
 			sortedPendingMsgs := sub.makeSortedPendingMsgs()
 			for _, pm := range sortedPendingMsgs {
 				// Get one of the remaning queue subscribers.
@@ -1226,8 +1226,8 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 					idx = 0
 				}
 			}
+			sub.RUnlock()
 		}
-		sub.RUnlock()
 		// Even for durable queue subscribers, if this is not the last
 		// member, we need to delete from storage (we did that higher in
 		// that function for non durable case). Issue #215.
@@ -1247,13 +1247,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 			qsub.Lock()
 			qsub.LastSent = qs.lastSent
 			qsub.store.UpdateSub(&qsub.SubState)
-			// In cluster mode, let send a "sent" event for this queue sub so that
-			// followers can have an updated version of the last sent, which otherwise
-			// may stay at 0 until new messages are delivered in some cases.
-			// See https://github.com/nats-io/nats-streaming-server/issues/1189
-			if s.isClustered {
-				s.collectSentOrAck(qsub, true, qs.lastSent)
-			}
 			qsub.Unlock()
 		}
 		qs.Unlock()