Skip to content

Commit

Permalink
Wrap catchup error to preserve original error message
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Oct 10, 2024
1 parent f30ee8f commit d421d9b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
18 changes: 17 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8280,6 +8280,19 @@ var (
errCatchupTooManyRetries = errors.New("catchup failed, too many retries")
)

type catchupErr struct {
msg string
err error
}

func (e *catchupErr) Error() string {
return e.msg
}

func (e *catchupErr) Unwrap() error {
return e.err
}

// Process a stream snapshot.
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
// Update any deletes, etc.
Expand Down Expand Up @@ -8380,7 +8393,10 @@ RETRY:
releaseSyncOutSem()

if n.GroupLeader() == _EMPTY_ {
return errors.Join(errCatchupAbortedNoLeader, fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name()))
return &catchupErr{
msg: fmt.Sprintf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name()),
err: errCatchupAbortedNoLeader,
}
}

// If we have a sub clear that here.
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4119,7 +4119,7 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
var snap StreamReplicatedState
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
err := mset.processSnapshot(&snap)
require_Error(t, err, errCatchupAbortedNoLeader)
require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
require_True(t, isClusterResetErr(err))
mset.resetClusteredState(err)
},
Expand Down

0 comments on commit d421d9b

Please sign in to comment.