Skip to content

Commit

Permalink
PISTON-976: significant performance improvements for acdc status table (
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jun 4, 2020
1 parent 475c729 commit f22e119
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 64 deletions.
177 changes: 120 additions & 57 deletions applications/acdc/src/acdc_agent_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
%%% @copyright (C) 2014-2020, 2600Hz
%%% @doc Collector of stats for agents
%%% @author James Aimonetti
%%% @author Daniel Finke
%%% @end
%%%-----------------------------------------------------------------------------
-module(acdc_agent_stats).
Expand All @@ -19,6 +20,7 @@
,handle_status_stat/2
,handle_status_query/2

,status_stat_key/3
,status_stat_id/3

,status_table_id/0
Expand All @@ -31,15 +33,19 @@
-include("acdc.hrl").
-include("acdc_stats.hrl").

%%------------------------------------------------------------------------------
%% @doc Status stat table configuration
%% @end
%%------------------------------------------------------------------------------
-spec status_table_id() -> atom().
status_table_id() -> 'acdc_stats_status'.

-spec status_key_pos() -> pos_integer().
status_key_pos() -> #status_stat.id.
status_key_pos() -> #status_stat.key.

-spec status_table_opts() -> kz_term:proplist().
status_table_opts() ->
['protected', 'named_table'
['ordered_set', 'protected', 'named_table'
,{'keypos', status_key_pos()}
].

Expand Down Expand Up @@ -210,16 +216,15 @@ handle_status_stat(JObj, Props) ->
'false'
end,

AgentId = kz_json:get_value(<<"Agent-ID">>, JObj),
AccountId = kz_json:get_ne_binary_value(<<"Account-ID">>, JObj),
AgentId = kz_json:get_ne_binary_value(<<"Agent-ID">>, JObj),
Timestamp = kz_json:get_integer_value(<<"Timestamp">>, JObj),

gen_listener:cast(props:get_value('server', Props)
,{'create_status'
,#status_stat{id=status_stat_id(AgentId, Timestamp, EventName)
,agent_id=AgentId
,account_id=kz_json:get_value(<<"Account-ID">>, JObj)
,#status_stat{key=status_stat_key(AccountId, AgentId, Timestamp)
,id=status_stat_id(AgentId, Timestamp, EventName)
,status=EventName
,timestamp=Timestamp
,callid=kz_json:get_value(<<"Call-ID">>, JObj)
,wait_time=acdc_stats_util:wait_time(EventName, JObj)
,pause_time=acdc_stats_util:pause_time(EventName, JObj)
Expand All @@ -230,6 +235,18 @@ handle_status_stat(JObj, Props) ->
}
).

%%------------------------------------------------------------------------------
%% @doc Status stat table key is in an order which can optimize the ordered_set
%% lookup if partially bound
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_key(kz_term:ne_binary(), kz_term:ne_binary(), pos_integer()) -> status_stat_key().
status_stat_key(AccountId, AgentId, Timestamp) ->
#status_stat_key{account_id=AccountId
,agent_id=AgentId
,timestamp=Timestamp
}.

-spec status_stat_id(kz_term:ne_binary(), pos_integer(), any()) -> kz_term:ne_binary().
status_stat_id(AgentId, Timestamp, _EventName) ->
<<AgentId/binary, "::", (kz_term:to_binary(Timestamp))/binary>>.
Expand All @@ -255,12 +272,19 @@ publish_query_errors(RespQ, MsgId, Errors) ->
lager:debug("responding with errors to req ~s: ~p", [MsgId, Errors]),
kapi_acdc_stats:publish_status_err(RespQ, API).

%%------------------------------------------------------------------------------
%% @doc Build a match spec for querying status stats
%% @end
%%------------------------------------------------------------------------------
-spec status_build_match_spec(kz_json:object()) ->
{'ok', ets:match_spec()} |
{'error', kz_json:object()}.
status_build_match_spec(JObj) ->
case kz_json:get_value(<<"Account-ID">>, JObj) of
'undefined' ->
{'error', kz_json:from_list([{<<"Account-ID">>, <<"missing but required">>}])};
AccountId ->
AcctMatch = {#status_stat{account_id='$1', _='_'}
AcctMatch = {#status_stat{key=#status_stat_key{account_id='$1'}, _='_'}
,[{'=:=', '$1', {'const', AccountId}}]
},
status_build_match_spec(JObj, AcctMatch)
Expand All @@ -277,7 +301,8 @@ status_build_match_spec(JObj, AcctMatch) ->

status_match_builder_fold(_, _, {'error', _Err}=E) -> E;
status_match_builder_fold(<<"Agent-ID">>, AgentId, {StatusStat, Contstraints}) ->
{StatusStat#status_stat{agent_id='$2'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{agent_id='$2'}}
,[{'=:=', '$2', {'const', AgentId}} | Contstraints]
};
status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints}) ->
Expand All @@ -297,7 +322,8 @@ status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints})
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'>=', '$3', N} | Contstraints]
}
catch
Expand All @@ -319,7 +345,8 @@ status_match_builder_fold(<<"End-Range">>, End, {StatusStat, Contstraints}) ->
,{<<"Current-Timestamp">>, Now}
])};
N ->
{StatusStat#status_stat{timestamp='$3'}
Key = StatusStat#status_stat.key,
{StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}}
,[{'=<', '$3', N} | Contstraints]
}
catch
Expand All @@ -332,54 +359,90 @@ status_match_builder_fold(<<"Status">>, Status, {StatusStat, Contstraints}) ->
};
status_match_builder_fold(_, _, Acc) -> Acc.

%%------------------------------------------------------------------------------
%% @doc Execute a status query
%% @end
%%------------------------------------------------------------------------------
-spec query_statuses(kz_term:ne_binary(), kz_term:ne_binary(), ets:match_spec(), pos_integer() | 'no_limit') -> 'ok'.
query_statuses(RespQ, MsgId, Match, Limit) ->
case ets:select(status_table_id(), Match) of
[] ->
lager:debug("no stats found, sorry ~s", [RespQ]),
Resp = [{<<"Error-Reason">>, <<"No agents found">>}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_err(RespQ, Resp);
Stats ->
QueryResults = lists:foldl(fun query_status_fold/2, kz_json:new(), Stats),
TrimmedResults = kz_json:map(fun(A, B) ->
{A, trim_query_statuses(B, Limit)}
end, QueryResults),

Resp = [{<<"Agents">>, TrimmedResults}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_resp(RespQ, Resp)
end.
Stats = ets:select_reverse(status_table_id(), Match),

-spec trim_query_statuses(kz_json:object(), pos_integer() | 'no_limit') -> kz_json:object().
trim_query_statuses(Statuses, Limit) ->
StatusProps = kz_json:to_proplist(Statuses),
SortedProps = lists:sort(fun({A, _}, {B, _}) ->
kz_term:to_integer(A) >= kz_term:to_integer(B)
end, StatusProps),
LimitedProps = case Limit of
'no_limit' -> SortedProps;
_ -> lists:sublist(SortedProps, Limit)
end,
kz_json:from_list(LimitedProps).

-spec query_status_fold(status_stat(), kz_json:object()) -> kz_json:object().
query_status_fold(#status_stat{agent_id=AgentId
,timestamp=T
}=Stat, Acc) ->
Doc = kz_doc:public_fields(status_stat_to_doc(Stat)),
kz_json:set_value([AgentId, kz_term:to_binary(T)], Doc, Acc).
case Stats of
[] -> lager:debug("no stats found (requester: ~s)", [RespQ]);
_ -> 'ok'
end,

Resp = [{<<"Agents">>, query_statuses_group_by_agent(Stats, Limit)}
,{<<"Msg-ID">>, MsgId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapi_acdc_stats:publish_status_resp(RespQ, Resp).

%%------------------------------------------------------------------------------
%% @doc Group status stats by agent and return the whole map as a JObj. Each
%% agent grouping will contain at max "Limit" number of status stats
%% @end
%%------------------------------------------------------------------------------
query_statuses_group_by_agent(Stats, Limit) ->
query_statuses_fold(Stats, Limit, #{}).

query_statuses_fold([], _, StatsByAgent) -> kz_json:from_map(StatsByAgent);
query_statuses_fold([#status_stat{key=#status_stat_key{agent_id=AgentId
,timestamp=Timestamp
}
}=Stat
| Stats
], Limit, StatsByAgent) ->
AgentStats = maps:get(AgentId, StatsByAgent, #{}),
StatsByAgent1 = case Limit =:= 'no_limit'
orelse maps:size(AgentStats) < Limit
of
'true' ->
TimestampBin = kz_term:to_binary(Timestamp),
Stat1 = status_stat_to_map(Stat),
AgentStats1 = AgentStats#{TimestampBin => Stat1},
StatsByAgent#{AgentId => AgentStats1};
'false' -> StatsByAgent
end,
query_statuses_fold(Stats, Limit, StatsByAgent1).

%%------------------------------------------------------------------------------
%% @doc Convert a status stat record to a map that can be efficiently parsed
%% into a JObj
%% @end
%%------------------------------------------------------------------------------
-spec status_stat_to_map(status_stat()) -> map().
status_stat_to_map(#status_stat{key=#status_stat_key{agent_id=AgentId
,timestamp=Timestamp
}
,id=Id
,status=Status
,wait_time=WT
,pause_time=PT
,callid=CallId
,caller_id_name=CIDName
,caller_id_number=CIDNum
,queue_id=QueueId
}) ->
#{'agent_id' => AgentId
,'timestamp' => Timestamp
,'id' => Id
,'status' => Status
,'wait_time' => WT
,'pause_time' => PT
,'call_id' => CallId
,'caller_id_name' => CIDName
,'caller_id_number' => CIDNum
,'queue_id' => QueueId
}.

-spec status_stat_to_doc(status_stat()) -> kz_json:object().
status_stat_to_doc(#status_stat{id=Id
,agent_id=AgentId
,account_id=AccountId
status_stat_to_doc(#status_stat{key=#status_stat_key{account_id=AccountId
,agent_id=AgentId
,timestamp=Timestamp
}
,id=Id
,status=Status
,timestamp=Timestamp
,wait_time=WT
,pause_time=PT
,callid=CallId
Expand Down Expand Up @@ -418,7 +481,7 @@ archive_status_data(Srv, 'true') ->
archive_status_data(Srv, 'false') ->
kz_util:put_callid(<<"acdc_stats.status_archiver">>),
Past = kz_time:now_s() - ?ARCHIVE_WINDOW,
Match = [{#status_stat{timestamp='$1'
Match = [{#status_stat{key=#status_stat_key{timestamp='$1'}
,is_archived='$2'
,_='_'
}
Expand All @@ -438,14 +501,14 @@ maybe_archive_status_data(Srv, Match) ->
_ = [kz_datamgr:save_docs(acdc_stats_util:db_name(Acct), Docs)
|| {Acct, Docs} <- dict:to_list(ToSave)
],
_ = [gen_listener:cast(Srv, {'update_status', Id, [{#status_stat.is_archived, 'true'}]})
|| #status_stat{id=Id} <- Stats
_ = [gen_listener:cast(Srv, {'update_status', Id, Key, [{#status_stat.is_archived, 'true'}]})
|| #status_stat{id=Id, key=Key} <- Stats
],
'ok'
end.

-spec archive_status_fold(status_stat(), dict:dict()) -> dict:dict().
archive_status_fold(#status_stat{account_id=AccountId}=Stat, Acc) ->
archive_status_fold(#status_stat{key=#status_stat_key{account_id=AccountId}}=Stat, Acc) ->
Doc = status_stat_to_doc(Stat),
dict:update(AccountId, fun(L) -> [Doc | L] end, [Doc], Acc).

Expand Down
6 changes: 3 additions & 3 deletions applications/acdc/src/acdc_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,9 @@ handle_cast({'remove_call', [{M, P, _}]}, State) ->
andalso lager:debug("removed calls: ~p", [N]),
{'noreply', State};

handle_cast({'update_status', Id, Updates}, State) ->
handle_cast({'update_status', Id, Key, Updates}, State) ->
lager:debug("updating status stat ~s: ~p", [Id, Updates]),
ets:update_element(acdc_agent_stats:status_table_id(), Id, Updates),
ets:update_element(acdc_agent_stats:status_table_id(), Key, Updates),
{'noreply', State};
handle_cast({'remove_status', [{M, P, _}]}, State) ->
Match = [{M, P, ['true']}],
Expand Down Expand Up @@ -686,7 +686,7 @@ cleanup_data(Srv) ->
}],
gen_listener:cast(Srv, {'remove_call', CallMatch}),

StatusMatch = [{#status_stat{timestamp='$1', _='_'}
StatusMatch = [{#status_stat{key=#status_stat_key{timestamp='$1'}, _='_'}
,[{'=<', '$1', Past}]
,['$_']
}],
Expand Down
13 changes: 9 additions & 4 deletions applications/acdc/src/acdc_stats.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@
,<<"connecting">>, <<"connected">>
,<<"wrapup">>, <<"paused">>, <<"outbound">>
]).
-record(status_stat, {id :: kz_term:api_binary() | '_'
,agent_id :: kz_term:api_binary() | '$2' | '_'
,account_id :: kz_term:api_binary() | '$1' | '_'

%% This key optimizes lookups in the ordered_set ETS table
-record(status_stat_key, {account_id = '_' :: kz_term:ne_binary() | '$1' | '_'
,agent_id = '_' :: kz_term:ne_binary() | '$2' | '_'
,timestamp = '_' :: pos_integer() | '$1' | '$3' | '_'
}).
-type status_stat_key() :: #status_stat_key{}.
-record(status_stat, {key = '_' :: status_stat_key() | '_'
,id :: kz_term:api_binary() | '_'
,status :: kz_term:api_binary() | '$4' | '_'
,timestamp :: kz_term:api_pos_integer() | '$1' | '$3' | '$5' | '_'

,wait_time :: kz_term:api_integer() | '_'
,pause_time :: kz_term:api_integer() | '_'
Expand Down

0 comments on commit f22e119

Please sign in to comment.