From 3c66e8e8183218262bec77c549262774fc6d8752 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 7 Aug 2024 17:11:40 +0100 Subject: [PATCH 1/4] In client and leafnode results cache, populate new entry after pruning Otherwise we might add the entry into the cache and then immediately prune it afterwards, depending on luck/map iteration order. Signed-off-by: Neil Twigg --- server/client.go | 5 +++-- server/leafnode.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/server/client.go b/server/client.go index 3c48a0ae9e2..b6894c09e3e 100644 --- a/server/client.go +++ b/server/client.go @@ -3908,9 +3908,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { // Match may use the subject here to populate a cache, so can not use bytesToString here. r = acc.sl.Match(string(c.pa.subject)) if len(r.psubs)+len(r.qsubs) > 0 { - c.in.results[string(c.pa.subject)] = r // Prune the results cache. Keeps us from unbounded growth. Random delete. - if len(c.in.results) > maxResultCacheSize { + if len(c.in.results) >= maxResultCacheSize { n := 0 for subject := range c.in.results { delete(c.in.results, subject) @@ -3919,6 +3918,8 @@ func (c *client) processInboundClientMsg(msg []byte) (bool, bool) { } } } + // Then add the new cache entry. + c.in.results[string(c.pa.subject)] = r } } diff --git a/server/leafnode.go b/server/leafnode.go index 3e66909c9c5..e40cfcab894 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2677,9 +2677,8 @@ func (c *client) processInboundLeafMsg(msg []byte) { // Go back to the sublist data structure. if !ok { r = c.acc.sl.Match(subject) - c.in.results[subject] = r // Prune the results cache. Keeps us from unbounded growth. Random delete. - if len(c.in.results) > maxResultCacheSize { + if len(c.in.results) >= maxResultCacheSize { n := 0 for subj := range c.in.results { delete(c.in.results, subj) @@ -2688,6 +2687,8 @@ func (c *client) processInboundLeafMsg(msg []byte) { } } } + // Then add the new cache entry. + c.in.results[subject] = r } // Collect queue names if needed. From 14bf800a3ad6d0962578c260c03ca756c8c99279 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 7 Aug 2024 15:46:06 -0600 Subject: [PATCH 2/4] [FIXED] JetStream: sending reply to Ack before processed (standalone) The server would send the reply back to the client before removing message from the stream, which could cause some issues if the client expected the message to be removed after the AckSync() call returns. For instance, a WorkQueue that would have a max message of 1 per subject and discard new message would fail a client that got a message, ack it, and re-puplish this message. Depending on timing, the client may get and error `maximum messages per subject exceeded`. Used @pcsegal description of the problem and example code to write a test that demonstrated the issue before the fix. Resolves #5702 Signed-off-by: Ivan Kozlovic --- server/consumer.go | 61 ++++++++++++++++++++++-------------- server/norace_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 23 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a2693470cf2..f8a9b2e7562 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2027,9 +2027,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - o.processAckMsg(sseq, dseq, dc, reply, true) - // We handle replies for acks in updateAcks - skipAckReply = true + if !o.processAckMsg(sseq, dseq, dc, reply, true) { + // We handle replies for acks in updateAcks + skipAckReply = true + } case bytes.HasPrefix(msg, AckNext): o.processAckMsg(sseq, dseq, dc, _EMPTY_, true) o.processNextMsgRequest(reply, msg[len(AckNext):]) @@ -2043,9 +2044,10 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) { if buf := msg[len(AckTerm):]; len(buf) > 0 { reason = string(bytes.TrimSpace(buf)) } - o.processTerm(sseq, dseq, dc, reason, reply) - // We handle replies for acks in updateAcks - skipAckReply = true + if !o.processTerm(sseq, dseq, dc, reason, reply) { + // We handle replies for acks in updateAcks + skipAckReply = true + } } // Ack the ack if requested. @@ -2395,9 +2397,12 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { } // Process a TERM -func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) { +// Returns `true` if the ack was processed in place and the sender can now respond +// to the client, or `false` if there was an error or the ack is replicated (in which +// case the reply will be sent later). +func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool { // Treat like an ack to suppress redelivery. - o.processAckMsg(sseq, dseq, dc, reply, false) + ackedInPlace := o.processAckMsg(sseq, dseq, dc, reply, false) o.mu.Lock() defer o.mu.Unlock() @@ -2420,11 +2425,14 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) { j, err := json.Marshal(e) if err != nil { - return + // We had an error during the marshal, so we can't send the advisory, + // but we still need to tell the caller that the ack was processed. + return ackedInPlace } subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name o.sendAdvisory(subj, j) + return ackedInPlace } // Introduce a small delay in when timer fires to check pending. @@ -2733,11 +2741,15 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { o.sendAdvisory(o.ackEventT, j) } -func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) { +// Process an ACK. +// Returns `true` if the ack was processed in place and the sender can now respond +// to the client, or `false` if there was an error or the ack is replicated (in which +// case the reply will be sent later). +func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample bool) bool { o.mu.Lock() if o.closed { o.mu.Unlock() - return + return false } // Check if this ack is above the current pointer to our next to deliver. @@ -2749,9 +2761,14 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b mset := o.mset if mset == nil || mset.closed.Load() { o.mu.Unlock() - return + return false } + // Let the owning stream know if we are interest or workqueue retention based. + // If this consumer is clustered (o.node != nil) this will be handled by + // processReplicatedAck after the ack has propagated. + ackInPlace := o.node == nil && o.retention != LimitsPolicy + var sgap, floor uint64 var needSignal bool @@ -2790,7 +2807,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b // no-op if dseq <= o.adflr || sseq <= o.asflr { o.mu.Unlock() - return + return ackInPlace } if o.maxp > 0 && len(o.pending) >= o.maxp { needSignal = true @@ -2822,22 +2839,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b case AckNone: // FIXME(dlc) - This is error but do we care? o.mu.Unlock() - return + return ackInPlace } + // No ack replication, so we set reply to "" so that updateAcks does not + // send the reply. The caller will. + if ackInPlace { + reply = _EMPTY_ + } // Update underlying store. o.updateAcks(dseq, sseq, reply) - - // In case retention changes for a stream, this ought to have been updated - // using the consumer lock to avoid a race. - retention := o.retention - clustered := o.node != nil o.mu.Unlock() - // Let the owning stream know if we are interest or workqueue retention based. - // If this consumer is clustered this will be handled by processReplicatedAck - // after the ack has propagated. - if !clustered && mset != nil && retention != LimitsPolicy { + if ackInPlace { if sgap > 1 { // FIXME(dlc) - This can very inefficient, will need to fix. for seq := sseq; seq >= floor; seq-- { @@ -2852,6 +2866,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b if needSignal { o.signalNewMessages() } + return ackInPlace } // Determine if this is a truly filtered consumer. Modern clients will place filtered subjects diff --git a/server/norace_test.go b/server/norace_test.go index e4d118261d4..738abda46f3 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10775,3 +10775,76 @@ func TestNoRaceJetStreamClusterMirrorSkipSequencingBug(t *testing.T) { return nil }) } + +func TestNoRaceJetStreamStandaloneDontReplyToAckBeforeProcessingIt(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "WQ", + Discard: nats.DiscardNew, + MaxMsgsPerSubject: 1, + DiscardNewPerSubject: true, + Retention: nats.WorkQueuePolicy, + Subjects: []string{"queue.>"}, + }) + require_NoError(t, err) + + // Keep this low since we are going to run as many go routines + // to consume, ack and republish the message. + total := 10000 + // Populate the queue, one message per subject. + for i := 0; i < total; i++ { + js.Publish(fmt.Sprintf("queue.%d", i), []byte("hello")) + } + + _, err = js.AddConsumer("WQ", &nats.ConsumerConfig{ + Durable: "cons", + AckPolicy: nats.AckExplicitPolicy, + MaxWaiting: 20000, + MaxAckPending: -1, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("queue.>", "cons", nats.BindStream("WQ")) + require_NoError(t, err) + + errCh := make(chan error, total) + + var wg sync.WaitGroup + for iter := 0; iter < 3; iter++ { + wg.Add(total) + for i := 0; i < total; i++ { + go func() { + defer wg.Done() + msgs, err := sub.Fetch(1) + if err != nil { + errCh <- err + return + } + msg := msgs[0] + err = msg.AckSync() + if err != nil { + errCh <- err + return + } + _, err = js.Publish(msg.Subject, []byte("hello")) + if err != nil { + errCh <- err + return + } + }() + } + wg.Wait() + select { + case err := <-errCh: + t.Fatalf("Test failed, first error was: %v", err) + default: + // OK! + } + } +} From 50ab0ad2cd6908b00125aa4fc1f5d19e3a8874c4 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 8 Aug 2024 09:51:30 +0100 Subject: [PATCH 3/4] Remove `ordered` constraint now that we can use `cmp.Ordered` Signed-off-by: Neil Twigg --- server/test_test.go | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/server/test_test.go b/server/test_test.go index cf7c5e9baae..1dc3a4e90ab 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -14,6 +14,7 @@ package server import ( + "cmp" "fmt" "math/rand" "net/url" @@ -133,7 +134,7 @@ func require_Len(t testing.TB, a, b int) { } } -func require_LessThan[T ordered](t *testing.T, a, b T) { +func require_LessThan[T cmp.Ordered](t *testing.T, a, b T) { t.Helper() if a >= b { t.Fatalf("require %v to be less than %v", a, b) @@ -349,17 +350,3 @@ func runSolicitLeafServerToURL(surl string) (*Server, *Options) { o.LeafNode.ReconnectInterval = 100 * time.Millisecond return RunServer(&o), &o } - -// ordered is a constraint that permits any ordered type. -// -// To avoid a dependency on go1.21+ or golang.org/x/exp, we copy the ordered -// interface def from go 1.21.3:src/cmp/cmp.go (https://pkg.go.dev/cmp#Ordered). -// -// When this repo is updated to go 1.21+, this should be deleted and references -// replaced by cmp.Ordered. -type ordered interface { - ~int | ~int8 | ~int16 | ~int32 | ~int64 | - ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr | - ~float32 | ~float64 | - ~string -} From 913e43f2211968a0e0469884958b6b5e551ed5ca Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 8 Aug 2024 13:47:24 -0700 Subject: [PATCH 4/4] When checking for orphaned assets in a cluster, make sure we are properly synched with the leader before proceeding. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 24 ++++++++----- server/jetstream_cluster_4_test.go | 56 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 78b8f9e7d5a..402fd728946 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1141,17 +1141,24 @@ type recoveryUpdates struct { // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. func (js *jetStream) checkForOrphans() { - consumerName := func(o *consumer) string { - o.mu.RLock() - defer o.mu.RUnlock() - return o.name - } - // Can not hold jetstream lock while trying to delete streams or consumers. js.mu.Lock() s, cc := js.srv, js.cluster s.Debugf("JetStream cluster checking for orphans") + // We only want to cleanup any orphans if we know we are current with the meta-leader. + meta := cc.meta + if meta == nil || meta.GroupLeader() == _EMPTY_ { + js.mu.Unlock() + s.Debugf("JetStream cluster skipping check for orphans, no meta-leader") + return + } + if !meta.Healthy() { + js.mu.Unlock() + s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader") + return + } + var streams []*stream var consumers []*consumer @@ -1164,8 +1171,7 @@ func (js *jetStream) checkForOrphans() { } else { // This one is good, check consumers now. for _, o := range mset.getConsumers() { - consumer := consumerName(o) - if sa.consumers[consumer] == nil { + if sa.consumers[o.String()] == nil { consumers = append(consumers, o) } } @@ -1369,7 +1375,7 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") - js.checkForOrphans() + time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() continue diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 5f5e726c676..3d619e933dc 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -950,6 +950,7 @@ func TestJetStreamClusterConsumerNRGCleanup(t *testing.T) { numStreams++ } } + f.Close() } require_Equal(t, numConsumers, 0) require_Equal(t, numStreams, 0) @@ -2360,3 +2361,58 @@ func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) { require_NoError(t, err) require_Equal(t, sinfo.State.Msgs, 1) } + +func TestJetStreamClusterMetaSyncOrphanCleanup(t *testing.T) { + c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3, + func(serverName, clusterName, storeDir, conf string) string { + return fmt.Sprintf("%s\nserver_tags: [server:%s]", conf, serverName) + }) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Create a bunch of streams on S1 + for i := 0; i < 100; i++ { + stream := fmt.Sprintf("TEST-%d", i) + subject := fmt.Sprintf("TEST.%d", i) + _, err := js.AddStream(&nats.StreamConfig{ + Name: stream, + Subjects: []string{subject}, + Storage: nats.FileStorage, + Placement: &nats.Placement{Tags: []string{"server:S-1"}}, + }) + require_NoError(t, err) + // Put in 10 msgs to each + for j := 0; j < 10; j++ { + _, err := js.Publish(subject, nil) + require_NoError(t, err) + } + } + + // Now we will shutdown S1 and remove all of its meta-data to trip the condition. + s := c.serverByName("S-1") + require_True(t, s != nil) + + sd := s.JetStreamConfig().StoreDir + nd := filepath.Join(sd, "$SYS", "_js_", "_meta_") + s.Shutdown() + s.WaitForShutdown() + os.RemoveAll(nd) + s = c.restartServer(s) + c.waitOnServerCurrent(s) + jsz, err := s.Jsz(nil) + require_NoError(t, err) + require_Equal(t, jsz.Streams, 100) + + // These will be recreated by the meta layer, but if the orphan detection deleted them they will be empty, + // so check all streams to make sure they still have data. + acc := s.GlobalAccount() + var state StreamState + for i := 0; i < 100; i++ { + mset, err := acc.lookupStream(fmt.Sprintf("TEST-%d", i)) + require_NoError(t, err) + mset.store.FastState(&state) + require_Equal(t, state.Msgs, 10) + } +}