Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C2S/csi #3880

Merged
merged 21 commits into from
Jan 4, 2023
Merged

C2S/csi #3880

Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
20bc77e
Enable csi tests
NelsonVides Nov 26, 2022
810e6c3
Cleanup CSI test suite
NelsonVides Nov 17, 2022
8743faf
Clean some details about CSI tests
NelsonVides Dec 27, 2022
8e21d57
Ensure inactive is not reordered behind messages
NelsonVides Dec 27, 2022
fdb16ef
Test that inactive twice does not reset CSI buffer
NelsonVides Nov 17, 2022
a71af4b
Use hook state functionality on stop as well
NelsonVides Nov 26, 2022
6aaf0c4
Consider new data when hard_stopping after a hook
NelsonVides Dec 21, 2022
1aa04b7
Put routing and rerouting a buffer in the c2s module
NelsonVides Nov 29, 2022
963ec26
stream management can stop after handling its unique namespace
NelsonVides Nov 29, 2022
d265908
Fix flaky sm test
NelsonVides Dec 21, 2022
2247f50
Reuse prepared return type name for hooks in sm
NelsonVides Dec 15, 2022
ffaf612
Remove 4-tuple-routes from sm
NelsonVides Dec 15, 2022
ab5f93f
Define external c2s state type and use in sm
NelsonVides Dec 15, 2022
0a05485
Define a small previd helper for sm
NelsonVides Dec 15, 2022
ddcd1a9
Refactor helpers for opening and closing c2s sessions
NelsonVides Dec 21, 2022
4bac10c
Refactor Stream Management
NelsonVides Dec 21, 2022
f4a8942
Plug stream_management into presend and create handle_flush
NelsonVides Dec 23, 2022
4fe77d9
SM flaky test: give time for the backend to remove the value
NelsonVides Dec 23, 2022
881d827
Reimplement CSI using the new c2s framework
NelsonVides Nov 29, 2022
f7cd5c1
Fix naming variables and type priority
NelsonVides Jan 2, 2023
a356ac5
Test that it returns an error if invalid CSI request
NelsonVides Jan 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
{suites, "tests", vcard_SUITE}.
{suites, "tests", vcard_simple_SUITE}.
{suites, "tests", websockets_SUITE}.
% {suites, "tests", xep_0352_csi_SUITE}.
{suites, "tests", xep_0352_csi_SUITE}.
{suites, "tests", service_domain_db_SUITE}.
{skip_cases, "tests", service_domain_db_SUITE,
[rest_delete_domain_cleans_data_from_mam], "this test tries to use presences"}.
Expand Down
2 changes: 1 addition & 1 deletion big_tests/dynamic_domains.spec
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
{suites, "tests", vcard_SUITE}.
{suites, "tests", vcard_simple_SUITE}.
{suites, "tests", websockets_SUITE}.
% {suites, "tests", xep_0352_csi_SUITE}.
{suites, "tests", xep_0352_csi_SUITE}.
{suites, "tests", domain_removal_SUITE}.
{suites, "tests", local_iq_SUITE}.
% {suites, "tests", tcp_listener_SUITE}.
Expand Down
38 changes: 22 additions & 16 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 1).
-define(SMALL_SM_BUFFER, 3).
-define(EXT_C2S_STATE(S), {external_state, S}).

%%--------------------------------------------------------------------
%% Suite configuration
Expand Down Expand Up @@ -508,7 +509,7 @@ preserve_order(Config) ->
%% kill alice connection
escalus_connection:kill(Alice),
C2SPid = mongoose_helper:get_session_pid(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),

escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),
Expand Down Expand Up @@ -575,7 +576,7 @@ resend_unacked_after_resume_timeout(Config) ->

%% ensure there is no session
C2SPid = mongoose_helper:get_session_pid(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),

%% alice come back and receives unacked message
NewAlice = connect_spec(AliceSpec, session),
Expand Down Expand Up @@ -640,7 +641,7 @@ resume_session_state_send_message_generic(Config, AckInitialPresence) ->
%% kill alice connection
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

%% send some messages and check if c2s can handle it
Expand Down Expand Up @@ -688,7 +689,7 @@ resume_session_state_stop_c2s(Config) ->
% session should be alive
sm_helper:assert_alive_resources(Alice, 1),
rpc(mim(), mongoose_c2s, stop, [C2SPid, normal]),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),
%% suspend the process to ensure that Alice has enough time to reconnect,
%% before resumption timeout occurs.
ok = rpc(mim(), sys, suspend, [C2SPid]),
Expand Down Expand Up @@ -719,7 +720,7 @@ wait_for_resumption(Config) ->
Bob = connect_fresh(Config, bob, session),
Texts = three_texts(),
{C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session).
wait_until_resume_session(C2SPid).

unacknowledged_message_hook_filter(Config) ->
FilterText = <<"filter">>,
Expand All @@ -737,7 +738,7 @@ unacknowledged_message_hook_filter(Config) ->
%% kill alice connection
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),
%% ensure second C2S is registered so all the messages are bounced properly
NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual),
Expand Down Expand Up @@ -825,7 +826,7 @@ unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
%% kill alice connection
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
Expand All @@ -851,7 +852,7 @@ unacknowledged_message_hook_common(RestartConnectionFN, Config) ->

NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
escalus_connection:kill(NewAlice),
mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session),
wait_until_resume_session(NewC2SPid),

escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
Expand Down Expand Up @@ -879,17 +880,19 @@ resume_session(Config) ->
resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
Messages = three_texts(),
HostType = host_type(),
escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
{_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
%% Resume the session.
Alice = connect_spec(AliceSpec, auth, manual),
Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),

[] = sm_helper:get_user_present_resources(Alice),
HostType = host_type(),
{error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID),
escalus_connection:wait_for_close(Alice, timer:seconds(5))
escalus_connection:wait_for_close(Alice, timer:seconds(5)),
Fun = fun() ->
[] = sm_helper:get_user_present_resources(Alice),
sm_helper:get_sid_by_stream_id(HostType, SMID)
end,
mongoose_helper:wait_until(Fun, {error, smid_not_found}, #{name => smid_is_cleaned})
end).

resume_session_with_wrong_sid_returns_item_not_found(Config) ->
Expand Down Expand Up @@ -929,7 +932,7 @@ resume_session_kills_old_C2S_gracefully(Config) ->
escalus_client:kill_connection(Config, Alice),

%% Ensure the c2s process is waiting for resumption.
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),

%% Resume the session.
NewAlice = connect_resume(Alice, 1),
Expand All @@ -947,7 +950,7 @@ buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
sm_helper:wait_for_messages(Alice, Texts),
%% Alice's connection is violently terminated.
escalus_client:kill_connection(Config, Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),
SMID = sm_helper:client_to_smid(Alice),
{C2SPid, SMID}.

Expand Down Expand Up @@ -1080,7 +1083,7 @@ messages_are_properly_flushed_during_resumption(Config) ->
escalus_client:kill_connection(Config, Alice),
%% The receiver process would stop now
C2SPid = mongoose_helper:get_session_pid(Alice),
mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
wait_until_resume_session(C2SPid),

sm_helper:wait_for_queue_length(C2SPid, 0),
ok = rpc(mim(), sys, suspend, [C2SPid]),
Expand Down Expand Up @@ -1266,6 +1269,9 @@ wait_for_session(JID, Retries, SleepTime) ->
ok
end.

wait_until_resume_session(C2SPid) ->
mongoose_helper:wait_for_c2s_state_name(C2SPid, ?EXT_C2S_STATE(resume_session)).

maybe_ack_initial_presence(Alice, ack) ->
ack_initial_presence(Alice);
maybe_ack_initial_presence(_Alice, no_ack) ->
Expand Down
122 changes: 51 additions & 71 deletions big_tests/tests/xep_0352_csi_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ all() ->


groups() ->
[{basic, [parallel, shuffle], all_tests()}].
[{basic, [parallel], all_tests()}].

all_tests() ->
[
server_announces_csi,
alice_is_inactive_and_no_stanza_arrived,
inactive_twice_does_not_reset_buffer,
alice_gets_msgs_after_activate,
alice_gets_msgs_after_activate_in_order,
alice_gets_message_after_buffer_overflow,
Expand Down Expand Up @@ -60,20 +61,15 @@ end_per_testcase(CaseName, Config) ->
server_announces_csi(Config) ->
NewConfig = escalus_fresh:create_users(Config, [{alice, 1}]),
Spec = escalus_users:get_userspec(NewConfig, alice),
Steps = [start_stream,
stream_features,
maybe_use_ssl,
authenticate,
bind,
session],
Steps = [start_stream, stream_features, maybe_use_ssl, authenticate, bind, session],
{ok, _Client, Features} = escalus_connection:start(Spec, Steps),
true = proplists:get_value(client_state_indication, Features).

alice_is_inactive_and_no_stanza_arrived(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
given_client_is_inactive_and_messages_sent(Alice, Bob, 1),

escalus_assert:has_no_stanzas(Alice)
given_client_is_inactive_and_no_messages_arrive(Alice),
given_messages_are_sent(Alice, Bob, 1),
then_client_does_not_receive_any_message(Alice)
end).

alice_gets_msgs_after_activate(Config) ->
Expand All @@ -84,12 +80,18 @@ alice_gets_msgs_after_activate_in_order(Config) ->

alice_gets_msgs_after_activate(Config, N) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
%%Given
Msgs = given_client_is_inactive_and_messages_sent(Alice, Bob, N),

%%When client becomes active again
escalus:send(Alice, csi_stanza(<<"active">>)),
given_client_is_inactive_and_no_messages_arrive(Alice),
Msgs = given_messages_are_sent(Alice, Bob, N),
given_client_is_active(Alice),
then_client_receives_message(Alice, Msgs)
end).

inactive_twice_does_not_reset_buffer(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
given_client_is_inactive_and_no_messages_arrive(Alice),
Msgs = given_messages_are_sent(Alice, Bob, 2),
given_client_is_inactive_and_no_messages_arrive(Alice),
given_client_is_active(Alice),
then_client_receives_message(Alice, Msgs)
end).

Expand All @@ -102,62 +104,47 @@ alice_gets_buffered_messages_after_reconnection_with_sm(Config) ->
Alice = Alice0#client{jid = JID},
{ok, Bob, _} = escalus_connection:start(BobSpec),

given_client_is_inactive(Alice),
given_client_is_inactive_and_no_messages_arrive(Alice),

MsgsToAlice = given_client_is_inactive_and_messages_sent(Alice, Bob, 5),
MsgsToAlice = given_messages_are_sent(Alice, Bob, 5),

%% then Alice disconnects

escalus_connection:kill(Alice),

{ok, Alice2, _} = escalus_connection:start(AliceSpec),

escalus_connection:send(Alice2, escalus_stanza:presence(<<"available">>)),
ConnSteps = [start_stream, stream_features, authenticate, bind, session],
{ok, Alice2, _} = escalus_connection:start(AliceSpec, ConnSteps),

then_client_receives_message(Alice2, MsgsToAlice),

ok.
escalus_connection:stop(Alice2),
escalus_connection:stop(Bob).

alice_gets_buffered_messages_after_stream_resumption(Config) ->
ConnSteps = [start_stream,
stream_features,
authenticate,
bind,
session,
stream_resumption],
ConnSteps = [start_stream, stream_features, authenticate, bind, session, stream_resumption],
NewConfig = escalus_fresh:create_users(Config, [{alice, 1}, {bob, 1}]),
AliceSpec = escalus_users:get_userspec(NewConfig, alice),
BobSpec = escalus_users:get_userspec(NewConfig, bob),
{ok, Alice0 = #client{props = AliceProps}, _} = escalus_connection:start(AliceSpec,
ConnSteps),
{ok, Alice0 = #client{props = AliceProps}, _} = escalus_connection:start(AliceSpec, ConnSteps),
JID = make_jid_from_spec(AliceProps),
Alice = Alice0#client{jid = JID},

escalus_connection:send(Alice, escalus_stanza:presence(<<"available">>)),
escalus:wait_for_stanza(Alice),
{ok, Bob, _} = escalus_connection:start(BobSpec),

given_client_is_inactive(Alice),

MsgsToAlice = given_client_is_inactive_and_messages_sent(Alice, Bob, 5),

%% then Alice disconnects
given_client_is_inactive_and_no_messages_arrive(Alice),
MsgsToAlice = given_messages_are_sent(Alice, Bob, 5),
then_client_does_not_receive_any_message(Alice),

%% then Alice loses connection and resumes it
escalus_connection:kill(Alice),

SMID = proplists:get_value(smid, AliceProps),
ResumeSession = [start_stream,
stream_features,
authenticate,
mk_resume_stream(SMID, 1)],

ResumeSession = [start_stream, stream_features, authenticate, mk_resume_stream(SMID, 1)],
{ok, Alice2, _} = escalus_connection:start(AliceSpec, ResumeSession),

escalus_connection:send(Alice2, escalus_stanza:presence(<<"available">>)),

then_client_receives_message(Alice2, MsgsToAlice),

ok.
escalus_connection:stop(Alice2),
escalus_connection:stop(Bob).

make_jid_from_spec(AliceProps) ->
AliceUsername = proplists:get_value(username, AliceProps),
Expand All @@ -174,73 +161,66 @@ mk_resume_stream(SMID, PrevH) ->

alice_gets_message_after_buffer_overflow(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
Msgs = given_client_is_inactive_and_messages_sent(Alice, Bob, ?CSI_BUFFER_MAX+5),

given_client_is_inactive_and_no_messages_arrive(Alice),
Msgs = given_messages_are_sent(Alice, Bob, ?CSI_BUFFER_MAX + 5),
{Flushed, Awaiting} = lists:split(?CSI_BUFFER_MAX+1, Msgs),

then_client_receives_message(Alice, Flushed),
%% and no other stanza
escalus_assert:has_no_stanzas(Alice),
then_client_does_not_receive_any_message(Alice),
%% Alice activates
escalus:send(Alice, csi_stanza(<<"active">>)),
given_client_is_active(Alice),
%% ands gets remaining stanzas
then_client_receives_message(Alice, Awaiting)
end).

bob_gets_msgs_from_inactive_alice(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
given_client_is_inactive_but_sends_messages(Alice, Bob, 1),

escalus:assert(is_chat_message, escalus:wait_for_stanza(Bob))
end).

alice_is_inactive_but_sends_sm_req_and_recives_ack(Config) ->
escalus:fresh_story(Config, [{alice,1}], fun(Alice) ->
given_client_is_inactive(Alice),

given_client_is_inactive_and_no_messages_arrive(Alice),
escalus:send(Alice, escalus_stanza:sm_request()),

escalus:assert(is_sm_ack, escalus:wait_for_stanza(Alice))

end).

given_client_is_inactive_but_sends_messages(Alice, Bob, N) ->
%%Given
MsgsToAlice = given_client_is_inactive_and_messages_sent(Alice, Bob, N),

given_client_is_inactive_and_no_messages_arrive(Alice),
MsgsToAlice = given_messages_are_sent(Alice, Bob, N),
MsgsToBob = gen_msgs(<<"Hi, Bob">>, N),
send_msgs(Alice, Bob, MsgsToBob),
timer:sleep(1),
{MsgsToAlice, MsgsToBob}.

then_client_does_not_receive_any_message(Alice) ->
[] = escalus:wait_for_stanzas(Alice, 1, 100),
escalus_assert:has_no_stanzas(Alice).

then_client_receives_message(Alice, Msgs) ->
[escalus:assert(is_chat_message, [Msg], escalus:wait_for_stanza(Alice)) ||
Msg <- Msgs].


given_client_is_inactive_and_messages_sent(Alice, Bob, N) ->
%%Given
given_client_is_inactive(Alice),

timer:sleep(1000),

%%When
given_messages_are_sent(Alice, Bob, N) ->
Msgs = gen_msgs(<<"Hi, Alice">>, N),
send_msgs(Bob, Alice, Msgs),
timer:sleep(timer:seconds(1)),
Msgs.

send_msgs(From, To, Msgs) ->
[escalus:send(From, escalus_stanza:chat_to(To, Msg)) ||
Msg <- Msgs].

[escalus:send(From, escalus_stanza:chat_to(To, Msg)) || Msg <- Msgs].

gen_msgs(Prefix, N) ->
[<<Prefix/binary, (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)].
[<<Prefix/binary, ": ", (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)].

given_client_is_active(Alice) ->
escalus:send(Alice, csi_stanza(<<"active">>)).

given_client_is_inactive(Alice) ->
escalus:send(Alice, csi_stanza(<<"inactive">>)).
given_client_is_inactive_and_no_messages_arrive(Alice) ->
escalus:send(Alice, csi_stanza(<<"inactive">>)),
then_client_does_not_receive_any_message(Alice).

csi_stanza(Name) ->
#xmlel{name = Name,
Expand Down
Loading