Skip to content

Commit

Permalink
Make sure to not forward a message across a route for dq sub when we …
Browse files Browse the repository at this point in the history
…are a spoke leaf node.

Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed Oct 11, 2023
1 parent 0ac7895 commit 9f16edd
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
6 changes: 6 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4375,6 +4375,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}

// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}

// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
Expand Down
75 changes: 75 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5561,3 +5561,78 @@ func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *tes
closeSubs(rsubs)
checkFor(t, time.Second, 200*time.Millisecond, checkInterest)
}

// https://github.com/nats-io/nats-server/issues/4367
func TestLeafNodeDQMultiAccountExportImport(t *testing.T) {
bConf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: cluster-b-0
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
AGG: {
exports: [ { service: "PING.>" } ]
users: [ { user: agg, password: agg } ]
}
}
leaf { listen: 127.0.0.1:-1 }
`))

sb, ob := RunServerWithConfig(bConf)
defer sb.Shutdown()

tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: { store_dir: '%s' }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
A: {
mappings: { "A.>" : ">" }
exports: [ { service: A.> } ]
users: [ { user: a, password: a } ]
},
AGG: {
imports: [ { service: { subject: A.>, account: A } } ]
users: [ { user: agg, password: agg } ]
},
}
leaf {
remotes: [ {
urls: [ nats-leaf://agg:agg@127.0.0.1:{LEAF_PORT} ]
account: AGG
} ]
}
`
tmpl = strings.Replace(tmpl, "{LEAF_PORT}", fmt.Sprintf("%d", ob.LeafNode.Port), 1)
c := createJetStreamCluster(t, tmpl, "cluster-a", "cluster-a-", 3, 22110, false)
defer c.shutdown()

// Make sure all servers are connected via leafnode to the hub, the b server.
for _, s := range c.servers {
checkLeafNodeConnectedCount(t, s, 1)
}

// Connect to a server in the cluster and create a DQ listener.
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "a"))
defer nc.Close()

var got atomic.Int32

natsQueueSub(t, nc, "PING", "Q", func(m *nats.Msg) {
got.Add(1)
m.Respond([]byte("REPLY"))
})

// Now connect to B and send the request.
ncb, _ := jsClientConnect(t, sb, nats.UserInfo("agg", "agg"))
defer ncb.Close()

_, err := ncb.Request("A.PING", []byte("REQUEST"), time.Second)
require_NoError(t, err)
require_Equal(t, got.Load(), 1)
}

0 comments on commit 9f16edd

Please sign in to comment.