C2S/csi #3880
Codecov Report
Base: 73.11% // Head: 73.24% // Increases project coverage by +0.13%.

@@               Coverage Diff               @@
##    feature/mongoose_c2s    #3880    +/-  ##
========================================================
+ Coverage        73.11%     73.24%   +0.13%
========================================================
  Files              540        540
  Lines            34110      34187      +77
========================================================
+ Hits             24939      25041     +102
+ Misses            9171       9146      -25

View full report at Codecov.
Good job 👍, I just have one small comment about the style.
This is an important part of the rework. The point is that a session is now ready to receive messages directly addressed to it immediately after binding a resource, so a subsequent request to enable stream management can have messages interleaved between the request and the response. And this is fine, as XEP-0198 requires:

- the _sending_ count to start after _sending_ the `<enable>` or `<enabled>`
- the _receiving_ count to start after _receiving_ the `<enable>` or `<enabled>`

So the client will set its counter for received messages after receiving the `<enabled>`. The question is then whether the order in which the stanzas are received is relevant:

- If the client receives first the stream management `<enabled>` and then the rerouted stanzas, that means the c2s received the `<enable>` and answered with `<enabled>` before receiving the rerouted stanzas, and therefore counted them towards the sending counter.
- If the client receives first the messages from the reroute and then the `<enabled>` stanza, that means the c2s received and delivered all the messages before receiving the `<enable>` and initialising its counters. As the client starts its received count at zero (when it received the `<enabled>`), the server also starts its sent count at zero (when it sent the `<enabled>`).

So in all cases the counts match.
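The counting rule above can be sketched like this (a toy illustration under assumed names, not the actual module code):

```erlang
%% Sketch: a XEP-0198 sent-stanza counter that only starts counting once
%% <enabled/> has gone out. Stanzas sent before that point (e.g. the
%% rerouted ones) are intentionally not counted, so both sides agree.
-record(sm, {sent = disabled}).

enabled_sent(SM) ->
    SM#sm{sent = 0}.                       %% counter starts at <enabled/>

stanza_sent(#sm{sent = disabled} = SM) ->
    SM;                                    %% before <enabled/>: not counted
stanza_sent(#sm{sent = N} = SM) ->
    SM#sm{sent = N + 1}.
```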
Many tests have Alice sending the inactive stanza, immediately followed by Bob sending many messages. But here lies a race condition: a user message is implemented as the test-case Pid sending Erlang messages to the user's socket-managing Pids, which in turn send the actual TCP messages. While the order in which the messages are sent from the test-case Pid is ensured, the order in which the socket-managing Pids execute is not, so it is possible that the first message from Bob is delivered _before_ Alice's inactive request. So after sending an inactive request, we need to introduce a small wait, followed by a verification that Alice has not received any message. A wait is not bulletproof, but it solves the problem sufficiently often. The issue is that verifying that Alice has processed the inactive request is non-trivial, as CSI defines no answer to it.
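In test code, the workaround could look roughly like this (a sketch: `csi_inactive/0` and the timeout value are illustrative, not the suite's actual helpers):

```erlang
%% Sketch: give the server a moment to process <inactive xmlns='urn:xmpp:csi:0'/>,
%% then assert that nothing was delivered to Alice in the meantime.
alice_goes_inactive(Alice) ->
    escalus_connection:send(Alice, csi_inactive()),
    timer:sleep(500),                      %% not bulletproof, but usually enough
    escalus_assert:has_no_stanzas(Alice).
```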
In the original implementation, stream_mgmt first checks, at the very beginning of the route, whether there is a sid conflict. Then _all_ the `user_receive_packet` hooks are executed, and only after that does buffering for CSI and stream_mgmt happen. But in the current implementation we've partitioned `user_receive_packet` into the four types of stanzas, which could potentially stop the handling after stream_mgmt has already buffered the payload, hence the bug. So we add a "pre-send" event hook, where stream management does the buffering, and a very early stream_mgmt `user_receive_packet` handler does the sid conflict check. We also add a new kind of c2s statem event, `{flush, Acc}`. This runs `handle_flush`, which is the second part of `handle_route`, meaning everything after the `user_receive_*` handlers.
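A minimal sketch of how such an event can be dispatched in a `gen_statem` (the clause shape is an assumption, not copied from the module):

```erlang
%% Sketch: {flush, Acc} arrives as an internal event, which only the state
%% machine itself can enqueue via a {next_event, internal, {flush, Acc}}
%% action, so external processes cannot trigger a partial pipeline.
handle_event(internal, {flush, Acc}, C2SState, StateData) ->
    %% run the second half of handle_route: deliver the stanza without
    %% re-running the user_receive_* handlers
    handle_flush(StateData, C2SState, Acc);
```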
Very good job 👍. I left some comments.
src/c2s/mongoose_c2s.erl
Outdated
@@ -172,13 +176,13 @@ handle_event(state_timeout, state_timeout_termination, _FsmState, StateData) ->
 handle_event(EventType, EventContent, C2SState, StateData) ->
     handle_foreign_event(StateData, C2SState, EventType, EventContent).

--spec terminate(term(), c2s_state(), c2s_data()) -> term().
+-spec terminate(term(), c2s_state(term()), c2s_data()) -> term().
I see that we still keep `c2s_state()` in most specs, which may be misleading, as `C2SState` can include `{external_state, ...}` as well. I think it would be nicer to have it defined like this:

-type c2s_state(State) :: connect
                        % ...
                        | {external_state, State}.
-type c2s_state() :: c2s_state(term()).
I agree, let's revert this line.
src/c2s/mongoose_c2s.erl
Outdated
-spec handle_flush(c2s_data(), c2s_state(), mongoose_acc:t()) -> fsm_res().
handle_flush(StateData = #c2s_data{host_type = HostType}, C2SState, Acc) ->
    HookParams = hook_arg(StateData, C2SState, info, Acc, flust),
Suggested change:

-    HookParams = hook_arg(StateData, C2SState, info, Acc, flust),
+    HookParams = hook_arg(StateData, C2SState, info, Acc, flush),

Or we could even skip the last argument, as there is no specific reason to provide one:

-    HookParams = hook_arg(StateData, C2SState, info, Acc, flust),
+    HookParams = hook_arg(StateData, C2SState, info, Acc, undefined),
I'd leave it (typo corrected) just because it doesn't hurt, and it might be useful some other time in the future, dunno 🤔
send_element(StateData = #c2s_data{host_type = HostType}, Els, Acc) when is_list(Els) ->
    Res = send_xml(StateData, Els),
do_send_element(StateData = #c2s_data{host_type = HostType}, #xmlel{} = El, Acc) ->
    Res = send_xml(StateData, El),
Currently, we send all elements in one batch. After this change, every new element would be sent separately. I remember that we had a discussion about this and agreed that sending in one batch is the preferred way.
I'm wondering if we should keep this code as before, with some adjustments:

send_element(StateData = #c2s_data{host_type = HostType}, Els, Acc) when is_list(Els) ->
    Res = send_xml(StateData, Els),
    Acc1 = mongoose_acc:set(c2s, send_result, Res, Acc),
    lists:foldl(fun(El, Acc2) -> mongoose_hooks:xmpp_send_element(HostType, Acc2, El) end, Acc1, Els).

By this, we assume that `send_xml/2` has a binary result: all elements are sent, or none.
Yeah, I remember we talked about it. But now I'm not that sure. The point was that calls to `exml:to_iolist/1` and the TCP socket send are theoretically faster when batching, and that's why I wanted to optimise for them. But on the other hand, it seems like handlers like csi, amp, metrics, and stream-mgmt might want to deal with the granularity of single elements, and it complicates the code too much to sometimes deal with lists and sometimes with single elements, sometimes with `#xmlel{}` and other times with `mongoose_acc:t()`. So the change here was to have everything deal with single elements of `mongoose_acc:t()`, and then have the mongoose_c2s_acc helper transform this list of `mongoose_acc:t()` into a list of `gen_statem` actions, so that all code in c2s handles only one scenario.

So the old adage applies: first make it work, then make it beautiful, and only then, if you really have to, make it fast. At the beginning I could force fast and pretty because I knew of few scenarios yet, but now, with this thing becoming more complex... perhaps the minute optimisations can wait 🤷🏽
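The single-element pipeline described above could look roughly like this (a sketch: `do_send_element/3` appears in the diff, while the fold wrapper and its acc-returning contract are assumptions):

```erlang
%% Sketch: fold each element through the per-element send path, so every
%% handler (csi, amp, metrics, stream-mgmt) sees exactly one element at a
%% time; assumes do_send_element/3 returns the updated accumulator.
send_all_elements(StateData, Els, Acc0) ->
    lists:foldl(fun(El, AccIn) -> do_send_element(StateData, El, AccIn) end,
                Acc0, Els).
```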
src/c2s/mongoose_c2s_acc.erl
Outdated
extract_flushes(Params = #{flush := Accs}) ->
    WithoutStop = maps:remove(flush, Params),
    NewAction = [{next_event, internal, {flush, Acc}} || Acc <- Accs ],
    Fun = fun(Actions) -> NewAction ++ Actions end,
    maps:update_with(actions, Fun, NewAction, WithoutStop);
Suggested change (rename `WithoutStop` to `WithoutFlush`):

extract_flushes(Params = #{flush := Accs}) ->
    WithoutFlush = maps:remove(flush, Params),
    NewAction = [{next_event, internal, {flush, Acc}} || Acc <- Accs],
    Fun = fun(Actions) -> NewAction ++ Actions end,
    maps:update_with(actions, Fun, NewAction, WithoutFlush);
Or with `maps:take/2`:

extract_flushes(Params) ->
    case maps:take(flush, Params) of
        {Accs, Params1} ->
            NewAction = [{next_event, internal, {flush, Acc}} || Acc <- Accs],
            Fun = fun(Actions) -> NewAction ++ Actions end,
            maps:update_with(actions, Fun, NewAction, Params1);
        error ->
            Params
    end.

`maps:take/2` is a BIF, so I assume it's fast.
Very good idea. It's just a few more lines of code, so I'd keep the first version, but I hadn't thought about `maps:take/2` before 😃
It looks good to me, I added a few minor comments.
Looks good 👍
Important: this PR made many changes to many modules, so some explanation below:
c2s
Route pipeline processing
In the old implementation, when receiving a packet through routing, c2s would, in strict order:

1. check for `sid` conflicts
2. run the `user_receive_packet` hook
3. buffer for CSI and stream management
4. run `xmpp_send_element` with the result of the send

So, steps 1, 3 and 4 are the ones that are not XMPP-core related and need to be taken out of c2s. So now I've reordered as follows:
1. the `user_receive_packet` hook (and the more granular `user_receive_*` ones) <- stream management is an early subscriber, for the `sid` conflict verification
2. `xmpp_presend_element` <- here CSI and stream_mgmt subscribe, and can stop the actual socket delivery
3. `xmpp_send_element` with the result of the send

This, very importantly, solves a bug in the current implementation of stream_mgmt: if stream_mgmt subscribes to `user_receive_packet`, it will buffer stanzas that are meant to be handled by the server and not delivered to the socket, like IQs addressed to a specific resource, for example those from `mod_last`.

Thus, we also introduce a new kind of internal event, `handle_flush`, which runs the received-packet logic from step 2, skipping step 1. This is necessary so that, when CSI flushes its buffer, the packets do not go through the `user_receive_*` handlers again, which would have modules like inbox processing them twice. The event is internal, to avoid third-party processes sending erlang messages that would run an incomplete pipeline.

State machine states
Before, c2s states were defined with strict names, but other modules, like stream management (and in the future mod_websockets), can take over the state machine and inject their own states, while still reusing functionality available from `mongoose_c2s`. In that case, the types defined for the state machine wouldn't match anymore. So we add a new state to the `c2s_state()` type, called `{external_state, term()}`, to identify states defined by other modules, which will in turn know what to do with them, separately from c2s. This was a suggestion from @kamilwaz 😉

Stream management
As described above, stream management now subscribes to `user_receive_packet` only for the `sid` conflict verification. Then, it subscribes to `xmpp_presend_element` to do the buffering.

Another optimisation: on resumption, rerouting lost messages is strictly meant to deliver them to the process that resumed the session, with no new filtering applied. So we can bypass the complex routing logic and directly send the erlang messages to the resuming process. `mongoose_c2s` implements a new `reroute_buffer_to_peer/3` helper, which takes a pid and sends all the messages.

Also, a flaky test (`resume_session_with_wrong_h_does_not_leak_sessions`) was fixed by introducing a wait: smids are removed by asking mnesia to do so asynchronously, so again we can have a race condition between mnesia and the verification.

CSI
First of all, one more test was introduced and many others were fixed; for details, see commits 8743faf and 8e21d57. For the implementation, see 881d827.