Skip to content

Commit

Permalink
Add pending to jsz response and statsz (#5923)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>

---------

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Sep 24, 2024
1 parent c3eda57 commit 4954cc0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 0 deletions.
3 changes: 3 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,9 @@ func (s *Server) sendStatsz(subj string) {
Peer: getHash(leader),
Size: mg.ClusterSize(),
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jStat.Meta.Pending = ipq.len()
}
}
}
jStat.Limits = &s.getOpts().JetStreamLimits
Expand Down
44 changes: 44 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3903,3 +3903,47 @@ func TestJetStreamClusterAPILimitAdvisory(t *testing.T) {
require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set.
require_Equal(t, advisory.Dropped, queueLimit) // Configured queue limit.
}

func TestJetStreamPendingRequestsInJsz(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()
metaleader := c.leader()

sjs := metaleader.getJetStream()
sjs.mu.Lock()
sub := &subscription{
subject: []byte("$JS.API.VERY_SLOW"),
icb: func(sub *subscription, client *client, acc *Account, subject, reply string, rmsg []byte) {
select {
case <-client.srv.quitCh:
case <-time.After(time.Second * 3):
}
},
}
err := metaleader.getJetStream().apiSubs.Insert(sub)
sjs.mu.Unlock()

require_NoError(t, err)

nc, _ := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

inbox := nc.NewRespInbox()
msg := &nats.Msg{
Subject: "$JS.API.VERY_SLOW",
Reply: inbox,
}

// Fall short of hitting the API limit by a little bit,
// otherwise the requests get drained away.
for i := 0; i < JSDefaultRequestQueueLimit-10; i++ {
require_NoError(t, nc.PublishMsg(msg))
}

jsz, err := metaleader.Jsz(nil)
require_NoError(t, err)
require_True(t, jsz.Meta != nil)
require_NotEqual(t, jsz.Meta.Pending, 0)
}
7 changes: 7 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,9 @@ func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
if ci.Leader == s.info.Name {
v.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
v.Meta.Pending = ipq.len()
}
}
}
}
Expand Down Expand Up @@ -2794,6 +2797,7 @@ type MetaClusterInfo struct {
Peer string `json:"peer,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Size int `json:"cluster_size"`
Pending int `json:"pending"`
}

// JSInfo has detailed information on JetStream.
Expand Down Expand Up @@ -2996,6 +3000,9 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
if isLeader {
jsi.Meta.Replicas = ci.Replicas
}
if ipq := s.jsAPIRoutedReqs; ipq != nil {
jsi.Meta.Pending = ipq.len()
}
}
}

Expand Down

0 comments on commit 4954cc0

Please sign in to comment.