Skip to content

Commit

Permalink
Merge pull request #3777 from esl/remove_domain/incremental_db_deletes
Browse files Browse the repository at this point in the history
Remove domain/incremental db deletes
  • Loading branch information
chrzaszcz authored Sep 28, 2022
2 parents 6ddc8b4 + 23618cb commit 975bb21
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 61 deletions.
23 changes: 16 additions & 7 deletions big_tests/tests/domain_removal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ all() ->
{group, auth_removal},
{group, cache_removal},
{group, mam_removal},
{group, mam_removal_incremental},
{group, inbox_removal},
{group, muc_light_removal},
{group, muc_removal},
Expand All @@ -35,6 +36,8 @@ groups() ->
{cache_removal, [], [cache_removal]},
{mam_removal, [], [mam_pm_removal,
mam_muc_removal]},
{mam_removal_incremental, [], [mam_pm_removal,
mam_muc_removal]},
{inbox_removal, [], [inbox_removal]},
{muc_light_removal, [], [muc_light_removal,
muc_light_blocking_removal]},
Expand Down Expand Up @@ -93,6 +96,10 @@ group_to_modules(mam_removal) ->
MucHost = subhost_pattern(muc_light_helper:muc_host_pattern()),
[{mod_mam, mam_helper:config_opts(#{pm => #{}, muc => #{host => MucHost}})},
{mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}];
group_to_modules(mam_removal_incremental) ->
MucHost = subhost_pattern(muc_light_helper:muc_host_pattern()),
[{mod_mam, mam_helper:config_opts(#{delete_domain_limit => 1, pm => #{}, muc => #{host => MucHost}})},
{mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}];
group_to_modules(muc_light_removal) ->
[{mod_muc_light, mod_config(mod_muc_light, #{backend => rdbms})}];
group_to_modules(muc_removal) ->
Expand Down Expand Up @@ -170,10 +177,11 @@ cache_removal(Config) ->

mam_pm_removal(Config) ->
F = fun(Alice, Bob) ->
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
escalus:wait_for_stanza(Bob),
mam_helper:wait_for_archive_size(Alice, 1),
mam_helper:wait_for_archive_size(Bob, 1),
N = 3,
[ escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)) || _ <- lists:seq(1, N) ],
escalus:wait_for_stanzas(Bob, N),
mam_helper:wait_for_archive_size(Alice, N),
mam_helper:wait_for_archive_size(Bob, N),
run_remove_domain(),
mam_helper:wait_for_archive_size(Alice, 0),
mam_helper:wait_for_archive_size(Bob, 0)
Expand All @@ -182,14 +190,15 @@ mam_pm_removal(Config) ->

mam_muc_removal(Config0) ->
F = fun(Config, Alice) ->
N = 3,
Room = muc_helper:fresh_room_name(),
MucHost = muc_light_helper:muc_host(),
muc_light_helper:create_room(Room, MucHost, alice,
[], Config, muc_light_helper:ver(1)),
RoomAddr = <<Room/binary, "@", MucHost/binary>>,
escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, <<"text">>)),
escalus:wait_for_stanza(Alice),
mam_helper:wait_for_room_archive_size(MucHost, Room, 1),
[ escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, <<"text">>)) || _ <- lists:seq(1, N) ],
escalus:wait_for_stanzas(Alice, N),
mam_helper:wait_for_room_archive_size(MucHost, Room, N),
run_remove_domain(),
mam_helper:wait_for_room_archive_size(MucHost, Room, 0)
end,
Expand Down
8 changes: 8 additions & 0 deletions doc/modules/mod_mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ This sets the maximum page size of returned results.
This enforces all mam lookups to be "simple", i.e., they skip the RSM count.
See [Message Archive Management extensions](../open-extensions/mam.md).

#### `modules.mod_mam.delete_domain_limit`

* **Syntax:** non-negative integer or the string `"infinity"`
* **Default:** `"infinity"`
* **Example:** `modules.mod_mam.delete_domain_limit = 10000`

Domain deletion can be an expensive operation, as it requires to delete potentially many thousands of records from the DB. By default, the delete operation deletes everything in a transaction, but it might be desired, to handle timeouts and table locks more gracefully, to delete the records in batches. This limit establishes the size of the batch.

#### `modules.mod_mam.db_jid_format`

* **Syntax:** string, one of `"mam_jid_rfc"`, `"mam_jid_rfc_trust"`, `"mam_jid_mini"` or a module implementing `mam_jid` behaviour
Expand Down
6 changes: 4 additions & 2 deletions src/mam/mod_mam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ common_config_items() ->
<<"default_result_limit">> => #option{type = integer,
validate = non_negative},
<<"enforce_simple_queries">> => #option{type = boolean},
<<"delete_domain_limit">> => #option{type = int_or_infinity,
validate = positive},
<<"max_result_limit">> => #option{type = integer,
validate = non_negative},
<<"db_jid_format">> => #option{type = atom,
Expand Down Expand Up @@ -296,7 +298,7 @@ parse_backend_opts(elasticsearch, Type, _Opts, Deps0) ->
-spec add_rdbms_deps(basic | user_cache | async_writer,
mam_type(), module_opts(), module_map()) -> module_map().
add_rdbms_deps(basic, Type, Opts, Deps) ->
Opts1 = maps:with([db_message_format, db_jid_format], Opts),
Opts1 = maps:with([db_message_format, db_jid_format, delete_domain_limit], Opts),
Deps1 = add_dep(rdbms_arch_module(Type), maps:merge(rdbms_arch_defaults(Type), Opts1), Deps),
add_dep(mod_mam_rdbms_user, user_db_types(Type), Deps1);
add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Deps) ->
Expand Down Expand Up @@ -328,7 +330,7 @@ rdbms_arch_defaults(muc) ->

rdbms_arch_defaults() ->
#{db_message_format => mam_message_compressed_eterm,
no_writer => false}.
no_writer => false, delete_domain_limit => infinity}.

rdbms_arch_module(pm) -> mod_mam_rdbms_arch;
rdbms_arch_module(muc) -> mod_mam_muc_rdbms_arch.
Expand Down
60 changes: 38 additions & 22 deletions src/mam/mod_mam_muc_rdbms_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@

-include("mongoose.hrl").
-include("jlib.hrl").
-include_lib("exml/include/exml.hrl").
-include("mongoose_rsm.hrl").
-include("mongoose_mam.hrl").

%% ----------------------------------------------------------------------
Expand All @@ -54,9 +52,9 @@
%% Starting and stopping functions for users' archives

-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
start(HostType, Opts) ->
start_hooks(HostType),
register_prepared_queries(),
register_prepared_queries(Opts),
ok.

-spec stop(host_type()) -> ok.
Expand Down Expand Up @@ -108,16 +106,25 @@ hooks(HostType) ->
%% ----------------------------------------------------------------------
%% SQL queries

register_prepared_queries() ->
register_prepared_queries(Opts) ->
prepare_insert(insert_mam_muc_message, 1),
mongoose_rdbms:prepare(mam_muc_archive_remove, mam_muc_message, [room_id],
<<"DELETE FROM mam_muc_message "
"WHERE room_id = ?">>),

%% Domain Removal
{MaybeLimitSQL, MaybeLimitMSSQL} = mod_mam_utils:batch_delete_limits(Opts),
IdTable = <<"(SELECT ", MaybeLimitMSSQL/binary,
" id from mam_server_user WHERE server = ? ", MaybeLimitSQL/binary, ")">>,
ServerTable = <<"(SELECT * FROM (SELECT", MaybeLimitMSSQL/binary,
" server FROM mam_server_user WHERE server = ? ", MaybeLimitSQL/binary, ") as t)">>,
mongoose_rdbms:prepare(mam_muc_remove_domain, mam_muc_message, ['mam_server_user.server'],
<<"DELETE FROM mam_muc_message "
"WHERE room_id IN (SELECT id FROM mam_server_user where server = ?)">>),
"WHERE room_id IN ", IdTable/binary>>),
mongoose_rdbms:prepare(mam_muc_remove_domain_users, mam_server_user, [server],
<<"DELETE FROM mam_server_user WHERE server = ?">>),
<<"DELETE ", MaybeLimitMSSQL/binary,
" FROM mam_server_user WHERE server IN", ServerTable/binary>>),

mongoose_rdbms:prepare(mam_muc_make_tombstone, mam_muc_message, [message, room_id, id],
<<"UPDATE mam_muc_message SET message = ?, search_body = '' "
"WHERE room_id = ? AND id = ?">>),
Expand Down Expand Up @@ -173,8 +180,8 @@ env_vars(HostType, ArcJID) ->
decode_row_fn => fun row_to_uniform_format/2,
has_message_retraction => mod_mam_utils:has_message_retraction(mod_mam_muc, HostType),
has_full_text_search => mod_mam_utils:has_full_text_search(mod_mam_muc, HostType),
db_jid_codec => db_jid_codec(HostType, ?MODULE),
db_message_codec => db_message_codec(HostType, ?MODULE)}.
db_jid_codec => mod_mam_utils:db_jid_codec(HostType, ?MODULE),
db_message_codec => mod_mam_utils:db_message_codec(HostType, ?MODULE)}.

row_to_uniform_format(Row, Env) ->
mam_decoder:decode_muc_row(Row, Env).
Expand All @@ -196,14 +203,6 @@ column_names(Mappings) ->
%% ----------------------------------------------------------------------
%% Options

-spec db_jid_codec(host_type(), module()) -> module().
db_jid_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_jid_format).

-spec db_message_codec(host_type(), module()) -> module().
db_message_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_message_format).

-spec get_retract_id(exml:element(), env_vars()) -> none | mod_mam_utils:retraction_id().
get_retract_id(Packet, #{has_message_retraction := Enabled}) ->
mod_mam_utils:get_retract_id(Enabled, Packet).
Expand Down Expand Up @@ -316,14 +315,31 @@ remove_archive(Acc, HostType, ArcID, _ArcJID) ->
mongoose_domain_api:remove_domain_acc().
remove_domain(Acc, HostType, Domain) ->
F = fun() ->
SubHosts = get_subhosts(HostType, Domain),
{atomic, _} = mongoose_rdbms:sql_transaction(HostType, fun() ->
[remove_domain_trans(HostType, SubHost) || SubHost <- SubHosts]
end),
case gen_mod:get_module_opt(HostType, ?MODULE, delete_domain_limit) of
infinity -> remove_domain_all(HostType, Domain);
Limit -> remove_domain_batch(HostType, Domain, Limit)
end,
Acc
end,
end,
mongoose_domain_api:remove_domain_wrapper(Acc, F, ?MODULE).

-spec remove_domain_all(host_type(), jid:lserver()) -> any().
remove_domain_all(HostType, Domain) ->
SubHosts = get_subhosts(HostType, Domain),
{atomic, _} = mongoose_rdbms:sql_transaction(HostType, fun() ->
[remove_domain_trans(HostType, SubHost) || SubHost <- SubHosts]
end).

-spec remove_domain_batch(host_type(), jid:lserver(), non_neg_integer()) -> any().
remove_domain_batch(HostType, Domain, Limit) ->
SubHosts = get_subhosts(HostType, Domain),
DeleteQueries = [mam_muc_remove_domain, mam_muc_remove_domain_users],
DelSubHost = [ mod_mam_utils:incremental_delete_domain(HostType, SubHost, Limit, DeleteQueries, 0)
|| SubHost <- SubHosts],
TotalDeleted = lists:sum(DelSubHost),
?LOG_INFO(#{what => mam_muc_domain_removal_completed, total_records_deleted => TotalDeleted,
domain => Domain, host_type => HostType}).

remove_domain_trans(HostType, MucHost) ->
mongoose_rdbms:execute_successfully(HostType, mam_muc_remove_domain, [MucHost]),
mongoose_rdbms:execute_successfully(HostType, mam_muc_remove_domain_users, [MucHost]).
Expand Down
59 changes: 36 additions & 23 deletions src/mam/mod_mam_rdbms_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
%% Starting and stopping functions for users' archives

-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
start(HostType, Opts) ->
start_hooks(HostType),
register_prepared_queries(),
register_prepared_queries(Opts),
ok.

-spec stop(host_type()) -> ok.
Expand Down Expand Up @@ -125,21 +125,28 @@ hooks(HostType) ->
%% ----------------------------------------------------------------------
%% SQL queries

register_prepared_queries() ->
register_prepared_queries(Opts) ->
prepare_insert(insert_mam_message, 1),
mongoose_rdbms:prepare(mam_archive_remove, mam_message, [user_id],
<<"DELETE FROM mam_message "
"WHERE user_id = ?">>),

%% Domain Removal
{MaybeLimitSQL, MaybeLimitMSSQL} = mod_mam_utils:batch_delete_limits(Opts),
IdTable = <<"(SELECT ", MaybeLimitMSSQL/binary,
" id from mam_server_user WHERE server = ? ", MaybeLimitSQL/binary, ")">>,
ServerTable = <<"(SELECT * FROM (SELECT", MaybeLimitMSSQL/binary,
" server FROM mam_server_user WHERE server = ? ", MaybeLimitSQL/binary, ") as t)">>,
mongoose_rdbms:prepare(mam_remove_domain, mam_message, ['mam_server_user.server'],
<<"DELETE FROM mam_message "
"WHERE user_id IN "
"(SELECT id from mam_server_user WHERE server = ?)">>),
"WHERE user_id IN ", IdTable/binary>>),
mongoose_rdbms:prepare(mam_remove_domain_prefs, mam_config, ['mam_server_user.server'],
<<"DELETE FROM mam_config "
"WHERE user_id IN "
"(SELECT id from mam_server_user WHERE server = ?)">>),
"WHERE user_id IN ", IdTable/binary>>),
mongoose_rdbms:prepare(mam_remove_domain_users, mam_server_user, [server],
<<"DELETE FROM mam_server_user WHERE server = ?">>),
<<"DELETE ", MaybeLimitMSSQL/binary,
" FROM mam_server_user WHERE server IN ", ServerTable/binary>>),

mongoose_rdbms:prepare(mam_make_tombstone, mam_message, [message, user_id, id],
<<"UPDATE mam_message SET message = ?, search_body = '' "
"WHERE user_id = ? AND id = ?">>),
Expand Down Expand Up @@ -197,8 +204,8 @@ env_vars(HostType, ArcJID) ->
decode_row_fn => fun row_to_uniform_format/2,
has_message_retraction => mod_mam_utils:has_message_retraction(mod_mam_pm, HostType),
has_full_text_search => mod_mam_utils:has_full_text_search(mod_mam_pm, HostType),
db_jid_codec => db_jid_codec(HostType, ?MODULE),
db_message_codec => db_message_codec(HostType, ?MODULE)}.
db_jid_codec => mod_mam_utils:db_jid_codec(HostType, ?MODULE),
db_message_codec => mod_mam_utils:db_message_codec(HostType, ?MODULE)}.

row_to_uniform_format(Row, Env) ->
mam_decoder:decode_row(Row, Env).
Expand Down Expand Up @@ -226,14 +233,6 @@ column_names(Mappings) ->
%% ----------------------------------------------------------------------
%% Options

-spec db_jid_codec(host_type(), module()) -> module().
db_jid_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_jid_format).

-spec db_message_codec(host_type(), module()) -> module().
db_message_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_message_format).

-spec get_retract_id(exml:element(), env_vars()) -> none | mod_mam_utils:retraction_id().
get_retract_id(Packet, #{has_message_retraction := Enabled}) ->
mod_mam_utils:get_retract_id(Enabled, Packet).
Expand Down Expand Up @@ -345,15 +344,29 @@ remove_archive(Acc, HostType, ArcID, _ArcJID) ->
mongoose_domain_api:remove_domain_acc().
remove_domain(Acc, HostType, Domain) ->
F = fun() ->
{atomic, _} = mongoose_rdbms:sql_transaction(HostType, fun() ->
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain, [Domain]),
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_prefs, [Domain]),
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_users, [Domain])
end),
case gen_mod:get_module_opt(HostType, ?MODULE, delete_domain_limit) of
infinity -> remove_domain_all(HostType, Domain);
Limit -> remove_domain_batch(HostType, Domain, Limit)
end,
Acc
end,
mongoose_domain_api:remove_domain_wrapper(Acc, F, ?MODULE).

-spec remove_domain_all(host_type(), jid:lserver()) -> any().
remove_domain_all(HostType, Domain) ->
{atomic, _} = mongoose_rdbms:sql_transaction(HostType, fun() ->
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain, [Domain]),
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_prefs, [Domain]),
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_users, [Domain])
end).

-spec remove_domain_batch(host_type(), jid:lserver(), non_neg_integer()) -> any().
remove_domain_batch(HostType, Domain, Limit) ->
DeleteQueries = [mam_remove_domain, mam_remove_domain_prefs, mam_remove_domain_users],
TotalDeleted = mod_mam_utils:incremental_delete_domain(HostType, Domain, Limit, DeleteQueries, 0),
?LOG_INFO(#{what => mam_domain_removal_completed, total_records_deleted => TotalDeleted,
domain => Domain, host_type => HostType}).

%% GDPR logic
extract_gdpr_messages(Env, ArcID) ->
Filters = [{equal, user_id, ArcID}],
Expand Down
45 changes: 40 additions & 5 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@

%% Shared logic
-export([check_result_for_policy_violation/2,
lookup/3]).
lookup/3,
batch_delete_limits/1, incremental_delete_domain/5,
db_message_codec/2, db_jid_codec/2]).

-callback extra_fin_element(mongooseim:host_type(),
mam_iq:lookup_params(),
Expand Down Expand Up @@ -1159,7 +1161,7 @@ check_for_item_not_found(#rsm_in{direction = before, id = ID},
_PageSize, {TotalCount, Offset, MessageRows}) ->
case maybe_last(MessageRows) of
{ok, #{id := ID}} ->
{ok, {TotalCount, Offset, list_without_last(MessageRows)}};
{ok, {TotalCount, Offset, lists:droplast(MessageRows)}};
undefined ->
{error, item_not_found}
end;
Expand Down Expand Up @@ -1221,10 +1223,43 @@ set_complete_result_page_using_extra_message(PageSize, Params, Result = #{messag
remove_extra_message(Params, Messages) ->
case maps:get(ordering_direction, Params, forward) of
forward ->
list_without_last(Messages);
lists:droplast(Messages);
backward ->
tl(Messages)
end.

list_without_last(List) ->
lists:reverse(tl(lists:reverse(List))).
-spec db_jid_codec(mongooseim:host_type(), module()) -> module().
db_jid_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_jid_format).

-spec db_message_codec(mongooseim:host_type(), module()) -> module().
db_message_codec(HostType, Module) ->
gen_mod:get_module_opt(HostType, Module, db_message_format).

-spec batch_delete_limits(#{delete_domain_limit := infinity | non_neg_integer()}) ->
{binary(), binary()}.
batch_delete_limits(#{delete_domain_limit := infinity}) ->
{<<>>, <<>>};
batch_delete_limits(#{delete_domain_limit := Limit}) ->
rdbms_queries:get_db_specific_limits_binaries(Limit).

-spec incremental_delete_domain(
mongooseim:host_type(), jid:lserver(), non_neg_integer(), [atom()], non_neg_integer()) ->
non_neg_integer().
incremental_delete_domain(_HostType, _Domain, _Limit, [], TotalDeleted) ->
TotalDeleted;
incremental_delete_domain(HostType, Domain, Limit, [Query | MoreQueries] = AllQueries, TotalDeleted) ->
R1 = mongoose_rdbms:execute_successfully(HostType, Query, [Domain]),
case is_removing_done(R1, Limit) of
{done, N} ->
incremental_delete_domain(HostType, Domain, Limit, MoreQueries, N + TotalDeleted);
{remove_more, N} ->
incremental_delete_domain(HostType, Domain, Limit, AllQueries, N + TotalDeleted)
end.

-spec is_removing_done(LastResult :: {updated, non_neg_integer()}, Limit :: non_neg_integer()) ->
{done | remove_more, non_neg_integer()}.
is_removing_done({updated, N}, Limit) when N < Limit ->
{done, N};
is_removing_done({updated, N}, _)->
{remove_more, N}.
Loading

0 comments on commit 975bb21

Please sign in to comment.