Skip to content

Commit

Permalink
CQ: Fix shared store scanner missing messages
Browse files Browse the repository at this point in the history
It was still possible, although rare, to have message store
files lose message data, when the following conditions were
met:

 * the message data contains byte values 255
   (255 is used as an OK marker after a message)
 * the message is located after a 0-filled hole in the file
 * the length of the data is at least 4096 bytes and
   if we misread it (as detailed below) we encounter
   a 255 byte where we expect the OK marker

The trick for the code to previously misread the length can
be explained as follow:

A message is stored in the following format:

  <<Len:64, MsgIdAndMsg:Len/unit:8, 255>>

With MsgId always being 16 bytes in length. So Len is always
at least 16, if the message data Msg is empty. But technically
it never is.

Now if we have a zero filled hole just before this message,
we may end up with this:

  <<0, Len:64, MsgIdAndMsg:Len/unit:8, 255>>

When we are scanning we are testing bytes to see if there is
a message there or not. We look for a Len that gives us byte
255 after MsgIdAndMsg.

Len of value 4096 looks like this in binary:

  <<0:48, 16, 0>>

Problem is if we have leading zeroes, Len may look like this:

  <<0, 0:48, 16, 0>>

If we take the first 64 bits we get a potential length of 16.
We look at the byte after the next 16 bytes. If it is 255, we
think this is a message and skip by this amount of bytes, and
mistakenly miss the real message.

Solving this by changing the file format would be simple enough,
but we don't have the luxury to afford that. A different solution
was found, which is to combine file scanning with checking that
the message exists in the message store index (populated from
queues at startup, and kept up to date over the life time of
the store). Then we know for sure that the message above
doesn't exist, because the MsgId won't be found in the index.
If it is, then the file number and offset will not match,
and the check will fail.

There remains a small chance that we get it wrong during dirty
recovery. Only a better file format would improve that.
  • Loading branch information
lhoguin committed Sep 30, 2024
1 parent 36a84f4 commit dd8870c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 94 deletions.
179 changes: 91 additions & 88 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-export([compact_file/2, truncate_file/4, delete_file/2]). %% internal

-export([scan_file_for_valid_messages/1]). %% salvage tool
-export([scan_file_for_valid_messages/1, scan_file_for_valid_messages/2]). %% salvage tool

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_call/4, prioritise_cast/3,
Expand Down Expand Up @@ -1472,31 +1472,28 @@ list_sorted_filenames(Dir, Ext) ->

-define(SCAN_BLOCK_SIZE, 4194304). %% 4MB

scan_file_for_valid_messages(Dir, FileName) ->
scan_file_for_valid_messages(form_filename(Dir, FileName)).

%% Exported as a salvage tool. Not as accurate as node recovery
%% because it doesn't have the queue index.
scan_file_for_valid_messages(Path) ->
scan_file_for_valid_messages(Path, fun(Obj) -> {valid, Obj} end).

scan_file_for_valid_messages(Path, Fun) ->
case file:open(Path, [read, binary, raw]) of
{ok, Fd} ->
{ok, FileSize} = file:position(Fd, eof),
{ok, _} = file:position(Fd, bof),
Messages = scan(<<>>, Fd, 0, FileSize, #{}, []),
Messages = scan(<<>>, Fd, Fun, 0, FileSize, #{}, []),
ok = file:close(Fd),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end;
{ok, Messages};
{error, enoent} ->
{ok, [], 0};
{ok, []};
{error, Reason} ->
{error, {unable_to_scan_file,
filename:basename(Path),
Reason}}
end.

scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan(Buffer, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
case file:read(Fd, ?SCAN_BLOCK_SIZE) of
eof ->
Acc;
Expand All @@ -1505,12 +1502,12 @@ scan(Buffer, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
<<>> -> Data0;
_ -> <<Buffer/binary, Data0/binary>>
end,
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
end.

%% Message might have been found.
scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
Fd, Offset, FileSize, MsgIdsFound, Acc)
Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when Size >= 16 ->
<<MsgIdInt:128, _/bits>> = MsgIdAndMsg,
case MsgIdsFound of
Expand All @@ -1519,26 +1516,37 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
%% simply be a coincidence. Try the next byte.
#{MsgIdInt := true} ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Offset + 1, FileSize, MsgIdsFound, Acc);
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc);
%% Data looks to be a message.
_ ->
%% Avoid sub-binary construction.
MsgId = <<MsgIdInt:128>>,
TotalSize = Size + 9,
scan_data(Rest, Fd, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true},
[{MsgId, TotalSize, Offset}|Acc])
case Fun({MsgId, TotalSize, Offset}) of
%% Confirmed to be a message by the provided fun.
{valid, Entry} ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, [Entry|Acc]);
%% Confirmed to be a message but we don't need it anymore.
previously_valid ->
scan_data(Rest, Fd, Fun, Offset + TotalSize, FileSize,
MsgIdsFound#{MsgIdInt => true}, Acc);
%% Not a message, try the next byte.
invalid ->
<<_, Rest2/bits>> = Data,
scan_data(Rest2, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc)
end
end;
%% This might be the start of a message.
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan_data(<<Size:64, Rest/bits>> = Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Rest) < Size + 1, Size < FileSize - Offset ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Offset, FileSize, MsgIdsFound, Acc)
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
when byte_size(Data) < 8 ->
scan(Data, Fd, Offset, FileSize, MsgIdsFound, Acc);
scan(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc);
%% This is definitely not a message. Try the next byte.
scan_data(<<_, Rest/bits>>, Fd, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Offset + 1, FileSize, MsgIdsFound, Acc).
scan_data(<<_, Rest/bits>>, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc) ->
scan_data(Rest, Fd, Fun, Offset + 1, FileSize, MsgIdsFound, Acc).

%%----------------------------------------------------------------------------
%% Ets index
Expand Down Expand Up @@ -1742,47 +1750,39 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},

build_index_worker(Gatherer, #msstate { index_ets = IndexEts, dir = Dir },
File, Files) ->
FileName = filenum_to_name(File),
Path = form_filename(Dir, filenum_to_name(File)),
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
[form_filename(Dir, FileName), length(Files)]),
[Path, length(Files)]),
%% The scan function already dealt with duplicate messages
%% within the file. We then get messages in reverse order.
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, FileName),
%% Valid messages are in file order so the last message is
%% the last message from the list.
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{[Obj | VMAcc], VTSAcc + TotalSize};
_ ->
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
FileSize1 =
case Files of
%% if it's the last file, we'll truncate to remove any
%% rubbish above the last valid message. This affects the
%% file size.
[] -> case ValidMessages of
[] -> 0;
_ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end;
[_|_] -> FileSize
end,
%% within the file, and only returns valid messages (we do
%% the index lookup in the fun). But we get messages in reverse order.
{ok, Messages} = scan_file_for_valid_messages(Path,
fun (Obj = {MsgId, TotalSize, Offset}) ->
%% Fan-out may result in the same message data in multiple
%% files so we have to guard against it.
case index_lookup(IndexEts, MsgId) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(IndexEts, StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize }),
{valid, Obj};
_ ->
invalid
end
end),
ValidTotalSize = lists:foldl(fun({_, TotalSize, _}, Acc) -> Acc + TotalSize end, 0, Messages),
%% Any file may have rubbish at the end of it that we will want truncated.
%% Note that the last message in the file is the first in the list.
FileSize = case Messages of
[] ->
0;
[{_, TotalSize, Offset}|_] ->
Offset + TotalSize
end,
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
file_size = FileSize1,
file_size = FileSize,
locked = false }),
ok = gatherer:finish(Gatherer).

Expand Down Expand Up @@ -1933,7 +1933,7 @@ compact_file(File, State = #gc_state { index_ets = IndexEts,
%% Load the messages. It's possible to get 0 messages here;
%% that's OK. That means we have little to do as the file is
%% about to be deleted.
{Messages, _} = scan_and_vacuum_message_file(File, State),
Messages = scan_and_vacuum_message_file(File, State),
%% Blank holes. We must do this first otherwise the file is left
%% with data that may confuse the code (for example data that looks
%% like a message, isn't a message, but spans over a real message).
Expand Down Expand Up @@ -2087,7 +2087,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
_ ->
[#file_summary{ valid_total_size = 0,
file_size = FileSize }] = ets:lookup(FileSummaryEts, File),
{[], 0} = scan_and_vacuum_message_file(File, State),
[] = scan_and_vacuum_message_file(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true = ets:delete(FileSummaryEts, File),
rabbit_log:debug("Deleted empty file number ~tp; reclaimed ~tp bytes", [File, FileSize]),
Expand All @@ -2096,28 +2096,31 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,

scan_and_vacuum_message_file(File, #gc_state{ index_ets = IndexEts, dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
Acc;
not_found ->
Acc
end
end, {[], 0}, Messages).
Path = form_filename(Dir, filenum_to_name(File)),
{ok, Messages} = scan_file_for_valid_messages(Path,
fun ({MsgId, TotalSize, Offset}) ->
case index_lookup(IndexEts, MsgId) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
index_delete_object(IndexEts, Entry),
%% The message was valid, but since we have now deleted
%% it due to having no ref_count, it becomes invalid.
%% We still want to let the scan function skip though.
previously_valid;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
{valid, Entry};
%% Fan-out may remove the entry but also write a new
%% entry in a different file when it needs to write
%% a message and the existing reference is in a file
%% that's about to be deleted. So we explicitly accept
%% these cases and ignore this message.
#msg_location { file = OtherFile, total_size = TotalSize }
when File =/= OtherFile ->
invalid;
not_found ->
invalid
end
end),
%% @todo Do we really need to reverse messages?
lists:reverse(Messages).
23 changes: 17 additions & 6 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,22 @@ msg_store_file_scan1(Config) ->
%% Messages with no content.
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
ok = Scan([{msg, gen_id(), <<>>}]),
%% Tricky messages.
%%
%% These only get properly detected when the index is populated.
%% In this test case we simulate the index with a fun.
TrickyScan = fun (Blocks, Expected, Fun) ->
Path = gen_msg_file(Config, Blocks),
Result = rabbit_msg_store:scan_file_for_valid_messages(Path, Fun),
case Result of
Expected -> ok;
_ -> {expected, Expected, got, Result}
end
end,
ok = TrickyScan(
[{bin, <<0, 0:48, 17, 17, "idididididididid", 255, 0:4352/unit:8, 255>>}],
{ok, [{<<"idididididididid">>, 4378, 1}]},
fun(Obj = {<<"idididididididid">>, 4378, 1}) -> {valid, Obj}; (_) -> invalid end),
%% All good!!
passed.

Expand Down Expand Up @@ -661,12 +677,7 @@ gen_msg_file(Config, Blocks) ->

gen_result(Blocks) ->
Messages = gen_result(Blocks, 0, []),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end.
{ok, Messages}.

gen_result([], _, Acc) ->
Acc;
Expand Down

0 comments on commit dd8870c

Please sign in to comment.