Skip to content

Commit

Permalink
Reimplement CSI using the new c2s framework
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Nov 29, 2022
1 parent af90cc5 commit 68a863f
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 39 deletions.
4 changes: 1 addition & 3 deletions doc/modules/mod_csi.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
## Module Description
Enables [XEP-0352: Client State Indication](http://xmpp.org/extensions/xep-0352.html) functionality.
It is implemented mostly in `ejabberd_c2s`, this module is just a "starter", to advertise the `csi` stream feature.

The Client State Indication functionality will be possible to use even without enabling this module, but the feature will not be present in the stream features list.
Enables [XEP-0352: Client State Indication](http://xmpp.org/extensions/xep-0352.html) functionality.

The XEP doesn't **require** any specific server behaviour in response to CSI stanzas, there are only some suggestions.
The implementation in MongooseIM will simply buffer all packets (up to a configured limit) when the session is "inactive" and will flush the buffer when it becomes "active" again.
Expand Down
162 changes: 126 additions & 36 deletions src/mod_csi.erl
Original file line number Diff line number Diff line change
@@ -1,51 +1,52 @@
%% @doc Client State Indication.
%%
%% Includes support for XEP-0352: Client State Indication.
%%
-module(mod_csi).
-xep([{xep, 352}, {version, "0.2"}]).

-xep([{xep, 352}, {version, "1.0.0"}]).

-include("mongoose_config_spec.hrl").
-include("jlib.hrl").

-behaviour(gen_mod).
-behaviour(mongoose_module_metrics).

%% gen_mod callbacks
-export([start/2,
stop/1,
config_spec/0,
supported_features/0]).
-export([start/2, stop/1, config_spec/0, supported_features/0]).

%% Hook handlers
-export([c2s_stream_features/3]).

-ignore_xref([c2s_stream_features/3]).
-export([c2s_stream_features/3,
user_receive_packet/3,
user_send_xmlel/3,
reroute_unacked_messages/3
]).

-include("jlib.hrl").
-include("mongoose_config_spec.hrl").

-type state() :: active | inactive.
-record(csi_state, {
state = active :: state(),
buffer = [] :: [mongoose_acc:t()],
buffer_max = 20 :: non_neg_integer()
}).

-export_type([state/0]).
-type state() :: active | inactive | flushing.
-type csi_state() :: #csi_state{}.

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
gen_hook:add_handlers(hooks(HostType)),
ensure_metrics(HostType),
ok.
gen_hook:add_handlers(hooks(HostType)).

-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
gen_hook:delete_handlers(hooks(HostType)),
ok.
gen_hook:delete_handlers(hooks(HostType)).

hooks(HostType) ->
[{c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 60}].
[
{c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 60},
{user_receive_packet, HostType, fun ?MODULE:user_receive_packet/3, #{}, 10}, %% before stream management!
{user_send_xmlel, HostType, fun ?MODULE:user_send_xmlel/3, #{}, 60},
{reroute_unacked_messages, HostType, fun ?MODULE:reroute_unacked_messages/3, #{}, 70}
].

ensure_metrics(HostType) ->
mongoose_metrics:ensure_metric(HostType, [HostType, modCSIInactive], spiral),
mongoose_metrics:ensure_metric(HostType, [HostType, modCSIInactive], spiral).

%%%
%%% config_spec
%%%
mongoose_metrics:ensure_metric(HostType, [HostType, modCSIActive], spiral).

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
Expand All @@ -59,17 +60,106 @@ config_spec() ->
supported_features() ->
[dynamic_domains].

%%%
%%% Hook handlers
%%%

-spec c2s_stream_features(Acc, Params, Extra) -> {ok, Acc} when
Acc :: [exml:element()],
Params :: map(),
Extra :: map().
%%% Hook handlers
-spec c2s_stream_features(Acc, map(), gen_hook:extra()) -> {ok, Acc} when Acc :: [exml:element()].
c2s_stream_features(Acc, _, _) ->
{ok, lists:keystore(<<"csi">>, #xmlel.name, Acc, csi())}.

%% The XEP doesn't require any specific server behaviour in response to CSI stanzas,
%% there are only some suggestions. The implementation in MongooseIM will simply buffer
%% all packets (up to a configured limit) when the session is "inactive" and will flush
%% the buffer when it becomes "active" again.
-spec user_receive_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
user_receive_packet(Acc, #{c2s_data := C2SData}, _Extra) ->
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Csi} ->
handle_receive_packet(Acc, Csi);
_ ->
{ok, Acc}
end.

-spec handle_receive_packet(mongoose_acc:t(), csi_state()) -> mongoose_c2s_hooks:result().
handle_receive_packet(Acc, Csi = #csi_state{state = inactive, buffer = Buffer, buffer_max = BMax}) ->
case length(Buffer) + 1 >= BMax of
true ->
NewBuffer = [Acc | Buffer],
NewCsi = Csi#csi_state{state = flushing, buffer = NewBuffer},
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {route, lists:reverse(NewBuffer)}],
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
_ ->
NewCsi = Csi#csi_state{buffer = [Acc | Buffer]},
{stop, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})}
end;
handle_receive_packet(Acc, Csi = #csi_state{state = flushing, buffer = []}) ->
NewCsi = Csi#csi_state{state = inactive},
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
handle_receive_packet(Acc, Csi = #csi_state{state = flushing, buffer = [_ | Rest]}) ->
NewCsi = Csi#csi_state{buffer = Rest},
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
handle_receive_packet(Acc, _) ->
{ok, Acc}.

-spec user_send_xmlel(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
user_send_xmlel(Acc, Params, _Extra) ->
El = mongoose_acc:element(Acc),
case exml_query:attr(El, <<"xmlns">>) of
?NS_CSI ->
{stop, handle_csi_request(Acc, Params, El)};
_ ->
{ok, Acc}
end.

-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
reroute_unacked_messages(Acc, #{c2s_data := C2SData}, _) ->
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Csi = #csi_state{buffer = Buffer}} ->
mongoose_c2s:reroute_buffer(C2SData, Buffer),
NewCsi = Csi#csi_state{state = active, buffer = []},
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
_ ->
{ok, Acc}
end.

-spec handle_csi_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
mongoose_acc:t().
handle_csi_request(Acc, Params, #xmlel{name = <<"inactive">>}) ->
handle_inactive_request(Acc, Params);
handle_csi_request(Acc, Params, #xmlel{name = <<"active">>}) ->
handle_active_request(Acc, Params).

-spec handle_inactive_request(mongoose_acc:t(), mongoose_c2s_hooks:params()) -> mongoose_acc:t().
handle_inactive_request(Acc, #{c2s_data := C2SData} = _Params) ->
HostType = mongoose_c2s:get_host_type(C2SData),
mongoose_metrics:update(HostType, modCSIInactive, 1),
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{error, not_found} ->
BMax = gen_mod:get_module_opt(HostType, ?MODULE, buffer_max),
Csi = #csi_state{state = inactive, buffer_max = BMax},
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Csi});
{ok, Csi = #csi_state{}} ->
NewCsi = Csi#csi_state{state = inactive},
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi});
_ ->
Acc
end.

-spec handle_active_request(mongoose_acc:t(), mongoose_c2s_hooks:params()) -> mongoose_acc:t().
handle_active_request(Acc, #{c2s_data := C2SData}) ->
HostType = mongoose_c2s:get_host_type(C2SData),
mongoose_metrics:update(HostType, modCSIActive, 1),
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Csi = #csi_state{state = inactive, buffer = Buffer}} ->
NewCsi = Csi#csi_state{state = active, buffer = []},
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {route, lists:reverse(Buffer)}],
mongoose_c2s_acc:to_acc_many(Acc, ToAcc);
_ ->
Acc
end.

-spec csi() -> exml:element().
csi() ->
#xmlel{name = <<"csi">>,
attrs = [{<<"xmlns">>, ?NS_CSI}]}.
#xmlel{name = <<"csi">>, attrs = [{<<"xmlns">>, ?NS_CSI}]}.

0 comments on commit 68a863f

Please sign in to comment.