Cherry picks for v2.10.19 RC4 (#5769)
Includes:

- #5760
- #5761
- #5763
- #5767
wallyqs authored Aug 9, 2024
2 parents 5b92633 + 913e43f commit 0153f6a
Showing 7 changed files with 190 additions and 51 deletions.
5 changes: 3 additions & 2 deletions server/client.go
@@ -3908,9 +3908,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
// Match may use the subject here to populate a cache, so can not use bytesToString here.
r = acc.sl.Match(string(c.pa.subject))
if len(r.psubs)+len(r.qsubs) > 0 {
c.in.results[string(c.pa.subject)] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
if len(c.in.results) >= maxResultCacheSize {
n := 0
for subject := range c.in.results {
delete(c.in.results, subject)
@@ -3919,6 +3918,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) {
}
}
}
// Then add the new cache entry.
c.in.results[string(c.pa.subject)] = r
}
}

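The client.go hunk above reorders the cache maintenance in processInboundClientMsg: the results cache is pruned first (now when it reaches maxResultCacheSize) and the new entry is inserted afterwards, so the cache can no longer grow past its bound before a prune kicks in. Below is a minimal, self-contained sketch of that prune-then-insert pattern; the names (boundedCache, maxEntries, pruneFraction) are illustrative and not part of the server code.

```go
package main

import "fmt"

const (
	maxEntries    = 8 // stand-in for maxResultCacheSize
	pruneFraction = 4 // drop roughly a quarter of the entries when full
)

type boundedCache struct {
	entries map[string]int
}

// set prunes first and inserts second, so len(c.entries) never exceeds maxEntries.
func (c *boundedCache) set(key string, val int) {
	if len(c.entries) >= maxEntries {
		n := 0
		// Go randomizes map iteration order, so this is effectively a random delete,
		// matching the "Random delete" comment in the server code.
		for k := range c.entries {
			delete(c.entries, k)
			if n++; n > maxEntries/pruneFraction {
				break
			}
		}
	}
	c.entries[key] = val
}

func main() {
	c := &boundedCache{entries: make(map[string]int)}
	for i := 0; i < 100; i++ {
		c.set(fmt.Sprintf("subj.%d", i), i)
	}
	fmt.Println("entries after 100 inserts:", len(c.entries)) // always <= maxEntries
}
```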
61 changes: 38 additions & 23 deletions server/consumer.go
@@ -2027,9 +2027,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {

switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.processAckMsg(sseq, dseq, dc, reply, true)
// We handle replies for acks in updateAcks
skipAckReply = true
if !o.processAckMsg(sseq, dseq, dc, reply, true) {
// We handle replies for acks in updateAcks
skipAckReply = true
}
case bytes.HasPrefix(msg, AckNext):
o.processAckMsg(sseq, dseq, dc, _EMPTY_, true)
o.processNextMsgRequest(reply, msg[len(AckNext):])
@@ -2043,9 +2044,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
if buf := msg[len(AckTerm):]; len(buf) > 0 {
reason = string(bytes.TrimSpace(buf))
}
o.processTerm(sseq, dseq, dc, reason, reply)
// We handle replies for acks in updateAcks
skipAckReply = true
if !o.processTerm(sseq, dseq, dc, reason, reply) {
// We handle replies for acks in updateAcks
skipAckReply = true
}
}

// Ack the ack if requested.
@@ -2395,9 +2397,12 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
}

// Process a TERM
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {
// Returns `true` if the ack was processed in place and the sender can now respond
// to the client, or `false` if there was an error or the ack is replicated (in which
// case the reply will be sent later).
func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool {
// Treat like an ack to suppress redelivery.
o.processAckMsg(sseq, dseq, dc, reply, false)
ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false)

o.mu.Lock()
defer o.mu.Unlock()
@@ -2420,11 +2425,14 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) {

j, err := json.Marshal(e)
if err != nil {
return
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}

subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
return ackedInPlace
}

// Introduce a small delay in when timer fires to check pending.
@@ -2733,11 +2741,15 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
o.sendAdvisory(o.ackEventT, j)
}

func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) {
// Process an ACK.
// Returns `true` if the ack was processed in place and the sender can now respond
// to the client, or `false` if there was an error or the ack is replicated (in which
// case the reply will be sent later).
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool {
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return
return false
}

// Check if this ack is above the current pointer to our next to deliver.
@@ -2749,9 +2761,14 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return
return false
}

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered (o.node != nil) this will be handled by
// processReplicatedAck after the ack has propagated.
ackInPlace := o.node == nil && o.retention != LimitsPolicy

var sgap, floor uint64
var needSignal bool

@@ -2790,7 +2807,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return
return ackInPlace
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
@@ -2822,22 +2839,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
case AckNone:
// FIXME(dlc) - This is error but do we care?
o.mu.Unlock()
return
return ackInPlace
}

// No ack replication, so we set reply to "" so that updateAcks does not
// send the reply. The caller will.
if ackInPlace {
reply = _EMPTY_
}
// Update underlying store.
o.updateAcks(dseq, sseq, reply)

// In case retention changes for a stream, this ought to have been updated
// using the consumer lock to avoid a race.
retention := o.retention
clustered := o.node != nil
o.mu.Unlock()

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered this will be handled by processReplicatedAck
// after the ack has propagated.
if !clustered && mset != nil && retention != LimitsPolicy {
if ackInPlace {
if sgap > 1 {
// FIXME(dlc) - This can very inefficient, will need to fix.
for seq := sseq; seq >= floor; seq-- {
Expand All @@ -2852,6 +2866,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if needSignal {
o.signalNewMessages()
}
return ackInPlace
}

// Determine if this is a truly filtered consumer. Modern clients will place filtered subjects
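The consumer.go changes make processAckMsg and processTerm return a bool: true when the ack was handled in place (the consumer is not clustered and the stream is not limits-retention), in which case the caller sends the ack reply itself; false when the reply is deferred to updateAcks after the ack has been stored or replicated. Below is a rough sketch of that decision flow, using hypothetical names (ackHandler, replicated, policy) rather than the real consumer fields.

```go
package main

import "fmt"

type retention int

const (
	limitsPolicy retention = iota
	workQueuePolicy
	interestPolicy
)

type ackHandler struct {
	replicated bool      // stand-in for o.node != nil
	policy     retention // stand-in for o.retention
}

// processAck reports whether the ack was handled in place. When it returns true
// the caller may reply to the client immediately; when it returns false the reply
// is deferred (in the server, updateAcks sends it once the ack has been applied).
func (a *ackHandler) processAck(reply string, deferredReply func(string)) bool {
	ackInPlace := !a.replicated && a.policy != limitsPolicy
	if !ackInPlace {
		deferredReply(reply)
		return false
	}
	// Handled in place: the stored reply is suppressed and the caller responds.
	return true
}

func main() {
	deferred := func(r string) { fmt.Println("reply sent later on", r) }

	local := &ackHandler{replicated: false, policy: workQueuePolicy}
	if local.processAck("$JS.ACK.WQ.cons.1", deferred) {
		fmt.Println("caller replies immediately")
	}

	clustered := &ackHandler{replicated: true, policy: workQueuePolicy}
	if !clustered.processAck("$JS.ACK.WQ.cons.2", deferred) {
		fmt.Println("caller skips the reply")
	}
}
```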
24 changes: 15 additions & 9 deletions server/jetstream_cluster.go
@@ -1141,17 +1141,24 @@ type recoveryUpdates struct {
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
consumerName := func(o *consumer) string {
o.mu.RLock()
defer o.mu.RUnlock()
return o.name
}

// Can not hold jetstream lock while trying to delete streams or consumers.
js.mu.Lock()
s, cc := js.srv, js.cluster
s.Debugf("JetStream cluster checking for orphans")

// We only want to cleanup any orphans if we know we are current with the meta-leader.
meta := cc.meta
if meta == nil || meta.GroupLeader() == _EMPTY_ {
js.mu.Unlock()
s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
return
}
if !meta.Healthy() {
js.mu.Unlock()
s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
return
}

var streams []*stream
var consumers []*consumer

@@ -1164,8 +1171,7 @@ func (js *jetStream) checkForOrphans() {
} else {
// This one is good, check consumers now.
for _, o := range mset.getConsumers() {
consumer := consumerName(o)
if sa.consumers[consumer] == nil {
if sa.consumers[o.String()] == nil {
consumers = append(consumers, o)
}
}
@@ -1369,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
// Clear.
ru = nil
s.Debugf("Recovered JetStream cluster metadata")
js.checkForOrphans()
time.AfterFunc(30*time.Second, js.checkForOrphans)
// Do a health check here as well.
go checkHealth()
continue
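The jetstream_cluster.go hunks tighten checkForOrphans so it bails out unless the meta group is healthy (not merely has a leader), and monitorCluster now schedules the check with time.AfterFunc 30 seconds after metadata recovery instead of running it inline, giving assignments time to settle before anything is treated as an orphan. A small sketch of that guard-then-defer shape is below, using hypothetical stand-in types (metaGroup, orphanChecker) rather than the server's actual structs.

```go
package main

import (
	"fmt"
	"time"
)

// metaGroup is a stand-in for the RAFT meta group used by the real server.
type metaGroup struct {
	leader  string
	healthy bool
}

func (m *metaGroup) GroupLeader() string { return m.leader }
func (m *metaGroup) Healthy() bool       { return m.healthy }

type orphanChecker struct {
	meta *metaGroup
}

// checkForOrphans only proceeds when we know we are current with the meta-leader;
// otherwise recovered-but-not-yet-mapped assets would look like orphans.
func (c *orphanChecker) checkForOrphans() {
	if c.meta == nil || c.meta.GroupLeader() == "" {
		fmt.Println("skipping orphan check: no meta-leader")
		return
	}
	if !c.meta.Healthy() {
		fmt.Println("skipping orphan check: not current with the meta-leader")
		return
	}
	fmt.Println("checking for orphans")
}

func main() {
	c := &orphanChecker{meta: &metaGroup{leader: "S-2", healthy: true}}
	// Defer the first check so the meta layer has time to recreate assignments,
	// mirroring the time.AfterFunc(30*time.Second, ...) call in the change above
	// (shortened here so the example finishes quickly).
	time.AfterFunc(50*time.Millisecond, c.checkForOrphans)
	time.Sleep(100 * time.Millisecond)
}
```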
56 changes: 56 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -950,6 +950,7 @@ func TestJetStreamClusterConsumerNRGCleanup(t *testing.T) {
numStreams++
}
}
f.Close()
}
require_Equal(t, numConsumers, 0)
require_Equal(t, numStreams, 0)
@@ -2360,3 +2361,58 @@ func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) {
require_NoError(t, err)
require_Equal(t, sinfo.State.Msgs, 1)
}

func TestJetStreamClusterMetaSyncOrphanCleanup(t *testing.T) {
c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3,
func(serverName, clusterName, storeDir, conf string) string {
return fmt.Sprintf("%s\nserver_tags: [server:%s]", conf, serverName)
})
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Create a bunch of streams on S1
for i := 0; i < 100; i++ {
stream := fmt.Sprintf("TEST-%d", i)
subject := fmt.Sprintf("TEST.%d", i)
_, err := js.AddStream(&nats.StreamConfig{
Name: stream,
Subjects: []string{subject},
Storage: nats.FileStorage,
Placement: &nats.Placement{Tags: []string{"server:S-1"}},
})
require_NoError(t, err)
// Put in 10 msgs to each
for j := 0; j < 10; j++ {
_, err := js.Publish(subject, nil)
require_NoError(t, err)
}
}

// Now we will shutdown S1 and remove all of its meta-data to trip the condition.
s := c.serverByName("S-1")
require_True(t, s != nil)

sd := s.JetStreamConfig().StoreDir
nd := filepath.Join(sd, "$SYS", "_js_", "_meta_")
s.Shutdown()
s.WaitForShutdown()
os.RemoveAll(nd)
s = c.restartServer(s)
c.waitOnServerCurrent(s)
jsz, err := s.Jsz(nil)
require_NoError(t, err)
require_Equal(t, jsz.Streams, 100)

// These will be recreated by the meta layer, but if the orphan detection deleted them they will be empty,
// so check all streams to make sure they still have data.
acc := s.GlobalAccount()
var state StreamState
for i := 0; i < 100; i++ {
mset, err := acc.lookupStream(fmt.Sprintf("TEST-%d", i))
require_NoError(t, err)
mset.store.FastState(&state)
require_Equal(t, state.Msgs, 10)
}
}
5 changes: 3 additions & 2 deletions server/leafnode.go
@@ -2677,9 +2677,8 @@ func (c *client) processInboundLeafMsg(msg []byte) {
// Go back to the sublist data structure.
if !ok {
r = c.acc.sl.Match(subject)
c.in.results[subject] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
if len(c.in.results) >= maxResultCacheSize {
n := 0
for subj := range c.in.results {
delete(c.in.results, subj)
@@ -2688,6 +2687,8 @@
}
}
}
// Then add the new cache entry.
c.in.results[subject] = r
}

// Collect queue names if needed.
73 changes: 73 additions & 0 deletions server/norace_test.go
@@ -10775,3 +10775,76 @@ func TestNoRaceJetStreamClusterMirrorSkipSequencingBug(t *testing.T) {
return nil
})
}

func TestNoRaceJetStreamStandaloneDontReplyToAckBeforeProcessingIt(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "WQ",
Discard: nats.DiscardNew,
MaxMsgsPerSubject: 1,
DiscardNewPerSubject: true,
Retention: nats.WorkQueuePolicy,
Subjects: []string{"queue.>"},
})
require_NoError(t, err)

// Keep this low since we are going to run as many go routines
// to consume, ack and republish the message.
total := 10000
// Populate the queue, one message per subject.
for i := 0; i < total; i++ {
js.Publish(fmt.Sprintf("queue.%d", i), []byte("hello"))
}

_, err = js.AddConsumer("WQ", &nats.ConsumerConfig{
Durable: "cons",
AckPolicy: nats.AckExplicitPolicy,
MaxWaiting: 20000,
MaxAckPending: -1,
})
require_NoError(t, err)

sub, err := js.PullSubscribe("queue.>", "cons", nats.BindStream("WQ"))
require_NoError(t, err)

errCh := make(chan error, total)

var wg sync.WaitGroup
for iter := 0; iter < 3; iter++ {
wg.Add(total)
for i := 0; i < total; i++ {
go func() {
defer wg.Done()
msgs, err := sub.Fetch(1)
if err != nil {
errCh <- err
return
}
msg := msgs[0]
err = msg.AckSync()
if err != nil {
errCh <- err
return
}
_, err = js.Publish(msg.Subject, []byte("hello"))
if err != nil {
errCh <- err
return
}
}()
}
wg.Wait()
select {
case err := <-errCh:
t.Fatalf("Test failed, first error was: %v", err)
default:
// OK!
}
}
}
