From 386cbca50129e21ea2a314eef8a41640bccc73a3 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 23 Sep 2024 16:33:51 +0100 Subject: [PATCH 1/4] Add `NumInterest` to optimise searching for number of sublist entries Signed-off-by: Neil Twigg --- server/accounts.go | 4 +- server/client.go | 10 +-- server/mqtt.go | 8 +- server/sublist.go | 33 ++++++-- server/sublist_test.go | 172 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 211 insertions(+), 16 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index f9897034d28..08112f67a5e 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -846,8 +846,8 @@ func (a *Account) Interest(subject string) int { var nms int a.mu.RLock() if a.sl != nil { - res := a.sl.Match(subject) - nms = len(res.psubs) + len(res.qsubs) + np, nq := a.sl.NumInterest(subject) + nms = np + nq } a.mu.RUnlock() return nms diff --git a/server/client.go b/server/client.go index fa0b445d279..120720caaa0 100644 --- a/server/client.go +++ b/server/client.go @@ -3189,7 +3189,7 @@ func (c *client) processUnsub(arg []byte) error { func (c *client) checkDenySub(subject string) bool { if denied, ok := c.mperms.dcache[subject]; ok { return denied - } else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 { + } else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 { c.mperms.dcache[subject] = true return true } else { @@ -3711,13 +3711,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo allowed := true // Cache miss, check allow then deny as needed. if c.perms.pub.allow != nil { - r := c.perms.pub.allow.Match(subject) - allowed = len(r.psubs) != 0 + np, _ := c.perms.pub.allow.NumInterest(subject) + allowed = np != 0 } // If we have a deny list and are currently allowed, check that as well. if allowed && c.perms.pub.deny != nil { - r := c.perms.pub.deny.Match(subject) - allowed = len(r.psubs) == 0 + np, _ := c.perms.pub.deny.NumInterest(subject) + allowed = np == 0 } // If we are currently not allowed but we are tracking reply subjects diff --git a/server/mqtt.go b/server/mqtt.go index 1c6a98a2dbf..35c18ba154d 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -4354,13 +4354,13 @@ func generatePubPerms(perms *Permissions) *perm { func pubAllowed(perms *perm, subject string) bool { allowed := true if perms.allow != nil { - r := perms.allow.Match(subject) - allowed = len(r.psubs) != 0 + np, _ := perms.allow.NumInterest(subject) + allowed = np != 0 } // If we have a deny list and are currently allowed, check that as well. if allowed && perms.deny != nil { - r := perms.deny.Match(subject) - allowed = len(r.psubs) == 0 + np, _ := perms.deny.NumInterest(subject) + allowed = np == 0 } return allowed } diff --git a/server/sublist.go b/server/sublist.go index 7171eef27ad..86f5a5ad0f3 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -539,7 +539,14 @@ func (s *Sublist) MatchBytes(subject []byte) *SublistResult { // HasInterest will return whether or not there is any interest in the subject. // In cases where more detail is not required, this may be faster than Match. func (s *Sublist) HasInterest(subject string) bool { - return s.hasInterest(subject, true) + return s.hasInterest(subject, true, nil, nil) +} + +// NumInterest will return the number of subs/qsubs interested in the subject. +// In cases where more detail is not required, this may be faster than Match. 
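+// The counts are accumulated during the same walk that HasInterest performs,
+// so callers that only need totals (for example Account.Interest or the
+// publish permission checks) avoid building a full SublistResult.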
+func (s *Sublist) NumInterest(subject string) (np, nq int) { + s.hasInterest(subject, true, &np, &nq) + return } func (s *Sublist) matchNoLock(subject string) *SublistResult { @@ -623,7 +630,7 @@ func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *Sublis return result } -func (s *Sublist) hasInterest(subject string, doLock bool) bool { +func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool { // Check cache first. if doLock { s.RLock() @@ -631,6 +638,10 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool { var matched bool if s.cache != nil { if r, ok := s.cache[subject]; ok { + if np != nil && nq != nil { + *np += len(r.psubs) + *nq += len(r.qsubs) + } matched = len(r.psubs)+len(r.qsubs) > 0 } } @@ -663,7 +674,7 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool { s.RLock() defer s.RUnlock() } - return matchLevelForAny(s.root, tokens) + return matchLevelForAny(s.root, tokens, np, nq) } // Remove entries in the cache until we are under the maximum. @@ -778,17 +789,21 @@ func matchLevel(l *level, toks []string, results *SublistResult) { } } -func matchLevelForAny(l *level, toks []string) bool { +func matchLevelForAny(l *level, toks []string, np, nq *int) bool { var pwc, n *node for i, t := range toks { if l == nil { return false } if l.fwc != nil { + if np != nil && nq != nil { + *np += len(l.fwc.psubs) + *nq += len(l.fwc.qsubs) + } return true } if pwc = l.pwc; pwc != nil { - if match := matchLevelForAny(pwc.next, toks[i+1:]); match { + if match := matchLevelForAny(pwc.next, toks[i+1:], np, nq); match { return true } } @@ -800,9 +815,17 @@ func matchLevelForAny(l *level, toks []string) bool { } } if n != nil { + if np != nil && nq != nil { + *np += len(n.psubs) + *nq += len(n.qsubs) + } return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 } if pwc != nil { + if np != nil && nq != nil { + *np += len(pwc.psubs) + *nq += len(pwc.qsubs) + } return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0 } return false diff --git a/server/sublist_test.go b/server/sublist_test.go index c5718f8b0d9..3c875d42b07 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1772,6 +1772,178 @@ func TestSublistHasInterest(t *testing.T) { sl.Remove(qsub) } +func TestSublistNumInterest(t *testing.T) { + sl := NewSublistWithCache() + fooSub := newSub("foo") + sl.Insert(fooSub) + + require_NumInterest := func(t *testing.T, subj string, wnp, wnq int) { + t.Helper() + np, nq := sl.NumInterest(subj) + require_Equal(t, np, wnp) + require_Equal(t, nq, wnq) + } + + // Expect to find that "foo" matches but "bar" doesn't. + // At this point nothing should be in the cache. + require_NumInterest(t, "foo", 1, 0) + require_NumInterest(t, "bar", 0, 0) + require_Equal(t, sl.cacheHits, 0) + + // Now call Match(), which will populate the cache. + sl.Match("foo") + require_Equal(t, sl.cacheHits, 0) + + // Future calls to HasInterest() should hit the cache now. + for i := uint64(1); i <= 5; i++ { + require_NumInterest(t, "foo", 1, 0) + require_Equal(t, sl.cacheHits, i) + } + + // Call Match on a subject we know there is no match. 
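+	// This caches the (empty) result; NumInterest should still report zero
+	// plain and queue subscriptions when served from that cached entry.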
+ sl.Match("bar") + require_NumInterest(t, "bar", 0, 0) + + // Remove fooSub and check interest again + sl.Remove(fooSub) + require_NumInterest(t, "foo", 0, 0) + + // Try with some wildcards + sub := newSub("foo.*") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove sub, there should be no interest + sl.Remove(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + sub = newSub("foo.>") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 1, 0) + + sl.Remove(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + sub = newSub("*.>") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 1, 0) + sl.Remove(sub) + + sub = newSub("*.bar") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + sl.Remove(sub) + + sub = newSub("*") + sl.Insert(sub) + require_NumInterest(t, "foo", 1, 0) + require_NumInterest(t, "foo.bar", 0, 0) + sl.Remove(sub) + + // Try with queues now. + qsub := newQSub("foo", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + + qsub2 := newQSub("foo", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 2) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove last. 
+ sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + + // With wildcards now + qsub = newQSub("foo.*", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Add another queue to the group + qsub2 = newQSub("foo.*", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 0) + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove last + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub = newQSub("foo.>", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + + // Add another queue to the group + qsub2 = newQSub("foo.>", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 2) + + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + + // Remove last + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub = newQSub("*.>", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + sl.Remove(qsub) + + qsub = newQSub("*.bar", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + sl.Remove(qsub) + + qsub = newQSub("*", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + sl.Remove(qsub) +} + func subsInit(pre string, toks []string) { var sub string for _, t := range toks { From 52af203c9c38c4a50a20a824340525e2917be9db Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 9 Oct 2024 17:26:39 -0600 Subject: [PATCH 2/4] [FIXED] LeafNode's queue group load balancing and Sublist.NumInterest While writing the test, I needed to make sure that each server in the hub has registered interest for 2 queue subscribers from the same group. I noticed that `Sublist.NumInterest()` (that I was invoking from `Account.Interest()` was returning 1, even after I knew that the propagation should have happened. It turns out that `NumInterest()` was returning the number of queue groups, not the number of queue subs in all those queue groups. For the leafnode queue balancing issue, the code was favoring local/routed queue subscriptions, so in the described issue, the message would always go from HUB1->HUB2->LEAF2->QSub instead of HUB1->LEAF1->QSub. 
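To illustrate the counting part of the fix (a sketch only, using the existing
sublist test helpers, not code added by this change):

    sl := NewSublistWithCache()
    sl.Insert(newQSub("foo", "queue"))
    sl.Insert(newQSub("foo", "queue")) // second member of the same queue group
    np, nq := sl.NumInterest("foo")
    // Previously nq was 1 (the number of queue groups); with this change it is
    // 2 (the number of queue subscriptions), and np stays 0.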
Since we had another test that was a bit reversed where we had a HUB and LEAF1<->LEAF2 connecting to HUB and a qsub on both HUB and LEAF1 and requests originated from LEAF2, and we were expecting all responses to come from LEAF1 (instead of the responder on HUB), I went with the following approach: If the message originates from a client that connects to a server that has a connection from a remote LEAF, then we pick that LEAF the same as if it was a local client or routed server. However, if the client connects to a server that has a leaf connection to another server, then we keep track of the sub but do not sent to that one if we have local or routed qsubs. This makes the 2 tests pass, solving the new test and maintaining the behavior for the old test. Signed-off-by: Ivan Kozlovic --- server/client.go | 15 ++-- server/leafnode_test.go | 160 +++++++++++++++++++++++++++++++++------- server/sublist.go | 16 +++- server/sublist_test.go | 44 ++++++++++- 4 files changed, 194 insertions(+), 41 deletions(-) diff --git a/server/client.go b/server/client.go index 120720caaa0..f9ddbebc2d9 100644 --- a/server/client.go +++ b/server/client.go @@ -4649,17 +4649,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Here we just care about a client or leaf and skipping a leaf and preferring locals. if dst := sub.client.kind; dst == ROUTER || dst == LEAF { if (src == LEAF || src == CLIENT) && dst == LEAF { + // Remember that leaf in case we don't find any other candidate. if rsub == nil { rsub = sub } continue } else { - c.addSubToRouteTargets(sub) - // Clear rsub since we added a sub. - rsub = nil - if flags&pmrCollectQueueNames != 0 { - queues = append(queues, sub.queue) + // We would be picking a route, but if we had remembered a "hub" leaf, + // then pick that one instead of the route. + if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() { + break } + rsub = sub } break } @@ -4708,8 +4709,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } if rsub != nil { - // If we are here we tried to deliver to a local qsub - // but failed. So we will send it to a remote or leaf node. + // We are here if we have selected a leaf or route as the destination, + // or if we tried to deliver to a local qsub but failed. c.addSubToRouteTargets(rsub) if flags&pmrCollectQueueNames != 0 { queues = append(queues, rsub.queue) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 2d8f2d83ecc..1eccbcb34dc 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2314,39 +2314,30 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { ncSrv1 := natsConnect(t, srv1.ClientURL()) defer ncSrv1.Close() natsQueueSub(t, ncSrv1, "foo", "queue", func(m *nats.Msg) { - m.Respond([]byte("from srv1")) + m.Data = []byte("from srv1") + m.RespondMsg(m) }) ncLeaf1 := natsConnect(t, leaf1.ClientURL()) defer ncLeaf1.Close() natsQueueSub(t, ncLeaf1, "foo", "queue", func(m *nats.Msg) { - m.Respond([]byte("from leaf1")) + m.Data = []byte("from leaf1") + m.RespondMsg(m) }) ncLeaf2 := natsConnect(t, leaf2.ClientURL()) defer ncLeaf2.Close() // Check that "foo" interest is available everywhere. - // For this test, we want to make sure that the 2 queue subs are - // registered on all servers, so we don't use checkSubInterest - // which would simply return "true" if there is any interest on "foo". 
- servers := []*Server{srv1, leaf1, leaf2} - checkFor(t, time.Second, 15*time.Millisecond, func() error { - for _, s := range servers { - acc, err := s.LookupAccount(globalAccountName) - if err != nil { - return err - } - acc.mu.RLock() - r := acc.sl.Match("foo") - ok := len(r.qsubs) == 1 && len(r.qsubs[0]) == 2 - acc.mu.RUnlock() - if !ok { - return fmt.Errorf("interest not propagated on %q", s.Name()) + for _, s := range []*Server{srv1, leaf1, leaf2} { + gacc := s.GlobalAccount() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := gacc.Interest("foo"); n != 2 { + return fmt.Errorf("Expected interest for %q to be 2, got %v", "foo", n) } - } - return nil - }) + return nil + }) + } // Send requests (from leaf2). For this test to make sure that // there is no duplicate, we want to make sure that we check for @@ -2360,15 +2351,22 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { checkSubInterest(t, leaf1, globalAccountName, "reply_subj", time.Second) checkSubInterest(t, leaf2, globalAccountName, "reply_subj", time.Second) - for i := 0; i < 5; i++ { + for i := 0; i < 100; i++ { // Now send the request - natsPubReq(t, ncLeaf2, "foo", sub.Subject, []byte("req")) + reqID := fmt.Sprintf("req.%d", i) + msg := nats.NewMsg("foo") + msg.Data = []byte("req") + msg.Header.Set("ReqId", reqID) + msg.Reply = sub.Subject + if err := ncLeaf2.PublishMsg(msg); err != nil { + t.Fatalf("Error on publish: %v", err) + } // Check that we get the reply replyMsg := natsNexMsg(t, sub, time.Second) - // But make sure we received only 1! - if otherReply, _ := sub.NextMsg(100 * time.Millisecond); otherReply != nil { - t.Fatalf("Received duplicate reply, first was %q, followed by %q", - replyMsg.Data, otherReply.Data) + // But make sure no duplicate. We do so by checking that the reply's + // header ReqId matches our current reqID. + if respReqID := replyMsg.Header.Get("ReqId"); respReqID != reqID { + t.Fatalf("Current request is %q, got duplicate with %q", reqID, respReqID) } // We also should have preferred the queue sub that is in the leaf cluster. if string(replyMsg.Data) != "from leaf1" { @@ -4010,6 +4008,114 @@ func TestLeafNodeInterestPropagationDaisychain(t *testing.T) { checkSubInterest(t, sAA, "$G", "foo", time.Second) // failure issue 2448 } +func TestLeafNodeQueueGroupDistribution(t *testing.T) { + hc := createClusterWithName(t, "HUB", 3) + defer hc.shutdown() + + // Now have a cluster of leafnodes with each one connecting to corresponding HUB(n) node. 
+ c1 := ` + server_name: LEAF1 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port))) + ln1, lopts1 := RunServerWithConfig(lconf1) + defer ln1.Shutdown() + + c2 := ` + server_name: LEAF2 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[1].LeafNode.Port))) + ln2, _ := RunServerWithConfig(lconf2) + defer ln2.Shutdown() + + c3 := ` + server_name: LEAF3 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port))) + ln3, _ := RunServerWithConfig(lconf3) + defer ln3.Shutdown() + + // Check leaf cluster is formed and all connected to the HUB. + lnServers := []*Server{ln1, ln2, ln3} + checkClusterFormed(t, lnServers...) + for _, s := range lnServers { + checkLeafNodeConnected(t, s) + } + // Check each node in the hub has 1 connection from the leaf cluster. + for i := 0; i < 3; i++ { + checkLeafNodeConnectedCount(t, hc.servers[i], 1) + } + + // Create a client and qsub on LEAF1 and LEAF2. + nc1 := natsConnect(t, ln1.ClientURL()) + defer nc1.Close() + var qsub1Count atomic.Int32 + natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) { + qsub1Count.Add(1) + }) + natsFlush(t, nc1) + + nc2 := natsConnect(t, ln2.ClientURL()) + defer nc2.Close() + var qsub2Count atomic.Int32 + natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) { + qsub2Count.Add(1) + }) + natsFlush(t, nc2) + + // Make sure that the propagation interest is done before sending. + for _, s := range hc.servers { + gacc := s.GlobalAccount() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := gacc.Interest("foo"); n != 2 { + return fmt.Errorf("Expected interest for %q to be 2, got %v", "foo", n) + } + return nil + }) + } + + sendAndCheck := func(idx int) { + t.Helper() + nchub := natsConnect(t, hc.servers[idx].ClientURL()) + defer nchub.Close() + total := 1000 + for i := 0; i < total; i++ { + natsPub(t, nchub, "foo", []byte("from hub")) + } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total { + return fmt.Errorf("Expected %v messages, got %v", total, trecv) + } + return nil + }) + // Now that we have made sure that all messages were received, + // check that qsub1 and qsub2 are getting at least some. + if n := int(qsub1Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub1 to get some messages, but got %v", n) + } + if n := int(qsub2Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub2 to get some messages, but got %v", n) + } + // Reset the counters. 
+ qsub1Count.Store(0) + qsub2Count.Store(0) + } + // Send from HUB1 + sendAndCheck(0) + // Send from HUB2 + sendAndCheck(1) + // Send from HUB3 + sendAndCheck(2) +} + func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { /* diff --git a/server/sublist.go b/server/sublist.go index 86f5a5ad0f3..5c1325cc681 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -640,7 +640,9 @@ func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool { if r, ok := s.cache[subject]; ok { if np != nil && nq != nil { *np += len(r.psubs) - *nq += len(r.qsubs) + for _, qsub := range r.qsubs { + *nq += len(qsub) + } } matched = len(r.psubs)+len(r.qsubs) > 0 } @@ -798,7 +800,9 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { if l.fwc != nil { if np != nil && nq != nil { *np += len(l.fwc.psubs) - *nq += len(l.fwc.qsubs) + for _, qsub := range l.fwc.qsubs { + *nq += len(qsub) + } } return true } @@ -817,14 +821,18 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { if n != nil { if np != nil && nq != nil { *np += len(n.psubs) - *nq += len(n.qsubs) + for _, qsub := range n.qsubs { + *nq += len(qsub) + } } return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 } if pwc != nil { if np != nil && nq != nil { *np += len(pwc.psubs) - *nq += len(pwc.qsubs) + for _, qsub := range pwc.qsubs { + *nq += len(qsub) + } } return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0 } diff --git a/server/sublist_test.go b/server/sublist_test.go index 3c875d42b07..e0642903de3 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1863,13 +1863,24 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo", 0, 2) require_NumInterest(t, "foo.bar", 0, 0) + // Add a second qsub to the second queue group + qsub3 := newQSub("foo", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 3) + require_NumInterest(t, "foo.bar", 0, 0) + // Remove first queue sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 2) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove second + sl.Remove(qsub2) require_NumInterest(t, "foo", 0, 1) require_NumInterest(t, "foo.bar", 0, 0) // Remove last. - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) @@ -1886,18 +1897,32 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 2) require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub3 = newQSub("foo.*", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 3) + require_NumInterest(t, "foo.bar.baz", 0, 0) + // Remove first queue sl.Remove(qsub) require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove second + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 1) require_NumInterest(t, "foo.bar.baz", 0, 0) // Remove last - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) require_NumInterest(t, "foo.bar.baz", 0, 0) + // With > wildcard qsub = newQSub("foo.>", "bar") sl.Insert(qsub) require_NumInterest(t, "foo", 0, 0) @@ -1911,14 +1936,27 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo.bar", 0, 2) require_NumInterest(t, "foo.bar.baz", 0, 2) + // Add another queue to second group. 
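+	// Group "bar" keeps a single member while group "baz" will now have two,
+	// so matching subjects should report a total of three queue subscriptions.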
+ qsub3 = newQSub("foo.>", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 3) + require_NumInterest(t, "foo.bar.baz", 0, 3) + // Remove first queue sl.Remove(qsub) require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 2) + + // Remove second + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 1) require_NumInterest(t, "foo.bar.baz", 0, 1) // Remove last - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) require_NumInterest(t, "foo.bar.baz", 0, 0) From 6681910cb5d7c8fd255dc84073930d704d9c2a3f Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 9 Oct 2024 21:48:11 -0600 Subject: [PATCH 3/4] Fixed some tests that would not close clients or shutdown servers, etc.. Signed-off-by: Ivan Kozlovic --- server/auth_callout_test.go | 33 ++++++++++++++++++++++++++------- server/auth_test.go | 1 + server/client_test.go | 5 ++++- server/gateway_test.go | 10 +++------- server/jwt_test.go | 1 + server/leafnode_test.go | 10 +++++++--- server/monitor_test.go | 9 +++++++++ server/server_test.go | 3 +++ 8 files changed, 54 insertions(+), 18 deletions(-) diff --git a/server/auth_callout_test.go b/server/auth_callout_test.go index b9282b4dc76..29b7d83b049 100644 --- a/server/auth_callout_test.go +++ b/server/auth_callout_test.go @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) { // This one will use callout since not defined in server config. nc := at.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) { // This one will use callout since not defined in server config. nc := at.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -388,6 +390,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) { nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"), nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"), ) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -439,6 +442,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) { require_NoError(t, err) nc := ac.Connect(nkeyOpt) + defer nc.Close() // Make sure that the callout was called. if atomic.LoadUint32(&callouts) != 1 { @@ -658,6 +662,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // Send correct token. This should switch us to the test account. nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken)) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -678,6 +683,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // Send the signing key token. 
This should switch us to the test account, but the user // is signed with the account signing key nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken)) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -697,6 +703,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // is signed with the account signing key nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken)) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -922,10 +929,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect(nats.UserInfo("dlc", "zzz")) + nc := ac.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() // Authorization services can optionally encrypt the responses using the server's public xkey. - ac.Connect(nats.UserInfo("dlc", "xxx")) + nc = ac.Connect(nats.UserInfo("dlc", "xxx")) + defer nc.Close() } func TestAuthCalloutOperatorModeEncryption(t *testing.T) { @@ -1017,10 +1026,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) { defer removeFile(t, creds) // This will receive an encrypted request to the auth service but send plaintext response. - ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA)) + nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA)) + defer nc.Close() // This will receive an encrypted request to the auth service and send an encrypted response. - ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB)) + nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB)) + defer nc.Close() } func TestAuthCalloutServerTags(t *testing.T) { @@ -1048,7 +1059,8 @@ func TestAuthCalloutServerTags(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect() + nc := ac.Connect() + defer nc.Close() tags := <-tch require_True(t, len(tags) == 2) @@ -1081,7 +1093,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect() + nc := ac.Connect() + defer nc.Close() cluster := <-ch require_True(t, cluster == "HUB") @@ -1184,7 +1197,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) { require_NoError(t, err) // This one will use callout since not defined in server config. - ac.Connect(nats.UserInfo("dlc", "zzz")) + nc := ac.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() checkSubsPending(t, sub, 0) checkAuthErrEvent := func(user, pass, reason string) { @@ -1244,6 +1258,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) { // Setup system user. snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() // Allow this connect event to pass us by.. time.Sleep(250 * time.Millisecond) @@ -1615,6 +1630,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) { // Send correct token. This should switch us to the A account. 
nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA")) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1626,6 +1642,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) { nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB")) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1703,6 +1720,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) { nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"), nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"), ) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1910,6 +1928,7 @@ func TestOperatorModeUserRevocation(t *testing.T) { // connect the system user sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds)) require_NoError(t, err) + defer sysNC.Close() // Bearer token etc.. // This is used by all users, and the customization will be in other connect args. diff --git a/server/auth_test.go b/server/auth_test.go index 15720144f18..05c3402f7d6 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) { // Make sure we connect ok and to the correct account. nc := natsConnect(t, s.ClientURL()) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) response := ServerAPIResponse{Data: &UserInfo{}} diff --git a/server/client_test.go b/server/client_test.go index 59e394f35fa..aa5daf2a71f 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) { } nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...) 
if expectedOk { + defer nc.Close() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2979,8 +2980,9 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { wait := make(chan error) - nca, err := nats.Connect(proxy.clientURL()) + nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose()) require_NoError(t, err) + defer nca.Close() nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) { wait <- err close(wait) @@ -2988,6 +2990,7 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { ncb, err := nats.Connect(s.ClientURL()) require_NoError(t, err) + defer ncb.Close() _, err = nca.Subscribe("test", func(msg *nats.Msg) { wait <- nil diff --git a/server/gateway_test.go b/server/gateway_test.go index 4c8eb774c68..b397bfedc10 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) { defer s.Shutdown() l := &captureFatalLogger{fatalCh: make(chan string, 1)} s.SetLogger(l, false, false) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - s.Start() - wg.Done() - }() + // This does not block + s.Start() select { case e := <-l.fatalCh: if !strings.Contains(e, errTxt) { @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) { t.Fatal("Should have got a fatal error") } s.Shutdown() - wg.Wait() + s.WaitForShutdown() } func TestGatewayListenError(t *testing.T) { diff --git a/server/jwt_test.go b/server/jwt_test.go index d45b1dd53e1..f4235a2b973 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) { for range time.NewTicker(200 * time.Millisecond).C { select { case <-ctx.Done(): + return default: } send(t) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 1eccbcb34dc..07577487857 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -3042,6 +3042,7 @@ func TestLeafNodeWSSubPath(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { attempts <- r.URL.String() })) + defer ts.Close() u, _ := url.Parse(fmt.Sprintf("%v/some/path", ts.URL)) u.Scheme = "ws" lo2.LeafNode.Remotes = []*RemoteLeafOpts{ @@ -4773,6 +4774,7 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) { // Connect client to the hub. nc, err := nats.Connect(s.ClientURL()) require_NoError(t, err) + defer nc.Close() // This should not be seen on leafnode side since we only allow pub to "foo" _, err = nc.SubscribeSync("baz") @@ -6786,6 +6788,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t // Now connect and send responses from EFG in cloud. nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("efg", "p")) + defer nc.Close() for i := 0; i < 100; i++ { require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) @@ -6918,6 +6921,7 @@ func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *tes // Now connect and send responses from EFG in cloud. 
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p")) + defer nc.Close() for i := 0; i < 100; i++ { require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) } @@ -7155,15 +7159,15 @@ func TestLeafNodeTwoRemotesToSameHubAccountWithClusters(t *testing.T) { nc := natsConnect(t, s.ClientURL(), nats.UserInfo(user, "pwd")) conns = append(conns, nc) } - for _, nc := range conns { - defer nc.Close() - } createConn(sh1, "HA") createConn(sh2, "HA") createConn(sp1, "A") createConn(sp2, "A") createConn(sp1, "B") createConn(sp2, "B") + for _, nc := range conns { + defer nc.Close() + } check := func(subConn *nats.Conn, subj string, checkA, checkB bool) { t.Helper() diff --git a/server/monitor_test.go b/server/monitor_test.go index c6c29245e58..7003de12316 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4864,6 +4864,7 @@ func TestServerIDZRequest(t *testing.T) { nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) require_NoError(t, err) + defer nc.Close() subject := fmt.Sprintf(serverPingReqSubj, "IDZ") resp, err := nc.Request(subject, nil, time.Second) @@ -5337,9 +5338,17 @@ func TestHealthzStatusError(t *testing.T) { // Intentionally causing an error in readyForConnections(). // Note: Private field access, taking advantage of having the tests in the same package. + s.mu.Lock() + sl := s.listener s.listener = nil + s.mu.Unlock() checkHealthzEndpoint(t, s.MonitorAddr().String(), http.StatusInternalServerError, "error") + + // Restore for proper shutdown. + s.mu.Lock() + s.listener = sl + s.mu.Unlock() } func TestHealthzStatusUnavailable(t *testing.T) { diff --git a/server/server_test.go b/server/server_test.go index d1ae37e0498..aa3b8dfbe21 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -225,6 +225,9 @@ func TestTLSMinVersionConfig(t *testing.T) { } opts = append(opts, nats.RootCAs("../test/configs/certs/ca.pem")) nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...) + if err == nil { + defer nc.Close() + } if expectedErr == nil { if err != nil { t.Fatalf("Unexpected error: %v", err) From 025bbc8df36eeae14e1458051bfa51c3b29b2190 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 10 Oct 2024 06:02:54 -0700 Subject: [PATCH 4/4] Update Go versions --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ecc2c29e6ae..91fadd4df45 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ language: go go: # This should be quoted or use .x, but should not be unquoted. # Remember that a YAML bare float drops trailing zeroes. - - "1.22.7" + - "1.22.8" - "1.21.13" go_import_path: github.com/nats-io/nats-server