Skip to content

Commit

Permalink
[CHANGED] MQTT s.clear() do not wait for JS responses when disconnect…
Browse files Browse the repository at this point in the history
…ing the session (#5575)

MQTT s.clear(): do not wait for JS responses when disconnecting the
session

Related to #5471

Previously we were making `jsa.NewRequest` as it is needed when
connecting a clean session. On disconnect, there is no reason to wait
for the response (and tie up the MQTT read loop of the client).

This should specifically help situations when a client app with many
MQTT connections and QOS subscriptions disconnects suddenly, causing a
flood of JSAPI deleteConsumer requests.

Test: n/a, not sure how to instrument for it.
  • Loading branch information
levb authored Jun 21, 2024
1 parent 477c8ba commit ee9af0f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 68 deletions.
24 changes: 15 additions & 9 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(); err != nil {
if err := sess.clear(true); err != nil {
c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)

// Create a new, uniquely names consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1701,8 +1701,14 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
return ccr, ccr.ToError()
}

func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
// if noWait is specified, does not wait for the JS response, returns nil
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
if noWait {
jsa.sendMsg(subj, nil)
return nil, nil
}

cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3184,7 +3190,7 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear() error {
func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -3212,19 +3218,19 @@ func (sess *mqttSession) clear() error {
sess.mu.Unlock()

for _, dur := range durs {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
}

if seq > 0 {
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
Expand Down Expand Up @@ -3490,7 +3496,7 @@ func (sess *mqttSession) untrackPubRel(pi uint16) (jsAckSubject string) {
func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) {
sess.mu.Lock()
sess.tmaxack -= cc.MaxAckPending
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))})
sess.jsa.deleteConsumer(mqttStreamName, cc.Durable, true)
sess.mu.Unlock()
}

Expand Down Expand Up @@ -3829,7 +3835,7 @@ CHECK:
// This Session lasts as long as the Network Connection. State data
// associated with this Session MUST NOT be reused in any subsequent
// Session.
if err := es.clear(); err != nil {
if err := es.clear(false); err != nil {
asm.removeSession(es, true)
return err
}
Expand Down
59 changes: 0 additions & 59 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6789,65 +6789,6 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) {
}
}

type unableToDeleteConsLogger struct {
DummyLogger
errCh chan string
}

func (l *unableToDeleteConsLogger) Errorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "unable to delete consumer") {
l.errCh <- msg
}
}

func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
org := mqttJSAPITimeout
mqttJSAPITimeout = 1000 * time.Millisecond
defer func() { mqttJSAPITimeout = org }()

cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()

o := cl.opts[0]
s1 := cl.servers[0]
// Plug error logger to s1
l := &unableToDeleteConsLogger{errCh: make(chan string, 10)}
s1.SetLogger(l, false, false)

nc, js := jsClientConnect(t, s1)
defer nc.Close()

mc, r := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
testMQTTFlush(t, mc, nil, r)

// Now shutdown server 2, we should lose quorum
cl.servers[1].Shutdown()

// Close the MQTT client:
testMQTTDisconnect(t, mc, nil)

// We should have reported that there was an error deleting the consumer
select {
case <-l.errCh:
// OK
case <-time.After(time.Second):
t.Fatal("Server did not report any error")
}

// Now restart the server 2 so that we can check that the session is still persisted.
cl.restartAllSamePorts()
cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName)

si, err := js.StreamInfo(mqttSessStreamName)
require_NoError(t, err)
require_True(t, si.State.Msgs == 1)
}

// Test for auto-cleanup of consumers.
func TestMQTTConsumerInactiveThreshold(t *testing.T) {
tdir := t.TempDir()
Expand Down

0 comments on commit ee9af0f

Please sign in to comment.