Skip to content

Commit

Permalink
Cherry-picks for 2.10.22-RC.2 (#5984)
Browse files Browse the repository at this point in the history
Includes the following:

- #5918
- #5982
- #5983 (although only the 1.22.8 upgrade, since 1.21.x is no longer
receiving updates)

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Oct 10, 2024
2 parents 0fcc881 + 025bbc8 commit cc35c9a
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.22.7"
- "1.22.8"
- "1.21.13"

go_import_path: github.com/nats-io/nats-server
Expand Down
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,8 @@ func (a *Account) Interest(subject string) int {
var nms int
a.mu.RLock()
if a.sl != nil {
res := a.sl.Match(subject)
nms = len(res.psubs) + len(res.qsubs)
np, nq := a.sl.NumInterest(subject)
nms = np + nq
}
a.mu.RUnlock()
return nms
Expand Down
33 changes: 26 additions & 7 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -388,6 +390,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -439,6 +442,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) {
require_NoError(t, err)

nc := ac.Connect(nkeyOpt)
defer nc.Close()

// Make sure that the callout was called.
if atomic.LoadUint32(&callouts) != 1 {
Expand Down Expand Up @@ -658,6 +662,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send correct token. This should switch us to the test account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -678,6 +683,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken))
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -697,6 +703,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -922,10 +929,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

// Authorization services can optionally encrypt the responses using the server's public xkey.
ac.Connect(nats.UserInfo("dlc", "xxx"))
nc = ac.Connect(nats.UserInfo("dlc", "xxx"))
defer nc.Close()
}

func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
Expand Down Expand Up @@ -1017,10 +1026,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
defer removeFile(t, creds)

// This will receive an encrypted request to the auth service but send plaintext response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
defer nc.Close()

// This will receive an encrypted request to the auth service and send an encrypted response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
defer nc.Close()
}

func TestAuthCalloutServerTags(t *testing.T) {
Expand Down Expand Up @@ -1048,7 +1059,8 @@ func TestAuthCalloutServerTags(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

tags := <-tch
require_True(t, len(tags) == 2)
Expand Down Expand Up @@ -1081,7 +1093,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

cluster := <-ch
require_True(t, cluster == "HUB")
Expand Down Expand Up @@ -1184,7 +1197,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) {
require_NoError(t, err)

// This one will use callout since not defined in server config.
ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()
checkSubsPending(t, sub, 0)

checkAuthErrEvent := func(user, pass, reason string) {
Expand Down Expand Up @@ -1244,6 +1258,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) {

// Setup system user.
snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

// Allow this connect event to pass us by..
time.Sleep(250 * time.Millisecond)
Expand Down Expand Up @@ -1615,6 +1630,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {
// Send correct token. This should switch us to the A account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -1626,6 +1642,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {

nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1703,6 +1720,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1910,6 +1928,7 @@ func TestOperatorModeUserRevocation(t *testing.T) {
// connect the system user
sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds))
require_NoError(t, err)
defer sysNC.Close()

// Bearer token etc..
// This is used by all users, and the customization will be in other connect args.
Expand Down
1 change: 1 addition & 0 deletions server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) {

// Make sure we connect ok and to the correct account.
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down
25 changes: 13 additions & 12 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3189,7 +3189,7 @@ func (c *client) processUnsub(arg []byte) error {
func (c *client) checkDenySub(subject string) bool {
if denied, ok := c.mperms.dcache[subject]; ok {
return denied
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
} else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 {
c.mperms.dcache[subject] = true
return true
} else {
Expand Down Expand Up @@ -3711,13 +3711,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed := true
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := c.perms.pub.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && c.perms.pub.deny != nil {
r := c.perms.pub.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := c.perms.pub.deny.NumInterest(subject)
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
Expand Down Expand Up @@ -4649,17 +4649,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
}
continue
} else {
c.addSubToRouteTargets(sub)
// Clear rsub since we added a sub.
rsub = nil
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, sub.queue)
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
}
rsub = sub
}
break
}
Expand Down Expand Up @@ -4708,8 +4709,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote or leaf node.
// We are here if we have selected a leaf or route as the destination,
// or if we tried to deliver to a local qsub but failed.
c.addSubToRouteTargets(rsub)
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, rsub.queue)
Expand Down
5 changes: 4 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) {
}
nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...)
if expectedOk {
defer nc.Close()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -2979,15 +2980,17 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {

wait := make(chan error)

nca, err := nats.Connect(proxy.clientURL())
nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose())
require_NoError(t, err)
defer nca.Close()
nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
wait <- err
close(wait)
})

ncb, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer ncb.Close()

_, err = nca.Subscribe("test", func(msg *nats.Msg) {
wait <- nil
Expand Down
10 changes: 3 additions & 7 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
defer s.Shutdown()
l := &captureFatalLogger{fatalCh: make(chan string, 1)}
s.SetLogger(l, false, false)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
// This does not block
s.Start()
select {
case e := <-l.fatalCh:
if !strings.Contains(e, errTxt) {
Expand All @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
t.Fatal("Should have got a fatal error")
}
s.Shutdown()
wg.Wait()
s.WaitForShutdown()
}

func TestGatewayListenError(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) {
for range time.NewTicker(200 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}
send(t)
Expand Down
Loading

0 comments on commit cc35c9a

Please sign in to comment.