Skip to content

Commit

Permalink
Fix difference in KV CAS operations for R1 vs R3 (#5841)
Browse files Browse the repository at this point in the history
A regression was introduced in
#5821 where CAS operations on
a R1 stream would succeed even if they should be rejected. Whereas on
R3/clustered they would be rejected.

Resolves: #5840

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

---------

Signed-off-by: Waldemar Quevedo <wally@nats.io>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Co-authored-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
MauriceVanVeen and wallyqs authored Aug 28, 2024
1 parent f1f3cdc commit 5b9de02
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
39 changes: 19 additions & 20 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3046,29 +3046,28 @@ func TestJetStreamClusterKeyValueLastSeqMismatch(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "mismatch",
Replicas: 3,
})
require_NoError(t, err)

revision, err := kv.Create("foo", []byte("1"))
require_NoError(t, err)
require_Equal(t, revision, 1)
for _, r := range []int{1, 3} {
t.Run(fmt.Sprintf("R=%d", r), func(t *testing.T) {
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: fmt.Sprintf("mismatch_%v", r),
Replicas: r,
})
require_NoError(t, err)

revision, err = kv.Create("bar", []byte("2"))
require_NoError(t, err)
require_Equal(t, revision, 2)
revision, err := kv.Create("foo", []byte("1"))
require_NoError(t, err)
require_Equal(t, revision, 1)

// Now delete foo from sequence 1.
// This needs to be low level remove (or system level) to test the condition we want here.
err = js.DeleteMsg("KV_mismatch", 1)
require_Error(t, err)
revision, err = kv.Create("bar", []byte("2"))
require_NoError(t, err)
require_Equal(t, revision, 2)

// Now say we want to update baz but iff last was revision 1.
_, err = kv.Update("baz", []byte("3"), uint64(1))
require_Error(t, err)
require_Equal(t, err.Error(), `nats: wrong last sequence: 0`)
// Now say we want to update baz but iff last was revision 1.
_, err = kv.Update("baz", []byte("3"), uint64(1))
require_Error(t, err)
require_Equal(t, err.Error(), `nats: wrong last sequence: 0`)
})
}
}

func TestJetStreamClusterPubAckSequenceDupe(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4690,7 +4690,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if err == ErrStoreMsgNotFound {
if seq == 0 {
fseq, err = 0, nil
} else {
} else if mset.isClustered() {
// Do not bump clfs in case message was not found and could have been deleted.
var ss StreamState
store.FastState(&ss)
Expand Down

0 comments on commit 5b9de02

Please sign in to comment.