diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index cfb4130bc6..6f7509e4ac 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -34,7 +34,7 @@ -define(MOD_SM, mod_stream_management). -define(CONSTRAINT_CHECK_TIMEOUT, 5000). -define(LONG_TIMEOUT, 3600). --define(SHORT_TIMEOUT, 3). +-define(SHORT_TIMEOUT, 1). -define(SMALL_SM_BUFFER, 3). %%-------------------------------------------------------------------- @@ -840,7 +840,8 @@ resume_session_with_wrong_h_does_not_leak_sessions(Config) -> escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed), [] = sm_helper:get_user_present_resources(Alice), - {error, smid_not_found} = sm_helper:get_sid_by_stream_id(SMID), + HostType = host_type(), + {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID), escalus_connection:wait_for_close(Alice, timer:seconds(5)) end). @@ -860,7 +861,8 @@ resume_session_with_wrong_namespace_is_a_noop(Config) -> resume_dead_session_results_in_item_not_found(Config) -> SMID = base64:encode(crypto:strong_rand_bytes(21)), SID = {os:timestamp(), undefined}, - rpc(mim(), ?MOD_SM, register_smid, [SMID, SID]), + HostType = host_type(), + rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]), session_resumption_expects_item_not_found(Config, SMID). session_resumption_expects_item_not_found(Config, SMID) -> diff --git a/big_tests/tests/sm_helper.erl b/big_tests/tests/sm_helper.erl index 3c80639401..dfe3689ee6 100644 --- a/big_tests/tests/sm_helper.erl +++ b/big_tests/tests/sm_helper.erl @@ -4,7 +4,7 @@ -export([client_to_spec0/1, client_to_spec/1, client_to_smid/1, - get_sid_by_stream_id/1]). + get_sid_by_stream_id/2]). %% Connection helpers -export([connect_fresh/3, @@ -55,8 +55,8 @@ client_to_spec(#client{props = Props}) -> client_to_spec0(#client{props = Props}) -> lists:foldl(fun proplists:delete/2, Props, [stream_id, resource]). -get_sid_by_stream_id(SMID) -> - rpc(mim(), ?MOD_SM, get_sid, [SMID]). +get_sid_by_stream_id(HostType, SMID) -> + rpc(mim(), ?MOD_SM, get_sid, [HostType, SMID]). %% Connection helpers diff --git a/doc/modules/mod_stream_management.md b/doc/modules/mod_stream_management.md index 87609851fd..0642b87db4 100644 --- a/doc/modules/mod_stream_management.md +++ b/doc/modules/mod_stream_management.md @@ -7,6 +7,11 @@ while the management of the session tables and configuration is implemented in ## Options +### `modules.mod_stream_management.backend` +* **Syntax:** string. +* **Default:** "mnesia" +* **Example:** `backend = "mnesia"` + ### `modules.mod_stream_management.buffer` * **Syntax:** boolean * **Default:** true diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index d43b5eb8cb..bbdcd9dd58 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2798,13 +2798,13 @@ maybe_enable_stream_mgmt(NextState, El, StateData = #state{host_type = HostType} c2s_stream_error(mongoose_xmpp_errors:invalid_namespace(), StateData) end. -enable_stream_resumption(SD) -> +enable_stream_resumption(SD = #state{host_type = HostType}) -> SMID = mod_stream_management:make_smid(), SID = case SD#state.sid of undefined -> ejabberd_sm:make_new_sid(); RSID -> RSID end, - ok = mod_stream_management:register_smid(SMID, SID), + ok = mod_stream_management:register_smid(HostType, SMID, SID), {SD#state{stream_mgmt_id = SMID, sid = SID}, stream_mgmt_enabled([{<<"id">>, SMID}, {<<"resume">>, <<"true">>}])}. @@ -3105,7 +3105,7 @@ do_resume_session(SMID, El, {sid, {_, Pid}}, StateData) -> Info = #{ip => NSD#state.ip, conn => NSD#state.conn, auth_module => NSD#state.auth_module }, ejabberd_sm:open_session(NSD#state.host_type, SID, NSD#state.jid, Priority, Info), - ok = mod_stream_management:register_smid(SMID, SID), + ok = mod_stream_management:register_smid(NSD#state.host_type, SMID, SID), try Resumed = stream_mgmt_resumed(NSD#state.stream_mgmt_id, NSD#state.stream_mgmt_in), diff --git a/src/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl similarity index 71% rename from src/mod_stream_management.erl rename to src/stream_management/mod_stream_management.erl index a5be68f811..b5edf8bb98 100644 --- a/src/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -21,28 +21,25 @@ get_buffer_max/2, get_ack_freq/2, get_resume_timeout/2, - register_smid/2]). + register_smid/3]). %% API for inspection and tests --export([get_sid/1, +-export([get_sid/2, get_stale_h/2, register_stale_smid_h/3, remove_stale_smid_h/2]). --ignore_xref([c2s_stream_features/3, get_sid/1, get_stale_h/2, remove_smid/5, +-ignore_xref([c2s_stream_features/3, get_sid/2, get_stale_h/2, remove_smid/5, register_stale_smid_h/3, remove_stale_smid_h/2, session_cleanup/5]). -type smid() :: base64:ascii_binary(). +-export_type([smid/0]). + -include("mongoose.hrl"). -include("jlib.hrl"). -include("mongoose_config_spec.hrl"). --record(sm_session, - {smid :: smid(), - sid :: ejabberd_sm:sid() - }). - -type buffer_max() :: pos_integer() | infinity | no_buffer. -type ack_freq() :: pos_integer() | never. %% @@ -50,13 +47,9 @@ %% start(HostType, Opts) -> + mod_stream_management_backend:init(HostType, Opts), ?LOG_INFO(#{what => stream_management_starting}), ejabberd_hooks:add(hooks(HostType)), - mnesia:create_table(sm_session, [{ram_copies, [node()]}, - {attributes, record_info(fields, sm_session)}]), - mnesia:add_table_index(sm_session, sid), - mnesia:add_table_copy(sm_session, node(), ram_copies), - stream_management_stale_h:maybe_start(Opts), ok. stop(HostType) -> @@ -72,7 +65,8 @@ hooks(HostType) -> -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{ - items = #{<<"buffer">> => #option{type = boolean}, + items = #{<<"backend">> => #option{type = atom, validate = {module, ?MODULE}}, + <<"buffer">> => #option{type = boolean}, <<"buffer_max">> => #option{type = int_or_infinity, validate = positive}, <<"ack">> => #option{type = boolean}, @@ -148,15 +142,12 @@ session_cleanup(Acc, _LUser, _LServer, _LResource, SID) -> mongoose_acc:t(). do_remove_smid(Acc, HostType, SID) -> H = mongoose_acc:get(stream_mgmt, h, undefined, Acc), - MaybeSMID = case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of - [] -> {error, smid_not_found}; - [#sm_session{smid = SMID}] -> - mnesia:dirty_delete(sm_session, SMID), - case H of - undefined -> ok; - _ -> register_stale_smid_h(HostType, SMID, H) - end, - {ok, SMID} + MaybeSMID = unregister_smid(HostType, SID), + case MaybeSMID of + {ok, SMID} when H =/= undefined -> + register_stale_smid_h(HostType, SMID, H); + _ -> + ok end, mongoose_acc:set(stream_mgmt, smid, MaybeSMID, Acc). @@ -172,26 +163,17 @@ make_smid() -> -spec get_session_from_smid(mongooseim:host_type(), smid()) -> {sid, ejabberd_sm:sid()} | {stale_h, non_neg_integer()} | {error, smid_not_found}. get_session_from_smid(HostType, SMID) -> - case get_sid(SMID) of + case get_sid(HostType, SMID) of {sid, SID} -> {sid, SID}; {error, smid_not_found} -> get_stale_h(HostType, SMID) end. --spec get_sid(smid()) -> - {sid, ejabberd_sm:sid()} | {error, smid_not_found}. -get_sid(SMID) -> - case mnesia:dirty_read(sm_session, SMID) of - [#sm_session{sid = SID}] -> {sid, SID}; - [] -> {error, smid_not_found} - end. - -spec get_stale_h(mongooseim:host_type(), SMID :: smid()) -> {stale_h, non_neg_integer()} | {error, smid_not_found}. get_stale_h(HostType, SMID) -> - MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), - case proplists:get_value(enabled, MaybeModOpts, false) of + case is_stale_h_enabled(HostType) of false -> {error, smid_not_found}; - true -> stream_management_stale_h:read_stale_h(SMID) + true -> read_stale_h(HostType, SMID) end. -spec get_buffer_max(mongooseim:host_type(), buffer_max()) -> buffer_max(). @@ -206,26 +188,61 @@ get_ack_freq(HostType, Default) -> get_resume_timeout(HostType, Default) -> gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout, Default). -%% Setters -register_smid(SMID, SID) -> - try - mnesia:sync_dirty(fun mnesia:write/1, - [#sm_session{smid = SMID, sid = SID}]), - ok - catch exit:Reason -> - {error, Reason} - end. register_stale_smid_h(HostType, SMID, H) -> - MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), - case proplists:get_value(enabled, MaybeModOpts, false) of + case is_stale_h_enabled(HostType) of false -> ok; - true -> stream_management_stale_h:write_stale_h(SMID, H) + true -> write_stale_h(HostType, SMID, H) end. remove_stale_smid_h(HostType, SMID) -> - MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), - case proplists:get_value(enabled, MaybeModOpts, false) of + case is_stale_h_enabled(HostType) of false -> ok; - true -> stream_management_stale_h:delete_stale_h(SMID) + true -> delete_stale_h(HostType, SMID) end. + +is_stale_h_enabled(HostType) -> + MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []), + proplists:get_value(enabled, MaybeModOpts, false). + +%% Backend operations + +-spec register_smid(HostType, SMID, SID) -> + ok | {error, term()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + SID :: ejabberd_sm:sid(). +register_smid(HostType, SMID, SID) -> + mod_stream_management_backend:register_smid(HostType, SMID, SID). + +-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> + {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}. +unregister_smid(HostType, SID) -> + mod_stream_management_backend:unregister_smid(HostType, SID). + +-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) -> + {sid, ejabberd_sm:sid()} | {error, smid_not_found}. +get_sid(HostType, SMID) -> + mod_stream_management_backend:get_sid(HostType, SMID). + +%% stale_h + +-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + H :: non_neg_integer(). +write_stale_h(HostType, SMID, H) -> + mod_stream_management_backend:write_stale_h(HostType, SMID, H). + +-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +delete_stale_h(HostType, SMID) -> + mod_stream_management_backend:delete_stale_h(HostType, SMID). + +-spec read_stale_h(HostType, SMID) -> + {stale_h, non_neg_integer()} | {error, smid_not_found} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +read_stale_h(HostType, SMID) -> + mod_stream_management_backend:read_stale_h(HostType, SMID). diff --git a/src/stream_management/mod_stream_management_backend.erl b/src/stream_management/mod_stream_management_backend.erl new file mode 100644 index 0000000000..d7c0b95692 --- /dev/null +++ b/src/stream_management/mod_stream_management_backend.erl @@ -0,0 +1,105 @@ +-module(mod_stream_management_backend). +-export([init/2, + register_smid/3, + unregister_smid/2, + get_sid/2]). + +-export([read_stale_h/2, + write_stale_h/3, + delete_stale_h/2]). + +-define(MAIN_MODULE, mod_stream_management). + +%% ---------------------------------------------------------------------- +%% Callbacks +%% (exactly the same as specs in this module) + +-callback init(HostType, Opts) -> ok when + HostType :: mongooseim:host_type(), + Opts :: gen_mod:module_opts(). + +-callback register_smid(HostType, SMID, SID) -> + ok | {error, term()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + SID :: ejabberd_sm:sid(). + +-callback unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> + {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}. + +-callback get_sid(mongooseim:host_type(), mod_stream_management:smid()) -> + {sid, ejabberd_sm:sid()} | {error, smid_not_found}. + +%% stale_h functions + +-callback read_stale_h(HostType, SMID) -> + {stale_h, non_neg_integer()} | {error, smid_not_found} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). + +-callback write_stale_h(HostType, SMID, H) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + H :: non_neg_integer(). + +-callback delete_stale_h(HostType, SMID) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). + +%% ---------------------------------------------------------------------- +%% API Functions + +-spec init(HostType, Opts) -> ok when + HostType :: mongooseim:host_type(), + Opts :: gen_mod:module_opts(). +init(HostType, Opts) -> + TrackedFuns = [], + mongoose_backend:init(HostType, ?MAIN_MODULE, TrackedFuns, Opts), + Args = [HostType, Opts], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec register_smid(HostType, SMID, SID) -> + ok | {error, term()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + SID :: ejabberd_sm:sid(). +register_smid(HostType, SMID, SID) -> + Args = [HostType, SMID, SID], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> + {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}. +unregister_smid(HostType, SID) -> + Args = [HostType, SID], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) -> + {sid, ejabberd_sm:sid()} | {error, smid_not_found}. +get_sid(HostType, SMID) -> + Args = [HostType, SMID], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +%% stale_h functions + +-spec read_stale_h(HostType, SMID) -> + {stale_h, non_neg_integer()} | {error, smid_not_found} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +read_stale_h(HostType, SMID) -> + Args = [HostType, SMID], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + H :: non_neg_integer(). +write_stale_h(HostType, SMID, H) -> + Args = [HostType, SMID, H], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +delete_stale_h(HostType, SMID) -> + Args = [HostType, SMID], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). diff --git a/src/stream_management/mod_stream_management_mnesia.erl b/src/stream_management/mod_stream_management_mnesia.erl new file mode 100644 index 0000000000..2a8edfcfe1 --- /dev/null +++ b/src/stream_management/mod_stream_management_mnesia.erl @@ -0,0 +1,174 @@ +-module(mod_stream_management_mnesia). +-behaviour(mod_stream_management_backend). +-behaviour(gen_server). + +-include("mongoose.hrl"). +-include("jlib.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([init/2, + register_smid/3, + unregister_smid/2, + get_sid/2]). + +-export([read_stale_h/2, + write_stale_h/3, + delete_stale_h/2]). + +%% Internal exports +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). + +-ignore_xref([start_link/1]). + +-record(smgc_state, + {gc_repeat_after :: non_neg_integer(), + gc_geriatric :: non_neg_integer() }). + +-record(stream_mgmt_stale_h, + {smid :: mod_stream_management:smid(), + h :: non_neg_integer(), + stamp :: non_neg_integer() }). + +-record(sm_session, + {smid :: mod_stream_management:smid(), + sid :: ejabberd_sm:sid() }). + +init(_HostType, Opts) -> + mnesia:create_table(sm_session, [{ram_copies, [node()]}, + {attributes, record_info(fields, sm_session)}]), + mnesia:add_table_index(sm_session, sid), + mnesia:add_table_copy(sm_session, node(), ram_copies), + maybe_init_stale_h(Opts), + ok. + +maybe_init_stale_h(Opts) -> + StaleOpts = gen_mod:get_opt(stale_h, Opts, [{enabled, false}]), + case proplists:get_value(enabled, StaleOpts, false) of + false -> + ok; + true -> + ?LOG_INFO(#{what => stream_mgmt_stale_h_start}), + mnesia:create_table(stream_mgmt_stale_h, + [{ram_copies, [node()]}, + {attributes, record_info(fields, stream_mgmt_stale_h)}]), + mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies), + start_cleaner(StaleOpts) + end. + +-spec register_smid(HostType, SMID, SID) -> + ok | {error, term()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + SID :: ejabberd_sm:sid(). +register_smid(_HostType, SMID, SID) -> + try + mnesia:sync_dirty(fun mnesia:write/1, + [#sm_session{smid = SMID, sid = SID}]), + ok + catch exit:Reason -> + {error, Reason} + end. + +-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> + {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}. +unregister_smid(_HostType, SID) -> + case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of + [] -> + {error, smid_not_found}; + [#sm_session{smid = SMID}] -> + mnesia:dirty_delete(sm_session, SMID), + {ok, SMID} + end. + +-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) -> + {sid, ejabberd_sm:sid()} | {error, smid_not_found}. +get_sid(_HostType, SMID) -> + case mnesia:dirty_read(sm_session, SMID) of + [#sm_session{sid = SID}] -> {sid, SID}; + [] -> {error, smid_not_found} + end. + +%% stale_h functions + +-spec read_stale_h(HostType, SMID) -> + {stale_h, non_neg_integer()} | {error, smid_not_found} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +read_stale_h(_HostType, SMID) -> + try + case mnesia:dirty_read(stream_mgmt_stale_h, SMID) of + [#stream_mgmt_stale_h{h = H}] -> {stale_h, H}; + [] -> {error, smid_not_found} + end + catch exit:_Reason -> + {error, smid_not_found} + end. + +-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(), + H :: non_neg_integer(). +write_stale_h(_HostType, SMID, H) -> + try + Stamp = erlang:monotonic_time(second), + mnesia:dirty_write(#stream_mgmt_stale_h{smid = SMID, h = H, stamp = Stamp}) + catch exit:Reason -> + {error, Reason} + end. + +-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when + HostType :: mongooseim:host_type(), + SMID :: mod_stream_management:smid(). +delete_stale_h(_HostType, SMID) -> + try + mnesia:dirty_delete(stream_mgmt_stale_h, SMID) + catch exit:Reason -> + {error, Reason} + end. + +%% stale_h cleaning logic + +start_cleaner(Opts) -> + MFA = {?MODULE, start_link, [Opts]}, + ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]}, + %% TODO cleaner should be a service + ejabberd_sup:start_child(ChildSpec). + +start_link(Opts) -> + gen_server:start_link({local, stream_management_stale_h}, ?MODULE, [Opts], []). + +init([Opts]) -> + %% In seconds + RepeatAfter = proplists:get_value(stale_h_repeat_after, Opts, 1800), + GeriatricAge = proplists:get_value(stale_h_geriatric, Opts, 3600), + State = #smgc_state{gc_repeat_after = RepeatAfter, + gc_geriatric = GeriatricAge}, + schedule_check(State), + {ok, State}. + +handle_call(Msg, From, State) -> + ?UNEXPECTED_CALL(Msg, From), + {reply, ok, State}. + +handle_cast(Msg, State) -> + ?UNEXPECTED_CAST(Msg), + {noreply, State}. + +handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) -> + clear_table(GeriatricAge), + schedule_check(State), + {noreply, State}; +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info), + {noreply, State}. + +schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) -> + erlang:send_after(RepeatAfter * 1000, self(), check). + +clear_table(GeriatricAge) -> + TimeToDie = erlang:monotonic_time(second) - GeriatricAge, + MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp = S}) when S < TimeToDie -> true end), + ets:select_delete(stream_mgmt_stale_h, MS). diff --git a/src/stream_management/stream_management_stale_h.erl b/src/stream_management/stream_management_stale_h.erl deleted file mode 100644 index 79fc1b4e08..0000000000 --- a/src/stream_management/stream_management_stale_h.erl +++ /dev/null @@ -1,121 +0,0 @@ --module(stream_management_stale_h). --behaviour(gen_server). - --include("mongoose.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). - --record(smgc_state, - {gc_repeat_after :: non_neg_integer(), - gc_geriatric :: non_neg_integer() - }). - --record(stream_mgmt_stale_h, - {smid :: mod_stream_management:smid(), - h :: non_neg_integer(), - stamp :: non_neg_integer() - }). - --export([read_stale_h/1, - write_stale_h/2, - delete_stale_h/1, - clear_table/1 - ]). - --export([maybe_start/1]). - -%% Internal exports --export([start_link/1]). -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2 - ]). - --ignore_xref([clear_table/1, start_link/1]). - --spec read_stale_h(SMID :: mod_stream_management:smid()) -> - {stale_h, non_neg_integer()} | {error, smid_not_found}. -read_stale_h(SMID) -> - try - case mnesia:dirty_read(stream_mgmt_stale_h, SMID) of - [#stream_mgmt_stale_h{h = H}] -> {stale_h, H}; - [] -> {error, smid_not_found} - end - catch exit:_Reason -> - {error, smid_not_found} - end. - --spec write_stale_h(SMID :: mod_stream_management:smid(), H :: non_neg_integer()) -> - ok | {error, any()}. -write_stale_h(SMID, H) -> - try - Stamp = erlang:monotonic_time(second), - mnesia:dirty_write(#stream_mgmt_stale_h{smid = SMID, h = H, stamp = Stamp}) - catch exit:Reason -> - {error, Reason} - end. - --spec delete_stale_h(SMID :: mod_stream_management:smid()) -> - ok | {error, any()}. -delete_stale_h(SMID) -> - try - mnesia:dirty_delete(stream_mgmt_stale_h, SMID) - catch exit:Reason -> - {error, Reason} - end. - - -%% -%% gen_server -maybe_start(Opts) -> - StaleOpts = gen_mod:get_opt(stale_h, Opts, [{enabled, false}]), - case proplists:get_value(enabled, StaleOpts, false) of - false -> - ok; - true -> - ?LOG_INFO(#{what => stream_mgmt_stale_h_start}), - mnesia:create_table(stream_mgmt_stale_h, - [{ram_copies, [node()]}, - {attributes, record_info(fields, stream_mgmt_stale_h)}]), - mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies), - start_cleaner(StaleOpts) - end. - -start_cleaner(Opts) -> - ChildSpec = {?MODULE, - {?MODULE, start_link, [Opts]}, - permanent, 5000, worker, [?MODULE]}, - ejabberd_sup:start_child(ChildSpec). - -start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - -init([GCOpts]) -> - RepeatAfter = proplists:get_value(stale_h_repeat_after, GCOpts, 1800), - GeriatricAge = proplists:get_value(stale_h_geriatric, GCOpts, 3600), - State = #smgc_state{gc_repeat_after = RepeatAfter, - gc_geriatric = GeriatricAge}, - {ok, State, RepeatAfter}. - -handle_call(Msg, From, State) -> - ?UNEXPECTED_CALL(Msg, From), - {reply, ok, State}. - -handle_cast(Msg, State) -> - ?UNEXPECTED_CAST(Msg), - {noreply, State}. - -handle_info(timeout, #smgc_state{gc_repeat_after = RepeatAfter, - gc_geriatric = GeriatricAge} = State) -> - clear_table(GeriatricAge), - {noreply, State, RepeatAfter}; -handle_info(Info, #smgc_state{gc_repeat_after = RepeatAfter, - gc_geriatric = _GeriatricAge} = State) -> - ?UNEXPECTED_INFO(Info), - {noreply, State, RepeatAfter}. - -clear_table(GeriatricAge) -> - TimeToDie = erlang:monotonic_time(second) - GeriatricAge, - MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp=S}) when S < TimeToDie -> true end), - ets:select_delete(stream_mgmt_stale_h, MS). diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 110e6cafbd..3ae81b2080 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -2900,12 +2900,15 @@ mod_stream_management(_Config) -> ?cfgh(M([{ack_freq, never}]), T(#{<<"ack">> => false})), ?cfgh(M([{ack_freq, 1}]), T(#{<<"ack_freq">> => 1})), ?cfgh(M([{resume_timeout, 600}]), T(#{<<"resume_timeout">> => 600})), + ?cfgh(M([{backend, mnesia}]), + T(#{<<"backend">> => <<"mnesia">>})), ?errh(T(#{<<"buffer">> => 0})), ?errh(T(#{<<"buffer_max">> => -1})), ?errh(T(#{<<"ack">> => <<"false">>})), ?errh(T(#{<<"ack_freq">> => 0})), - ?errh(T(#{<<"resume_timeout">> => true})). + ?errh(T(#{<<"resume_timeout">> => true})), + ?errh(T(#{<<"backend">> => <<"iloveyou">>})). mod_stream_management_stale_h(_Config) -> T = fun(Opts) -> #{<<"modules">> => diff --git a/test/mongoose_cleanup_SUITE.erl b/test/mongoose_cleanup_SUITE.erl index 82e9a247eb..41ac6a54a0 100644 --- a/test/mongoose_cleanup_SUITE.erl +++ b/test/mongoose_cleanup_SUITE.erl @@ -124,10 +124,10 @@ stream_management(_Config) -> {U, S, R, _JID, SID} = get_fake_session(), mod_stream_management:start(HostType, []), SMID = <<"123">>, - mod_stream_management:register_smid(SMID, SID), - {sid, SID} = mod_stream_management:get_sid(SMID), + mod_stream_management:register_smid(HostType, SMID, SID), + {sid, SID} = mod_stream_management:get_sid(HostType, SMID), mongoose_hooks:session_cleanup(S, new_acc(S), U, R, SID), - {error, smid_not_found} = mod_stream_management:get_sid(SMID). + {error, smid_not_found} = mod_stream_management:get_sid(HostType, SMID). local(_Config) -> ejabberd_local:start_link(),