Skip to content

Commit

Permalink
Merge pull request #12392 from rabbitmq/loic-fix-cq-scan
Browse files Browse the repository at this point in the history
CQ: Fix shared store scanner missing messages
  • Loading branch information
michaelklishin authored Oct 8, 2024
2 parents 83d094d + 639e905 commit 6feca5f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 94 deletions.
179 changes: 91 additions & 88 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-export([compact_file/2, truncate_file/4, delete_file/2]). %% internal

-export([scan_file_for_valid_messages/1]). %% salvage tool
-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/4, prioritise_cast/3,
Expand Down Expand Up @@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) ->

-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB

scan_file_for_valid_messages(Dir, FileName) ->
scan_file_for_valid_messages(form_filename(Dir, FileName)).

%% Exported as a salvage tool. Not as accurate as node recovery
%% because it doesn't have the queue index.
scan_file_for_valid_messages(Path) ->
scan_file_for_valid_messages(Path, fun(Obj) -> {valid, Obj} end).

scan_file_for_valid_messages(Path, Fun) ->
case file:open(Path, [read, binary, raw]) of
{ok, Fd} ->
{ok, FileSize} = file:position(Fd, eof),
{ok, _} = file:position(Fd, bof),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []),
ok = file:close(Fd),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end;
{ok, Messages};
{error, enoent} ->
{ok, [], 0};
{ok, []};
{error, Reason} ->
{error, {unable_to_scan_file,
filename:basename(Path),
Reason}}
end.

scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
eof ->
Acc;
Expand All @@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
<<>> -> Data0;
_ -> <<Buffer/binary, Data0/binary>>
end,
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
end.

%% Message might have been found.
scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
Fd, Offset, FileSize, MsgIdsFound, Acc)
Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when Size >= 16 ->
<<MsgIdInt:128, _/bits>> = MsgIdAndMsg,
case MsgIdsFound of
Expand All @@ -1519,26 +1516,37 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc);
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
scan_data(Rest, Fd, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true},
[{MsgId, TotalSize, Offset}|Acc])
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
end
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc).

%%----------------------------------------------------------------------------
%% Ets index
Expand Down Expand Up @@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},

build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
File, Files) ->
FileName = filenum_to_name(File),
Path = form_filename(Dir, filenum_to_name(File)),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
[form_filename(Dir, FileName), length(Files)]),
[Path, length(Files)]),
%% The scan function already dealt with duplicate messages
%% within the file. We then get messages in reverse order.
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, FileName),
%% Valid messages are in file order so the last message is
%% the last message from the list.
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{[Obj | VMAcc], VTSAcc + TotalSize};
_ ->
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
FileSize1 =
case Files of
%% if it's the last file, we'll truncate to remove any
%% rubbish above the last valid message. This affects the
%% file size.
[] -> case ValidMessages of
[] -> 0;
_ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end;
[_|_] -> FileSize
end,
%% within the file, and only returns valid messages (we do
%% the index lookup in the fun). But we get messages in reverse order.
{ok, Messages} = scan_file_for_valid_messages(Path,
fun (Obj = {MsgId, TotalSize, Offset}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{valid, Obj};
_ ->
invalid
end
end),
ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages),
%% Any file may have rubbish at the end of it that we will want truncated.
%% Note that the last message in the file is the first in the list.
FileSize = case Messages of
[] ->
0;
[{_, TotalSize, Offset}|_] ->
Offset + TotalSize
end,
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
file_size = FileSize1,
file_size = FileSize,
locked = false }),
ok = gatherer:finish(Gatherer).

Expand Down Expand Up @@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
%% Load the messages. It's possible to get 0 messages here;
%% that's OK. That means we have little to do as the file is
%% about to be deleted.
{Messages, _} = scan_and_vacuum_message_file(File, State),
Messages = scan_and_vacuum_message_file(File, State),
%% Blank holes. We must do this first otherwise the file is left
%% with data that may confuse the code (for example data that looks
%% like a message, isn't a message, but spans over a real message).
Expand Down Expand Up @@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
_ ->
[#file_summary{ valid_total_size = 0,
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
{[], 0} = scan_and_vacuum_message_file(File, State),
[] = scan_and_vacuum_message_file(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true = ets:delete(FileSummaryEts, File),
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),
Expand All @@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,

scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
Acc;
not_found ->
Acc
end
end, {[], 0}, Messages).
Path = form_filename(Dir, filenum_to_name(File)),
{ok, Messages} = scan_file_for_valid_messages(Path,
fun ({MsgId, TotalSize, Offset}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
%% The message was valid, but since we have now deleted
%% it due to having no ref_count, it becomes invalid.
%% We still want to let the scan function skip though.
previously_valid;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{valid, Entry};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
invalid;
not_found ->
invalid
end
end),
%% @todo Do we really need to reverse messages?
lists:reverse(Messages).
23 changes: 17 additions & 6 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,22 @@ msg_store_file_scan1(Config) ->
%% Messages with no content.
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
ok = Scan([{msg, gen_id(), <<>>}]),
%% Tricky messages.
%%
%% These only get properly detected when the index is populated.
%% In this test case we simulate the index with a fun.
TrickyScan = fun (Blocks, Expected, Fun) ->
Path = gen_msg_file(Config, Blocks),
Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun),
case Result of
Expected -> ok;
_ -> {expected, Expected, got, Result}
end
end,
ok = TrickyScan(
[{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}],
{ok, [{<<"idididididididid">>, 4378, 1}]},
fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end),
%% All good!!
passed.

Expand Down Expand Up @@ -662,12 +678,7 @@ gen_msg_file(Config, Blocks) ->

gen_result(Blocks) ->
Messages = gen_result(Blocks, 0, []),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end.
{ok, Messages}.

gen_result([], _, Acc) ->
Acc;
Expand Down

0 comments on commit 6feca5f

Please sign in to comment.