Skip to content

Commit

Permalink
Merge pull request #455 from rabbitmq/checkpoints-fix
Browse files Browse the repository at this point in the history
Do not update ra_log_snapshot_state from snapshot writer process.
  • Loading branch information
kjnilsson authored Jul 15, 2024
2 parents 4fe9b43 + 8987cb3 commit 88e2ab2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
11 changes: 8 additions & 3 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -686,22 +686,27 @@ set_snapshot_state(SnapState, State) ->

%% NOTE(review): this span is a rendered diff without +/- markers, so some
%% lines appear twice back to back (the function head, the `cache = Cache}'
%% line and the final `Effs' line). Presumably the first of each pair is the
%% pre-change line and the second the post-change line -- confirm against
%% the repository before treating this text as compilable code.
%%
%% Installs the snapshot identified by `IdxTerm' into the log state.
%% Updates the snapshot/last-index counters to `SnapIdx', calls
%% delete_segments/2 for `SnapIdx', and returns the updated state together
%% with the accumulated effects.
-spec install_snapshot(ra_idxterm(), ra_snapshot:state(), state()) ->
{state(), effects()}.
install_snapshot({SnapIdx, _} = IdxTerm, SnapState,
install_snapshot({SnapIdx, _} = IdxTerm, SnapState0,
#?MODULE{cfg = Cfg,
cache = Cache} = State0) ->
cache = Cache} = State0) ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_INSTALLED, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, SnapIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, SnapIdx),
{State, Effs} = delete_segments(SnapIdx, State0),
%% Take the checkpoints selected by ra_snapshot:take_older_checkpoints/2 for
%% `SnapIdx' out of the snapshot state and emit one `delete_snapshot'
%% effect per checkpoint, pointing at the checkpoint directory.
{SnapState, Checkpoints} =
ra_snapshot:take_older_checkpoints(SnapIdx, SnapState0),
CPEffects = [{delete_snapshot,
ra_snapshot:directory(SnapState, checkpoint),
Checkpoint} || Checkpoint <- Checkpoints],
%% After installation the first index is the one right after the snapshot
%% and both last_index and last_written_index_term point at the snapshot.
{State#?MODULE{snapshot_state = SnapState,
first_index = SnapIdx + 1,
last_index = SnapIdx,
%% TODO: update last_term too?
%% cache can be reset
cache = ra_log_cache:reset(Cache),
last_written_index_term = IdxTerm},
Effs}.
Effs ++ CPEffects}.

-spec recover_snapshot(State :: state()) ->
option({ra_snapshot:meta(), term()}).
Expand Down
4 changes: 1 addition & 3 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ begin_snapshot(#{index := Idx, term := Term} = Meta, MacRef, SnapKind,
-spec promote_checkpoint(Idx :: ra_index(), State0 :: state()) ->
{boolean(), State :: state(), Effects :: [effect()]}.
promote_checkpoint(PromotionIdx,
#?MODULE{uid = UId,
module = Mod,
#?MODULE{module = Mod,
snapshot_directory = SnapDir,
checkpoint_directory = CheckpointDir,
checkpoints = Checkpoints0} = State0) ->
Expand All @@ -369,7 +368,6 @@ promote_checkpoint(PromotionIdx,
%% into a snapshot.
ok = Mod:sync(Checkpoint),
ok = prim_file:rename(Checkpoint, Snapshot),
true = ets:insert(?ETSTBL, {UId, Idx}),
Self ! {ra_log_event,
{snapshot_written,
{Idx, Term}, snapshot}}
Expand Down
50 changes: 50 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ all_tests() ->
% snapshot_recovery,
snapshot_installation,
snapshot_written_after_installation,
oldcheckpoints_deleted_after_snapshot_install,
append_after_snapshot_installation,
written_event_after_snapshot_installation,
update_release_cursor,
Expand Down Expand Up @@ -822,6 +823,55 @@ snapshot_written_after_installation(Config) ->

ok.

%% Takes a checkpoint at index 5, then installs a snapshot at index 15 and
%% checks that:
%%   * ra_log:install_snapshot/3 returns at least one `delete_snapshot'
%%     effect (for the now stale checkpoint), and
%%   * once those effects have been executed, ra_snapshot:promote_checkpoint/2
%%     finds nothing to promote and leaves no pending snapshot behind.
oldcheckpoints_deleted_after_snapshot_install(Config) ->
    Log0 = ra_log_init(Config, #{min_snapshot_interval => 2,
                                 min_checkpoint_interval => 2}),
    %% log 1 .. 9, should create a single segment
    Log1 = write_and_roll(1, 10, 1, Log0),
    {Log2, _} = ra_log:checkpoint(5, #{}, 1, <<"one-five">>, Log1),
    CheckpointWritten =
        receive
            {ra_log_event, {snapshot_written, {5, 1}, checkpoint} = Evt} ->
                Evt
        after 1000 ->
                  flush(),
                  exit(snapshot_written_timeout)
        end,
    %% The effects of the checkpoint-written event are not under test here.
    {Log3, _} = ra_log:handle_event(CheckpointWritten, Log2),

    Meta = meta(15, 2, [?N1]),
    Context = #{},
    Chunk = create_snapshot_chunk(Config, Meta, Context),
    SnapState0 = ra_log:snapshot_state(Log3),
    {ok, SnapState1} = ra_snapshot:begin_accept(Meta, SnapState0),
    {ok, SnapState} = ra_snapshot:accept_chunk(Chunk, 1, last, SnapState1),
    {Log4, InstallEffs} = ra_log:install_snapshot({15, 2}, SnapState, Log3),
    ?assert(lists:any(fun ({delete_snapshot, _, _}) -> true;
                          (_) -> false
                      end, InstallEffs)),
    %% write some more to create another segment
    Log5 = write_and_roll(16, 20, 2, Log4),
    {19, _} = ra_log:last_index_term(Log5),
    {19, _} = ra_log:last_written(Log5),

    %% Execute the deletions requested by the snapshot installation. These
    %% must be the effects returned by install_snapshot/3, not the ones from
    %% the earlier checkpoint-written event.
    lists:foreach(fun ({delete_snapshot, Dir, S}) ->
                          ra_snapshot:delete(Dir, S);
                      (_) ->
                          ok
                  end, InstallEffs),

    SnapStateAfter0 = ra_log:snapshot_state(Log5),
    {false, SnapStateAfter, _} =
        ra_snapshot:promote_checkpoint(19, SnapStateAfter0),
    %% assert there is no pending snapshot as checkpoint promotion should
    %% not have promoted anything
    ?assertEqual(undefined, ra_snapshot:pending(SnapStateAfter)),

    %% re-initialise and close the log once more before finishing
    _ = ra_log:close(ra_log_init(Config, #{min_snapshot_interval => 2})),

    ok.

snapshot_installation(Config) ->
logger:set_primary_config(level, all),
% write a few entries
Expand Down

0 comments on commit 88e2ab2

Please sign in to comment.