Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.3] acdc - PISTON-976: significant performance improvements for acdc status table #6570

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 120 additions & 57 deletions applications/acdc/src/acdc_agent_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
%%% @copyright (C) 2014-2020, 2600Hz
%%% @doc Collector of stats for agents
%%% @author James Aimonetti
%%% @author Daniel Finke
%%% @end
%%%-----------------------------------------------------------------------------
-module(acdc_agent_stats).
Expand All @@ -19,6 +20,7 @@
,handle_status_stat/2
,handle_status_query/2

,status_stat_key/3
,status_stat_id/3

,status_table_id/0
Expand All @@ -31,15 +33,19 @@
-include("acdc.hrl").
-include("acdc_stats.hrl").

%%------------------------------------------------------------------------------
%% @doc Status stat table configuration
%% @end
%%------------------------------------------------------------------------------
-spec status_table_id() -> atom().
status_table_id() -> 'acdc_stats_status'.

-spec status_key_pos() -> pos_integer().
status_key_pos() -> #status_stat.id.
status_key_pos() -> #status_stat.key.

-spec status_table_opts() -> kz_term:proplist().
status_table_opts() ->
['protected', 'named_table'
['ordered_set', 'protected', 'named_table'
,{'keypos', status_key_pos()}
].

Expand Down Expand Up @@ -210,16 +216,15 @@ handle_status_stat(JObj, Props) ->
'false'
end,

AgentId = kz_json:get_value(<<"Agent-ID">>, JObj),
AccountId = kz_json:get_ne_binary_value(<<"Account-ID">>, JObj),
AgentId = kz_json:get_ne_binary_value(<<"Agent-ID">>, JObj),
Timestamp = kz_json:get_integer_value(<<"Timestamp">>, JObj),

gen_listener:cast(props:get_value('server', Props)
,{'create_status'
,#status_stat{id=status_stat_id(AgentId, Timestamp, EventName)
,agent_id=AgentId
,account_id=kz_json:get_value(<<"Account-ID">>, JObj)
,#status_stat{key=status_stat_key(AccountId, AgentId, Timestamp)
,id=status_stat_id(AgentId, Timestamp, EventName)
,status=EventName
,timestamp=Timestamp
,callid=kz_json:get_value(<<"Call-ID">>, JObj)
,wait_time=acdc_stats_util:wait_time(EventName, JObj)
,pause_time=acdc_stats_util:pause_time(EventName, JObj)
Expand All @@ -230,6 +235,18 @@ handle_status_stat(JObj, Props) ->
}
).

%%------------------------------------------------------------------------------
%% @doc Status stat table key is in an order which can optimize the ordered_set
%% lookup if partially bound
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_key(kz_term:ne_binary(), kz_term:ne_binary(), pos_integer()) -> status_stat_key().
status_stat_key(AccountId, AgentId, Timestamp) ->
#status_stat_key{account_id=AccountId
,agent_id=AgentId
,timestamp=Timestamp
}.

-spec status_stat_id(kz_term:ne_binary(), pos_integer(), any()) -> kz_term:ne_binary().
status_stat_id(AgentId, Timestamp, _EventName) ->
<<AgentId/binary, "::", (kz_term:to_binary(Timestamp))/binary>>.
Expand All @@ -255,12 +272,19 @@ publish_query_errors(RespQ, MsgId, Errors) ->
lager:debug("responding with errors to req ~s: ~p", [MsgId, Errors]),
kapi_acdc_stats:publish_status_err(RespQ, API).

%%------------------------------------------------------------------------------
%% @doc Build a match spec for querying status stats
%% @end
%%------------------------------------------------------------------------------
-spec status_build_match_spec(kz_json:object()) ->
{'ok', ets:match_spec()} |
{'error', kz_json:object()}.
status_build_match_spec(JObj) ->
case kz_json:get_value(<<"Account-ID">>, JObj) of
'undefined' ->
{'error', kz_json:from_list([{<<"Account-ID">>, <<"missing but required">>}])};
AccountId ->
AcctMatch = {#status_stat{account_id='$1', _='_'}
AcctMatch = {#status_stat{key=#status_stat_key{account_id='$1'}, _='_'}
,[{'=:=', '$1', {'const', AccountId}}]
},
status_build_match_spec(JObj, AcctMatch)
Expand All @@ -277,7 +301,8 @@ status_build_match_spec(JObj, AcctMatch) ->

status_match_builder_fold(_, _, {'error', _Err}=E) -> E;
status_match_builder_fold(<<"Agent-ID">>, AgentId, {StatusStat, Contstraints}) ->
{StatusStat#status_stat{agent_id='$2'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{agent_id='$2'}}
,[{'=:=', '$2', {'const', AgentId}} | Contstraints]
};
status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints}) ->
Expand All @@ -297,7 +322,8 @@ status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints})
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'>=', '$3', N} | Contstraints]
}
catch
Expand All @@ -319,7 +345,8 @@ status_match_builder_fold(<<"End-Range">>, End, {StatusStat, Contstraints}) ->
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'=<', '$3', N} | Contstraints]
}
catch
Expand All @@ -332,54 +359,90 @@ status_match_builder_fold(<<"Status">>, Status, {StatusStat, Contstraints}) ->
};
status_match_builder_fold(_, _, Acc) -> Acc.

%%------------------------------------------------------------------------------
%% @doc Execute a status query
%% @end
%%------------------------------------------------------------------------------
-spec query_statuses(kz_term:ne_binary(), kz_term:ne_binary(), ets:match_spec(), pos_integer() | 'no_limit') -> 'ok'.
query_statuses(RespQ, MsgId, Match, Limit) ->
case ets:select(status_table_id(), Match) of
[] ->
lager:debug("no stats found, sorry ~s", [RespQ]),
Resp = [{<<"Error-Reason">>, <<"No agents found">>}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_err(RespQ, Resp);
Stats ->
QueryResults = lists:foldl(fun query_status_fold/2, kz_json:new(), Stats),
TrimmedResults = kz_json:map(fun(A, B) ->
{A, trim_query_statuses(B, Limit)}
end, QueryResults),

Resp = [{<<"Agents">>, TrimmedResults}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_resp(RespQ, Resp)
end.
Stats = ets:select_reverse(status_table_id(), Match),

-spec trim_query_statuses(kz_json:object(), pos_integer() | 'no_limit') -> kz_json:object().
trim_query_statuses(Statuses, Limit) ->
StatusProps = kz_json:to_proplist(Statuses),
SortedProps = lists:sort(fun({A, _}, {B, _}) ->
kz_term:to_integer(A) >= kz_term:to_integer(B)
end, StatusProps),
LimitedProps = case Limit of
'no_limit' -> SortedProps;
_ -> lists:sublist(SortedProps, Limit)
end,
kz_json:from_list(LimitedProps).

-spec query_status_fold(status_stat(), kz_json:object()) -> kz_json:object().
query_status_fold(#status_stat{agent_id=AgentId
,timestamp=T
}=Stat, Acc) ->
Doc = kz_doc:public_fields(status_stat_to_doc(Stat)),
kz_json:set_value([AgentId, kz_term:to_binary(T)], Doc, Acc).
case Stats of
[] -> lager:debug("no stats found (requester: ~s)", [RespQ]);
_ -> 'ok'
end,

Resp = [{<<"Agents">>, query_statuses_group_by_agent(Stats, Limit)}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_resp(RespQ, Resp).

%%------------------------------------------------------------------------------
%% @doc Group status stats by agent and return the whole map as a JObj. Each
%% agent grouping will contain at max "Limit" number of status stats
%% @end
%%------------------------------------------------------------------------------
query_statuses_group_by_agent(Stats, Limit) ->
query_statuses_fold(Stats, Limit, #{}).

query_statuses_fold([], _, StatsByAgent) -> kz_json:from_map(StatsByAgent);
query_statuses_fold([#status_stat{key=#status_stat_key{agent_id=AgentId
,timestamp=Timestamp
}
}=Stat
| Stats
], Limit, StatsByAgent) ->
AgentStats = maps:get(AgentId, StatsByAgent, #{}),
StatsByAgent1 = case Limit =:= 'no_limit'
orelse maps:size(AgentStats) < Limit
of
'true' ->
TimestampBin = kz_term:to_binary(Timestamp),
Stat1 = status_stat_to_map(Stat),
AgentStats1 = AgentStats#{TimestampBin => Stat1},
StatsByAgent#{AgentId => AgentStats1};
'false' -> StatsByAgent
end,
query_statuses_fold(Stats, Limit, StatsByAgent1).

%%------------------------------------------------------------------------------
%% @doc Convert a status stat record to a map that can be efficiently parsed
%% into a JObj
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_to_map(status_stat()) -> map().
status_stat_to_map(#status_stat{key=#status_stat_key{agent_id=AgentId
,timestamp=Timestamp
}
,id=Id
,status=Status
,wait_time=WT
,pause_time=PT
,callid=CallId
,caller_id_name=CIDName
,caller_id_number=CIDNum
,queue_id=QueueId
}) ->
#{'agent_id' => AgentId
,'timestamp' => Timestamp
,'id' => Id
,'status' => Status
,'wait_time' => WT
,'pause_time' => PT
,'call_id' => CallId
,'caller_id_name' => CIDName
,'caller_id_number' => CIDNum
,'queue_id' => QueueId
}.

-spec status_stat_to_doc(status_stat()) -> kz_json:object().
status_stat_to_doc(#status_stat{id=Id
,agent_id=AgentId
,account_id=AccountId
status_stat_to_doc(#status_stat{key=#status_stat_key{account_id=AccountId
,agent_id=AgentId
,timestamp=Timestamp
}
,id=Id
,status=Status
,timestamp=Timestamp
,wait_time=WT
,pause_time=PT
,callid=CallId
Expand Down Expand Up @@ -418,7 +481,7 @@ archive_status_data(Srv, 'true') ->
archive_status_data(Srv, 'false') ->
kz_util:put_callid(<<"acdc_stats.status_archiver">>),
Past = kz_time:now_s() - ?ARCHIVE_WINDOW,
Match = [{#status_stat{timestamp='$1'
Match = [{#status_stat{key=#status_stat_key{timestamp='$1'}
,is_archived='$2'
,_='_'
}
Expand All @@ -438,14 +501,14 @@ maybe_archive_status_data(Srv, Match) ->
_ = [kz_datamgr:save_docs(acdc_stats_util:db_name(Acct), Docs)
|| {Acct, Docs} <- dict:to_list(ToSave)
],
_ = [gen_listener:cast(Srv, {'update_status', Id, [{#status_stat.is_archived, 'true'}]})
|| #status_stat{id=Id} <- Stats
_ = [gen_listener:cast(Srv, {'update_status', Id, Key, [{#status_stat.is_archived, 'true'}]})
|| #status_stat{id=Id, key=Key} <- Stats
],
'ok'
end.

-spec archive_status_fold(status_stat(), dict:dict()) -> dict:dict().
archive_status_fold(#status_stat{account_id=AccountId}=Stat, Acc) ->
archive_status_fold(#status_stat{key=#status_stat_key{account_id=AccountId}}=Stat, Acc) ->
Doc = status_stat_to_doc(Stat),
dict:update(AccountId, fun(L) -> [Doc | L] end, [Doc], Acc).

Expand Down
6 changes: 3 additions & 3 deletions applications/acdc/src/acdc_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,9 @@ handle_cast({'remove_call', [{M, P, _}]}, State) ->
andalso lager:debug("removed calls: ~p", [N]),
{'noreply', State};

handle_cast({'update_status', Id, Updates}, State) ->
handle_cast({'update_status', Id, Key, Updates}, State) ->
lager:debug("updating status stat ~s: ~p", [Id, Updates]),
ets:update_element(acdc_agent_stats:status_table_id(), Id, Updates),
ets:update_element(acdc_agent_stats:status_table_id(), Key, Updates),
{'noreply', State};
handle_cast({'remove_status', [{M, P, _}]}, State) ->
Match = [{M, P, ['true']}],
Expand Down Expand Up @@ -686,7 +686,7 @@ cleanup_data(Srv) ->
}],
gen_listener:cast(Srv, {'remove_call', CallMatch}),

StatusMatch = [{#status_stat{timestamp='$1', _='_'}
StatusMatch = [{#status_stat{key=#status_stat_key{timestamp='$1'}, _='_'}
,[{'=<', '$1', Past}]
,['$_']
}],
Expand Down
13 changes: 9 additions & 4 deletions applications/acdc/src/acdc_stats.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@
,<<"connecting">>, <<"connected">>
,<<"wrapup">>, <<"paused">>, <<"outbound">>
]).
-record(status_stat, {id :: kz_term:api_binary() | '_'
,agent_id :: kz_term:api_binary() | '$2' | '_'
,account_id :: kz_term:api_binary() | '$1' | '_'

%% This key optimizes lookups in the ordered_set ETS table
-record(status_stat_key, {account_id = '_' :: kz_term:ne_binary() | '$1' | '_'
,agent_id = '_' :: kz_term:ne_binary() | '$2' | '_'
,timestamp = '_' :: pos_integer() | '$1' | '$3' | '_'
}).
-type status_stat_key() :: #status_stat_key{}.
-record(status_stat, {key = '_' :: status_stat_key() | '_'
,id :: kz_term:api_binary() | '_'
,status :: kz_term:api_binary() | '$4' | '_'
,timestamp :: kz_term:api_pos_integer() | '$1' | '$3' | '$5' | '_'

,wait_time :: kz_term:api_integer() | '_'
,pause_time :: kz_term:api_integer() | '_'
Expand Down