Instrument mod_mam #4224

Merged
merged 9 commits on Mar 20, 2024

35 changes: 35 additions & 0 deletions big_tests/tests/instrument_event_table.erl
@@ -0,0 +1,35 @@
%% @doc This module is injected into the mim node by instrument_helper.
%% It collects events in an ETS table.

-module(instrument_event_table).

-export([set_up/1, tear_down/0, get_events/2]).

set_up(DeclaredEvents) ->
ets_helper:new(?MODULE, [bag]), % repeating measurements are stored only once
meck:new(mongoose_instrument_log, [no_link, passthrough]),
meck:expect(mongoose_instrument_log, handle_event,
Comment on lines +10 to +11
Collaborator

Hmm... There should be a better way than meck here. If we're already rpc-inserting a module and creating an ETS table in the remote node, we can just rpc-insert a module that implements the mongoose_instrument behaviour and does the ETS inserts on its own, entirely bypassing meck.
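
For illustration, a rough sketch of such an injected handler module (the module name is hypothetical, and the callback is assumed to mirror the handle_event/4 function mecked above rather than the exact mongoose_instrument behaviour):

%% Hypothetical sketch - not part of this PR.
-module(instrument_event_sink).

-export([handle_event/4]).

%% Same signature as the mecked function above; writes straight to the ETS table.
handle_event(Event, Labels, _Config, Measurements) ->
    ets:insert(instrument_event_table, {{Event, Labels}, Measurements}),
    ok.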

Member Author
@chrzaszcz, Mar 19, 2024

There are many ways to do this without meck, and I tried the following:

  • Implementing a mongoose_instrument_* handler is not easy, because all instrumentation is set up on startup, so you would need to restart MongooseIM.
  • Adding a special list of global handlers. This adds extra code that is executed for each event but exists only for tests, so I don't like it.
  • A log handler. This required a lot of work to silence the debug logs by reconfiguring the existing handlers, and it looked really messy. But a debug handler is actually what I would do if I had to get rid of meck (a rough sketch follows below).

I implemented all three of them, and the current approach is the fourth attempt. I really don't see a cleaner solution.
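
A minimal sketch of that log-handler idea, using a plain OTP logger handler (the module name is hypothetical, and the report shape with a `what' key is an assumption about how mongoose_instrument_log formats events):

%% Hypothetical sketch - not part of this PR.
-module(instrument_debug_handler).

-export([add/0, log/2]).

add() ->
    logger:add_handler(?MODULE, ?MODULE, #{level => debug}).

%% Standard logger handler callback: capture instrumentation debug reports in ETS.
log(#{msg := {report, #{what := Event} = Report}}, _Config) ->
    ets:insert(instrument_event_table, {Event, Report});
log(_LogEvent, _Config) ->
    ok.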

Collaborator

This is unfortunate. Points 2 and 3 aren't good at all from my point of view, so I'd discard them before even trying. Point 1 is the one that sounds best to me; I wonder if there's a way to rework how handlers are initialised so that they can be injected dynamically, without restarting MIM. I'll make a note for this one; it can go in a separate PR, since it's well encapsulated in the instrumentation helper modules.

Member Author

I didn't add this functionality because it's not needed during normal operation, and I didn't want to add something just for tests. Also, we want to avoid a situation where the configuration (mongoose_config) differs from the actual setup (an ad-hoc added handler). So it would be something like dynamic_modules, but for handlers. It's doable, but do we really need it?

fun(Event, Labels, _Config, Measurements) ->
case lists:member({Event, Labels}, DeclaredEvents) of
true -> ets:insert(?MODULE, {{Event, Labels}, Measurements});
false -> ok
end,
meck:passthrough([Event, Labels, _Config, Measurements])
end).

tear_down() ->
meck:unload(mongoose_instrument_log),
Logged = all_keys(?MODULE),
ets_helper:delete(?MODULE),
{ok, Logged}.

get_events(EventName, Labels) ->
ets:lookup(?MODULE, {EventName, Labels}).

all_keys(Tab) ->
all_keys(Tab, ets:first(Tab), []).

all_keys(_Tab, '$end_of_table', Acc) ->
lists:sort(Acc);
all_keys(Tab, Key, Acc) ->
all_keys(Tab, ets:next(Tab, Key), [Key|Acc]).
85 changes: 85 additions & 0 deletions big_tests/tests/instrument_helper.erl
@@ -0,0 +1,85 @@
%% @doc A helper module for checking instrumentation events in tests.
%% For now it always uses mim(). We can extend it to other nodes when needed.
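%% Typical lifecycle (as used by mam_SUITE in this PR; the module list is illustrative):
%%   init_per_suite: instrument_helper:start(instrument_helper:declared_events([mod_mam_pm, mod_mam_muc]))
%%   end_per_suite:  instrument_helper:stop()
%% Individual test cases then call assert/3 for the events they exercise.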

-module(instrument_helper).

-export([declared_events/1, start/1, stop/0, assert/3]).

-import(distributed_helper, [rpc/4, mim/0]).

-include_lib("eunit/include/eunit.hrl").

-define(STATUS_TABLE, instrument_event_status_table).

-type event_name() :: atom().
-type labels() :: #{atom() => term()}.
-type measurements() :: #{atom() => term()}.

%% API

%% @doc Helper to get `DeclaredEvents' needed by `start/1'
-spec declared_events([module()] | module()) -> [{event_name(), labels()}].
declared_events(Modules) when is_list(Modules) ->
lists:flatmap(fun declared_events/1, Modules);
declared_events(Module) ->
Specs = rpc(mim(), Module, instrumentation, [domain_helper:host_type()]),
[{Event, Labels} || {Event, Labels, _Config} <- Specs].

%% @doc Only `DeclaredEvents' will be logged, and can be tested with `assert/3'
-spec start([{event_name(), labels()} | module()]) -> ok.
start(DeclaredEvents) ->
mongoose_helper:inject_module(ets_helper),
mongoose_helper:inject_module(instrument_event_table),
ets_helper:new(?STATUS_TABLE),
[ets:insert(?STATUS_TABLE, {Event, untested}) || Event <- DeclaredEvents],
ok = rpc(mim(), instrument_event_table, set_up, [DeclaredEvents]).

-spec stop() -> ok.
stop() ->
#{tested := Tested, untested := Untested} = classify_events(),
ets_helper:delete(?STATUS_TABLE),
{ok, Logged} = rpc(mim(), instrument_event_table, tear_down, []),
ct:log("Tested instrumentation events:~n ~p", [lists:sort(Tested)]),
verify_unlogged(Untested -- Logged),
verify_logged_but_untested(Logged -- Tested).

%% @doc `CheckF' can return a boolean or fail with `function_clause', which means `false'.
%% This is for convenience - you only have to code one clause.
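%% Example (event name, labels and measurement keys are illustrative):
%%   assert(mod_mam_pm_lookup, #{host_type => HostType},
%%          fun(#{count := 1}) -> true end)
%% Any event whose measurements don't match the single clause fails with
%% function_clause and simply counts as a non-match.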
-spec assert(event_name(), labels(), fun((measurements()) -> boolean())) -> ok.
assert(EventName, Labels, CheckF) ->
Events = rpc(mim(), instrument_event_table, get_events, [EventName, Labels]),
case lists:filter(fun({_, Measurements}) ->
try CheckF(Measurements) catch error:function_clause -> false end
end, Events) of
[] ->
ct:log("All ~p events with labels ~p:~n~p", [EventName, Labels, Events]),
ct:fail("No instrumentation events matched");
Filtered ->
ct:log("Matching events: ~p", [Filtered]),
event_tested(EventName, Labels)
end.

%% Internal functions

%% Don't fail if some events are unlogged, because we don't have full test coverage (yet)
verify_unlogged([]) -> ok;
verify_unlogged(Events) ->
ct:log("Instrumentation events that were not logged - functionality not covered by tests:~n~p",
[lists:sort(Events)]).

%% Fail if any events were logged, but there were no assertions for them
verify_logged_but_untested([]) -> ok;
verify_logged_but_untested(Events) ->
ct:fail("Instrumentation events that were logged, but not tested:~n~p~n"
"You need to test them with ~p:assert/3",
[lists:sort(Events), ?MODULE]).

event_tested(EventName, Labels) ->
ets:insert(?STATUS_TABLE, {{EventName, Labels}, tested}),
ok.

classify_events() ->
ets:foldl(fun classify_event/2, #{tested => [], untested => []}, ?STATUS_TABLE).

classify_event({Event, Status}, M) ->
M#{Status => [Event | maps:get(Status, M)]}.
122 changes: 58 additions & 64 deletions big_tests/tests/mam_SUITE.erl
@@ -106,7 +106,12 @@
muc_light_host/0,
host_type/0,
config_opts/1,
stanza_metadata_request/0
stanza_metadata_request/0,
assert_archive_message_event/2,
assert_lookup_event/2,
assert_flushed_event_if_async/2,
assert_dropped_iq_event/2,
assert_event_with_jid/2
]).

-import(muc_light_helper,
@@ -278,13 +283,13 @@ chat_markers_cases() ->
dont_archive_chat_markers].

mam_metrics_cases() ->
[metric_incremented_on_archive_request,
metric_incremented_when_store_message].
[metric_incremented_when_store_message].

mam_cases() ->
[mam_service_discovery,
mam_service_discovery_to_client_bare_jid,
mam_service_discovery_to_different_client_bare_jid_results_in_error,
archive_is_instrumented,
easy_archive_request,
easy_archive_request_for_the_receiver,
message_sent_to_yourself,
@@ -335,9 +340,7 @@ muc_text_search_cases() ->

archived_cases() ->
[archived,
filter_forwarded,
metrics_incremented_for_async_pools
].
filter_forwarded].

stanzaid_cases() ->
[message_with_stanzaid,
@@ -359,7 +362,8 @@ muc_cases() ->
[muc_service_discovery | muc_cases_with_room()].

muc_cases_with_room() ->
[muc_archive_request,
[muc_archive_is_instrumented,
muc_archive_request,
muc_multiple_devices,
muc_protected_message,
muc_deny_protected_room_access,
@@ -489,12 +493,19 @@ suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

init_per_suite(Config) ->
instrument_helper:start(instrument_helper:declared_events(instrumented_modules())),
muc_helper:load_muc(),
mam_helper:prepare_for_suite(
increase_limits(
delete_users([{escalus_user_db, {module, escalus_ejabberd}}
| escalus:init_per_suite(Config)]))).

instrumented_modules() ->
case mongoose_helper:is_rdbms_enabled(host_type()) of
true -> [mod_mam_rdbms_arch_async, mod_mam_muc_rdbms_arch_async];
false -> []
end ++ [mod_mam_pm, mod_mam_muc].

end_per_suite(Config) ->
muc_helper:unload_muc(),
%% Next function creates a lot of sessions...
@@ -503,7 +514,8 @@ end_per_suite(Config) ->
mongoose_helper:kick_everyone(),
%% so we don't have sessions anymore and other tests will not fail
mongoose_helper:restore_config(Config),
escalus:end_per_suite(Config).
escalus:end_per_suite(Config),
instrument_helper:stop().

user_names() ->
[alice, bob, kate, carol].
@@ -709,9 +721,6 @@ init_steps() ->
[fun init_users/2, fun init_archive/2, fun start_room/2, fun init_metrics/2,
fun escalus:init_per_testcase/2].

maybe_skip(metrics_incremented_for_async_pools, Config) ->
skip_if(?config(configuration, Config) =/= rdbms_async_pool,
"Not an async-pool test");
maybe_skip(C, Config) when C =:= retract_message;
C =:= retract_wrong_message;
C =:= ignore_bad_retraction;
@@ -787,9 +796,6 @@ fresh_users(C) ->
false -> []
end.

init_archive(C, Config) when C =:= metrics_incremented_for_async_pools;
C =:= metric_incremented_when_store_message ->
clean_archives(Config);
init_archive(C, Config) when ?requires_pm_archive(C) ->
bootstrap_archive(Config);
init_archive(C, Config) when ?requires_muc_archive(C) ->
@@ -821,15 +827,6 @@ init_metrics(metric_incremented_when_store_message, ConfigIn) ->
_ ->
ConfigIn
end;
init_metrics(muc_archive_request, Config) ->
%% Check that metric is incremented on MUC flushed
case ?config(configuration, Config) of
rdbms_async_pool ->
MongooseMetrics = [{['_', 'modMucMamFlushed'], changed}],
[{mongoose_metrics, MongooseMetrics} | Config];
_ ->
Config
end;
init_metrics(_CaseName, Config) ->
Config.

@@ -1055,6 +1052,19 @@ same_stanza_id(Config) ->
end,
escalus_fresh:story(Config, [{alice, 1}, {bob, 1}], F).

archive_is_instrumented(Config) ->
F = fun(Alice, Bob) ->
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
escalus:wait_for_stanza(Bob),
assert_archive_message_event(mod_mam_pm_archive_message, escalus_utils:get_jid(Alice)),
mam_helper:wait_for_archive_size(Alice, 1),
assert_flushed_event_if_async(mod_mam_pm_flushed, Config),
{S, U} = {escalus_utils:get_server(Alice), escalus_utils:get_username(Alice)},
mam_helper:delete_archive(S, U),
assert_event_with_jid(mod_mam_pm_remove_archive, escalus_utils:get_short_jid(Alice))
end,
escalus_fresh:story(Config, [{alice, 1}, {bob, 1}], F).

%% Querying the archive for messages
easy_archive_request(Config) ->
P = ?config(props, Config),
@@ -1070,6 +1080,7 @@ easy_archive_request(Config) ->
mam_helper:wait_for_archive_size(Alice, 1),
escalus:send(Alice, stanza_archive_request(P, <<"q1">>)),
Res = wait_archive_respond(Alice),
assert_lookup_event(mod_mam_pm_lookup, escalus_utils:get_jid(Alice)),
assert_respond_size(1, Res),
assert_respond_query_id(P, <<"q1">>, parse_result_iq(Res)),
ok
@@ -1835,7 +1846,6 @@ archived(Config) ->

%% Bob receives a message.
Msg = escalus:wait_for_stanza(Bob),
try
StanzaId = exml_query:subelement(Msg, <<"stanza-id">>),
%% JID of the archive (i.e. where the client would send queries to)
By = exml_query:attr(StanzaId, <<"by">>),
@@ -1851,10 +1861,6 @@
#forwarded_message{result_id=ArcId} = parse_forwarded_message(ArcMsg),
?assert_equal(Id, ArcId),
ok
Collaborator

We could also remove that ok if we're already removing dead code here 🤷🏽‍♂️

catch Class:Reason:StackTrace ->
ct:pal("Msg ~p", [Msg]),
erlang:raise(Class, Reason, StackTrace)
end
end,
%% Made fresh in init_per_testcase
escalus:story(Config, [{alice, 1}, {bob, 1}], F).
@@ -2084,6 +2090,26 @@ test_retract_muc_message(Config) ->
end,
escalus:story(Config, [{alice, 1}, {bob, 1}], F).

muc_archive_is_instrumented(Config) ->
F = fun(Alice, Bob) ->
Room = ?config(room, Config),
RoomAddr = room_address(Room),
Text = <<"Hi, Bob!">>,
escalus:send(Alice, stanza_muc_enter_room(Room, nick(Alice))),
escalus:send(Bob, stanza_muc_enter_room(Room, nick(Bob))),
escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Text)),

%% Bob received presences, the room's subject and the message.
escalus:wait_for_stanzas(Bob, 4),
assert_archive_message_event(mod_mam_muc_archive_message, RoomAddr),
maybe_wait_for_archive(Config),
assert_flushed_event_if_async(mod_mam_muc_flushed, Config),

mam_helper:delete_room_archive(muc_host(), ?config(room, Config)),
assert_event_with_jid(mod_mam_muc_remove_archive, RoomAddr)
end,
escalus:story(Config, [{alice, 1}, {bob, 1}], F).

muc_archive_request(Config) ->
P = ?config(props, Config),
F = fun(Alice, Bob) ->
@@ -2139,6 +2165,7 @@ muc_archive_request(Config) ->
?assert_equal(escalus_utils:jid_to_lower(RoomAddr), By),
?assert_equal_extra(true, has_x_user_element(ArcMsg),
[{forwarded_message, ArcMsg}]),
assert_lookup_event(mod_mam_muc_lookup, escalus_utils:get_jid(Bob)),
ok
end,
escalus:story(Config, [{alice, 1}, {bob, 1}], F).
@@ -2989,7 +3016,7 @@ server_returns_item_not_found_for_nonexistent_id(Config, RSM, StanzaID, Conditio
Res = escalus:wait_for_stanza(Alice),
escalus:assert(is_iq_error, [IQ], Res),
escalus:assert(is_error, Condition, Res),
ok
assert_dropped_iq_event(Config, escalus_utils:get_jid(Alice))
end,
parallel_story(Config, [{alice, 1}], F).

@@ -3182,9 +3209,11 @@ prefs_set_request(Config) ->
[<<"montague@montague.net">>],
mam_ns_binary())),
ReplySet = escalus:wait_for_stanza(Alice),
assert_event_with_jid(mod_mam_pm_set_prefs, escalus_utils:get_short_jid(Alice)),

escalus:send(Alice, stanza_prefs_get_request(mam_ns_binary())),
ReplyGet = escalus:wait_for_stanza(Alice),
assert_event_with_jid(mod_mam_pm_get_prefs, escalus_utils:get_short_jid(Alice)),

ResultIQ1 = parse_prefs_result_iq(ReplySet),
ResultIQ2 = parse_prefs_result_iq(ReplyGet),
@@ -3292,41 +3321,6 @@ discover_features(Config, Client, Service) ->
?assert_equal(message_retraction_is_enabled(Config),
escalus_pred:has_feature(retract_tombstone_ns(), Stanza)).

metric_incremented_on_archive_request(ConfigIn) ->
P = ?config(props, ConfigIn),
F = fun(Alice) ->
escalus:send(Alice, stanza_archive_request(P, <<"metric_q1">>)),
Res = wait_archive_respond(Alice),
assert_respond_size(0, Res),
assert_respond_query_id(P, <<"metric_q1">>, parse_result_iq(Res)),
ok
end,
HostType = domain_helper:host_type(mim),
HostTypePrefix = domain_helper:make_metrics_prefix(HostType),
MongooseMetrics = [{[HostTypePrefix, backends, mod_mam_pm, lookup], changed}],
Config = [{mongoose_metrics, MongooseMetrics} | ConfigIn],
escalus_fresh:story(Config, [{alice, 1}], F).

metrics_incremented_for_async_pools(Config) ->
OldValue = get_mongoose_async_metrics(),
archived(Config),
Validator = fun(NewValue) -> OldValue =/= NewValue end,
mongoose_helper:wait_until(
fun get_mongoose_async_metrics/0,
Validator, #{name => ?FUNCTION_NAME}).

get_mongoose_async_metrics() ->
HostType = domain_helper:host_type(mim),
HostTypePrefix = domain_helper:make_metrics_prefix(HostType),
#{batch_flushes => get_mongoose_async_metrics(HostTypePrefix, batch_flushes),
timed_flushes => get_mongoose_async_metrics(HostTypePrefix, timed_flushes)}.

get_mongoose_async_metrics(HostTypePrefix, MetricName) ->
Metric = [HostTypePrefix, mongoose_async_pools, pm_mam, MetricName],
{ok, Value} = rpc(mim(), mongoose_metrics, get_metric_value, [Metric]),
{value, Count} = lists:keyfind(value, 1, Value),
Count.

metric_incremented_when_store_message(Config) ->
archived(Config).
