Skip to content

Commit

Permalink
Add merger option
Browse files Browse the repository at this point in the history
This option is used to be able to merge identical keys according to a
user defined function.
  • Loading branch information
AntoineGagne committed Dec 19, 2023
1 parent c19d3f8 commit f400b66
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
3 changes: 2 additions & 1 deletion include/rig.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
-type basedir() :: string().
-type config() :: {table(), file(), decoder(), options()}.
-type decoder() :: fun((binary()) -> tuple()) | term | {module(), function()}.
-type merger() :: fun((Key :: term(), Old :: term(), New :: term()) -> Merged :: term()) | {module(), function()}.
-type file() :: string().
-type key() :: term().
-type option() :: {key_element, pos_integer()} | {subscribers, [pid()]}.
-type option() :: {key_element, pos_integer()} | {subscribers, [pid()]} | {merger, merger()}.
-type options() :: [option()].
-type table() :: atom().
-type value() :: term().
Expand Down
30 changes: 24 additions & 6 deletions src/rig_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,46 @@ configs_validate(Configs) ->
configs_validate([], Acc) ->
lists:flatten(lists:reverse(Acc));
configs_validate([{BaseDir, Configs} | T], Acc) ->
Configs2 = [{Table, BaseDir ++ Filename, DecoderFun, Options} ||
Configs2 = [{Table, BaseDir ++ Filename, DecoderFun, options_validate(Options)} ||
{Table, Filename, DecoderFun, Options} <- Configs],
configs_validate(T, [configs_validate(Configs2) | Acc]);
configs_validate([{Table, Filename, term, Options} | T], Acc) ->
DecoderFun = fun erlang:binary_to_term/1,
configs_validate(T, [{Table, Filename, DecoderFun, Options} | Acc]);
configs_validate(T, [{Table, Filename, DecoderFun, options_validate(Options)} | Acc]);
configs_validate([{Table, Filename, {Module, Function}, Options} | T], Acc) ->
DecoderFun = fun Module:Function/1,
configs_validate(T, [{Table, Filename, DecoderFun, Options} | Acc]);
configs_validate(T, [{Table, Filename, DecoderFun, options_validate(Options)} | Acc]);

configs_validate([{Table, Filename, DecoderFun, Options} | T], Acc)
when is_function(DecoderFun, 1) ->
configs_validate(T,
[{Table, Filename, DecoderFun, Options} | Acc]);
[{Table, Filename, DecoderFun, options_validate(Options)} | Acc]);
configs_validate([{Table, Filename, Decoder, Options} | T], Acc) ->
case rig_utils:parse_fun(Decoder) of
{ok, DecoderFun} ->
configs_validate(T,
[{Table, Filename, DecoderFun, Options} | Acc]);
[{Table, Filename, DecoderFun, options_validate(Options)} | Acc]);
{error, invalid_fun} ->
configs_validate(T, Acc)
end.

options_validate(Options) ->
options_validate(Options, []).

options_validate([], Acc) ->
Acc;
options_validate([{key_element, N} | R], Acc) when is_integer(N), N > 0 ->
options_validate(R, [{key_element, N} | Acc]);
options_validate([{subscribers, Subscribers} | R], Acc) when is_list(Subscribers) ->
options_validate(R, [{subscribers, Subscribers} | Acc]);
options_validate([{merger, {Module, Function}} | R], Acc) ->
options_validate(R, [{merger, fun Module:Function/3} | Acc]);
options_validate([{merger, Function} | R], Acc) when is_function(Function, 3) ->
options_validate(R, [{merger, Function} | Acc]);
options_validate([{Name, Value} | R], Acc) ->
error_logger:warning_msg("invalid option '~p' with value '~p'", [Name, Value]),
options_validate(R, Acc).

new_table(Name, Tids) ->
New = ets:new(table, [public, {read_concurrency, true}]),
{Current, Generations} = case maps:get(Name, Tids, []) of
Expand All @@ -182,7 +199,8 @@ reload({Name, Filename, DecoderFun, Opts}, Current, New) ->
Timestamp = os:timestamp(),
{ok, File} = file:open(Filename, [binary, read]),
KeyElement = ?LOOKUP(key_element, Opts, ?DEFAULT_KEY_ELEMENT),
ok = rig_utils:read_file(File, DecoderFun, New, KeyElement),
Merge = ?LOOKUP(merger, Opts, undefined),
ok = rig_utils:read_file(File, DecoderFun, New, KeyElement, Merge),
ok = file:close(File),
ok = rig_index:add(Name, New),
Subscribers = ?LOOKUP(subscribers, Opts, ?DEFAULT_SUBSCRIBERS),
Expand Down
50 changes: 41 additions & 9 deletions src/rig_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
lookup/3,
match_all/1,
parse_fun/1,
read_file/4
read_file/5
]).

-type merge() :: fun((Key :: term(), Old :: term(), New :: term()) -> Merged :: term()).
-record(state, {
decoder :: fun((binary()) -> tuple()),
table :: ets:tid(),
key_position :: pos_integer(),
merge = undefined :: merge() | undefined
}).

%% public
-spec change_time(file:filename()) ->
pos_integer() | undefined.
Expand Down Expand Up @@ -79,11 +87,16 @@ parse_fun(Decoder) ->
{error, invalid_fun}
end.

-spec read_file(file:io_device(), decoder(), ets:tid(),
pos_integer()) -> ok.
-spec read_file(
file:io_device(),
decoder(),
ets:tid(),
pos_integer(),
merge() | undefined
) -> ok.

read_file(File, Decoder, Tid, KeyPos) ->
State = {Decoder, Tid, KeyPos},
read_file(File, Decoder, Tid, KeyPos, Merge) ->
State = #state{decoder = Decoder, table = Tid, key_position = KeyPos, merge = Merge},
read_file_buf(File, <<>>, 0, State).

%% private
Expand All @@ -107,20 +120,39 @@ parse_records(Bin, 0, State) when size(Bin) >= 4 ->
parse_records(Rest, Size, State);
parse_records(Bin, 0, _State) ->
{Bin, 0};
parse_records(Bin, Size, {Decoder, Tid, KeyPos} = State)
when size(Bin) >= Size ->
parse_records(
Bin,
Size,
#state{
decoder = Decoder,
key_position = KeyPos
} = State
) when
size(Bin) >= Size
->
<<Record:Size/binary, Rest/binary>> = Bin,
case Decoder(Record) of
{Key, Value} ->
true = ets:insert(Tid, {Key, Value});
handle_merge(Key, Value, State);
R when is_tuple(R) ->
Key = element(KeyPos, R),
true = ets:insert(Tid, {Key, R})
handle_merge(Key, R, State)
end,
parse_records(Rest, 0, State);
parse_records(Bin, Size, _State) ->
{Bin, Size}.

handle_merge(Key, Value, #state{merge = undefined, table = Tid}) ->
true = ets:insert(Tid, {Key, Value});
handle_merge(Key, New, #state{merge = Merge, table = Tid}) ->
case ets:lookup(Tid, Key) of
[] ->
true = ets:insert(Tid, {Key, New});
[Old] ->
Merged = Merge(Key, Old, New),
true = ets:insert(Tid, {Key, Merged})
end.

read_file_buf(File, Prefix, Size, State) ->
case file:read(File, ?FILE_READ_SIZE * 2) of
eof ->
Expand Down

0 comments on commit f400b66

Please sign in to comment.