diff --git a/applications/acdc/src/acdc_agent_stats.erl b/applications/acdc/src/acdc_agent_stats.erl index 49c2e8bf6e5..f023f7f8f68 100644 --- a/applications/acdc/src/acdc_agent_stats.erl +++ b/applications/acdc/src/acdc_agent_stats.erl @@ -2,6 +2,7 @@ %%% @copyright (C) 2014-2020, 2600Hz %%% @doc Collector of stats for agents %%% @author James Aimonetti +%%% @author Daniel Finke %%% @end %%%----------------------------------------------------------------------------- -module(acdc_agent_stats). @@ -19,6 +20,7 @@ ,handle_status_stat/2 ,handle_status_query/2 + ,status_stat_key/3 ,status_stat_id/3 ,status_table_id/0 @@ -31,15 +33,19 @@ -include("acdc.hrl"). -include("acdc_stats.hrl"). +%%------------------------------------------------------------------------------ +%% @doc Status stat table configuration +%% @end +%%------------------------------------------------------------------------------ -spec status_table_id() -> atom(). status_table_id() -> 'acdc_stats_status'. -spec status_key_pos() -> pos_integer(). -status_key_pos() -> #status_stat.id. +status_key_pos() -> #status_stat.key. -spec status_table_opts() -> kz_term:proplist(). status_table_opts() -> - ['protected', 'named_table' + ['ordered_set', 'protected', 'named_table' ,{'keypos', status_key_pos()} ]. @@ -210,16 +216,15 @@ handle_status_stat(JObj, Props) -> 'false' end, - AgentId = kz_json:get_value(<<"Agent-ID">>, JObj), + AccountId = kz_json:get_ne_binary_value(<<"Account-ID">>, JObj), + AgentId = kz_json:get_ne_binary_value(<<"Agent-ID">>, JObj), Timestamp = kz_json:get_integer_value(<<"Timestamp">>, JObj), gen_listener:cast(props:get_value('server', Props) ,{'create_status' - ,#status_stat{id=status_stat_id(AgentId, Timestamp, EventName) - ,agent_id=AgentId - ,account_id=kz_json:get_value(<<"Account-ID">>, JObj) + ,#status_stat{key=status_stat_key(AccountId, AgentId, Timestamp) + ,id=status_stat_id(AgentId, Timestamp, EventName) ,status=EventName - ,timestamp=Timestamp ,callid=kz_json:get_value(<<"Call-ID">>, JObj) ,wait_time=acdc_stats_util:wait_time(EventName, JObj) ,pause_time=acdc_stats_util:pause_time(EventName, JObj) @@ -230,6 +235,18 @@ handle_status_stat(JObj, Props) -> } ). +%%------------------------------------------------------------------------------ +%% @doc Status stat table key is in an order which can optimize the ordered_set +%% lookup if partially bound +%% @end +%%------------------------------------------------------------------------------ +-spec status_stat_key(kz_term:ne_binary(), kz_term:ne_binary(), pos_integer()) -> status_stat_key(). +status_stat_key(AccountId, AgentId, Timestamp) -> + #status_stat_key{account_id=AccountId + ,agent_id=AgentId + ,timestamp=Timestamp + }. + -spec status_stat_id(kz_term:ne_binary(), pos_integer(), any()) -> kz_term:ne_binary(). status_stat_id(AgentId, Timestamp, _EventName) -> <>. @@ -255,12 +272,19 @@ publish_query_errors(RespQ, MsgId, Errors) -> lager:debug("responding with errors to req ~s: ~p", [MsgId, Errors]), kapi_acdc_stats:publish_status_err(RespQ, API). +%%------------------------------------------------------------------------------ +%% @doc Build a match spec for querying status stats +%% @end +%%------------------------------------------------------------------------------ +-spec status_build_match_spec(kz_json:object()) -> + {'ok', ets:match_spec()} | + {'error', kz_json:object()}. status_build_match_spec(JObj) -> case kz_json:get_value(<<"Account-ID">>, JObj) of 'undefined' -> {'error', kz_json:from_list([{<<"Account-ID">>, <<"missing but required">>}])}; AccountId -> - AcctMatch = {#status_stat{account_id='$1', _='_'} + AcctMatch = {#status_stat{key=#status_stat_key{account_id='$1'}, _='_'} ,[{'=:=', '$1', {'const', AccountId}}] }, status_build_match_spec(JObj, AcctMatch) @@ -277,7 +301,8 @@ status_build_match_spec(JObj, AcctMatch) -> status_match_builder_fold(_, _, {'error', _Err}=E) -> E; status_match_builder_fold(<<"Agent-ID">>, AgentId, {StatusStat, Contstraints}) -> - {StatusStat#status_stat{agent_id='$2'} + Key = StatusStat#status_stat.key, + {StatusStat#status_stat{key=Key#status_stat_key{agent_id='$2'}} ,[{'=:=', '$2', {'const', AgentId}} | Contstraints] }; status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints}) -> @@ -297,7 +322,8 @@ status_match_builder_fold(<<"Start-Range">>, Start, {StatusStat, Contstraints}) ,{<<"Current-Timestamp">>, Now} ])}; N -> - {StatusStat#status_stat{timestamp='$3'} + Key = StatusStat#status_stat.key, + {StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}} ,[{'>=', '$3', N} | Contstraints] } catch @@ -319,7 +345,8 @@ status_match_builder_fold(<<"End-Range">>, End, {StatusStat, Contstraints}) -> ,{<<"Current-Timestamp">>, Now} ])}; N -> - {StatusStat#status_stat{timestamp='$3'} + Key = StatusStat#status_stat.key, + {StatusStat#status_stat{key=Key#status_stat_key{timestamp='$3'}} ,[{'=<', '$3', N} | Contstraints] } catch @@ -332,54 +359,90 @@ status_match_builder_fold(<<"Status">>, Status, {StatusStat, Contstraints}) -> }; status_match_builder_fold(_, _, Acc) -> Acc. +%%------------------------------------------------------------------------------ +%% @doc Execute a status query +%% @end +%%------------------------------------------------------------------------------ -spec query_statuses(kz_term:ne_binary(), kz_term:ne_binary(), ets:match_spec(), pos_integer() | 'no_limit') -> 'ok'. query_statuses(RespQ, MsgId, Match, Limit) -> - case ets:select(status_table_id(), Match) of - [] -> - lager:debug("no stats found, sorry ~s", [RespQ]), - Resp = [{<<"Error-Reason">>, <<"No agents found">>} - ,{<<"Msg-ID">>, MsgId} - | kz_api:default_headers(?APP_NAME, ?APP_VERSION) - ], - kapi_acdc_stats:publish_status_err(RespQ, Resp); - Stats -> - QueryResults = lists:foldl(fun query_status_fold/2, kz_json:new(), Stats), - TrimmedResults = kz_json:map(fun(A, B) -> - {A, trim_query_statuses(B, Limit)} - end, QueryResults), - - Resp = [{<<"Agents">>, TrimmedResults} - ,{<<"Msg-ID">>, MsgId} - | kz_api:default_headers(?APP_NAME, ?APP_VERSION) - ], - kapi_acdc_stats:publish_status_resp(RespQ, Resp) - end. + Stats = ets:select_reverse(status_table_id(), Match), --spec trim_query_statuses(kz_json:object(), pos_integer() | 'no_limit') -> kz_json:object(). -trim_query_statuses(Statuses, Limit) -> - StatusProps = kz_json:to_proplist(Statuses), - SortedProps = lists:sort(fun({A, _}, {B, _}) -> - kz_term:to_integer(A) >= kz_term:to_integer(B) - end, StatusProps), - LimitedProps = case Limit of - 'no_limit' -> SortedProps; - _ -> lists:sublist(SortedProps, Limit) - end, - kz_json:from_list(LimitedProps). - --spec query_status_fold(status_stat(), kz_json:object()) -> kz_json:object(). -query_status_fold(#status_stat{agent_id=AgentId - ,timestamp=T - }=Stat, Acc) -> - Doc = kz_doc:public_fields(status_stat_to_doc(Stat)), - kz_json:set_value([AgentId, kz_term:to_binary(T)], Doc, Acc). + case Stats of + [] -> lager:debug("no stats found (requester: ~s)", [RespQ]); + _ -> 'ok' + end, + + Resp = [{<<"Agents">>, query_statuses_group_by_agent(Stats, Limit)} + ,{<<"Msg-ID">>, MsgId} + | kz_api:default_headers(?APP_NAME, ?APP_VERSION) + ], + kapi_acdc_stats:publish_status_resp(RespQ, Resp). + +%%------------------------------------------------------------------------------ +%% @doc Group status stats by agent and return the whole map as a JObj. Each +%% agent grouping will contain at max "Limit" number of status stats +%% @end +%%------------------------------------------------------------------------------ +query_statuses_group_by_agent(Stats, Limit) -> + query_statuses_fold(Stats, Limit, #{}). + +query_statuses_fold([], _, StatsByAgent) -> kz_json:from_map(StatsByAgent); +query_statuses_fold([#status_stat{key=#status_stat_key{agent_id=AgentId + ,timestamp=Timestamp + } + }=Stat + | Stats + ], Limit, StatsByAgent) -> + AgentStats = maps:get(AgentId, StatsByAgent, #{}), + StatsByAgent1 = case Limit =:= 'no_limit' + orelse maps:size(AgentStats) < Limit + of + 'true' -> + TimestampBin = kz_term:to_binary(Timestamp), + Stat1 = status_stat_to_map(Stat), + AgentStats1 = AgentStats#{TimestampBin => Stat1}, + StatsByAgent#{AgentId => AgentStats1}; + 'false' -> StatsByAgent + end, + query_statuses_fold(Stats, Limit, StatsByAgent1). + +%%------------------------------------------------------------------------------ +%% @doc Convert a status stat record to a map that can be efficiently parsed +%% into a JObj +%% @end +%%------------------------------------------------------------------------------ +-spec status_stat_to_map(status_stat()) -> map(). +status_stat_to_map(#status_stat{key=#status_stat_key{agent_id=AgentId + ,timestamp=Timestamp + } + ,id=Id + ,status=Status + ,wait_time=WT + ,pause_time=PT + ,callid=CallId + ,caller_id_name=CIDName + ,caller_id_number=CIDNum + ,queue_id=QueueId + }) -> + #{'agent_id' => AgentId + ,'timestamp' => Timestamp + ,'id' => Id + ,'status' => Status + ,'wait_time' => WT + ,'pause_time' => PT + ,'call_id' => CallId + ,'caller_id_name' => CIDName + ,'caller_id_number' => CIDNum + ,'queue_id' => QueueId + }. -spec status_stat_to_doc(status_stat()) -> kz_json:object(). -status_stat_to_doc(#status_stat{id=Id - ,agent_id=AgentId - ,account_id=AccountId +status_stat_to_doc(#status_stat{key=#status_stat_key{account_id=AccountId + ,agent_id=AgentId + ,timestamp=Timestamp + } + ,id=Id ,status=Status - ,timestamp=Timestamp ,wait_time=WT ,pause_time=PT ,callid=CallId @@ -418,7 +481,7 @@ archive_status_data(Srv, 'true') -> archive_status_data(Srv, 'false') -> kz_util:put_callid(<<"acdc_stats.status_archiver">>), Past = kz_time:now_s() - ?ARCHIVE_WINDOW, - Match = [{#status_stat{timestamp='$1' + Match = [{#status_stat{key=#status_stat_key{timestamp='$1'} ,is_archived='$2' ,_='_' } @@ -438,14 +501,14 @@ maybe_archive_status_data(Srv, Match) -> _ = [kz_datamgr:save_docs(acdc_stats_util:db_name(Acct), Docs) || {Acct, Docs} <- dict:to_list(ToSave) ], - _ = [gen_listener:cast(Srv, {'update_status', Id, [{#status_stat.is_archived, 'true'}]}) - || #status_stat{id=Id} <- Stats + _ = [gen_listener:cast(Srv, {'update_status', Id, Key, [{#status_stat.is_archived, 'true'}]}) + || #status_stat{id=Id, key=Key} <- Stats ], 'ok' end. -spec archive_status_fold(status_stat(), dict:dict()) -> dict:dict(). -archive_status_fold(#status_stat{account_id=AccountId}=Stat, Acc) -> +archive_status_fold(#status_stat{key=#status_stat_key{account_id=AccountId}}=Stat, Acc) -> Doc = status_stat_to_doc(Stat), dict:update(AccountId, fun(L) -> [Doc | L] end, [Doc], Acc). diff --git a/applications/acdc/src/acdc_stats.erl b/applications/acdc/src/acdc_stats.erl index 812e7926cce..781eb68a62d 100644 --- a/applications/acdc/src/acdc_stats.erl +++ b/applications/acdc/src/acdc_stats.erl @@ -412,9 +412,9 @@ handle_cast({'remove_call', [{M, P, _}]}, State) -> andalso lager:debug("removed calls: ~p", [N]), {'noreply', State}; -handle_cast({'update_status', Id, Updates}, State) -> +handle_cast({'update_status', Id, Key, Updates}, State) -> lager:debug("updating status stat ~s: ~p", [Id, Updates]), - ets:update_element(acdc_agent_stats:status_table_id(), Id, Updates), + ets:update_element(acdc_agent_stats:status_table_id(), Key, Updates), {'noreply', State}; handle_cast({'remove_status', [{M, P, _}]}, State) -> Match = [{M, P, ['true']}], @@ -686,7 +686,7 @@ cleanup_data(Srv) -> }], gen_listener:cast(Srv, {'remove_call', CallMatch}), - StatusMatch = [{#status_stat{timestamp='$1', _='_'} + StatusMatch = [{#status_stat{key=#status_stat_key{timestamp='$1'}, _='_'} ,[{'=<', '$1', Past}] ,['$_'] }], diff --git a/applications/acdc/src/acdc_stats.hrl b/applications/acdc/src/acdc_stats.hrl index 15a27d6161a..dd4e6b52207 100644 --- a/applications/acdc/src/acdc_stats.hrl +++ b/applications/acdc/src/acdc_stats.hrl @@ -46,11 +46,16 @@ ,<<"connecting">>, <<"connected">> ,<<"wrapup">>, <<"paused">>, <<"outbound">> ]). --record(status_stat, {id :: kz_term:api_binary() | '_' - ,agent_id :: kz_term:api_binary() | '$2' | '_' - ,account_id :: kz_term:api_binary() | '$1' | '_' + +%% This key optimizes lookups in the ordered_set ETS table +-record(status_stat_key, {account_id = '_' :: kz_term:ne_binary() | '$1' | '_' + ,agent_id = '_' :: kz_term:ne_binary() | '$2' | '_' + ,timestamp = '_' :: pos_integer() | '$1' | '$3' | '_' + }). +-type status_stat_key() :: #status_stat_key{}. +-record(status_stat, {key = '_' :: status_stat_key() | '_' + ,id :: kz_term:api_binary() | '_' ,status :: kz_term:api_binary() | '$4' | '_' - ,timestamp :: kz_term:api_pos_integer() | '$1' | '$3' | '$5' | '_' ,wait_time :: kz_term:api_integer() | '_' ,pause_time :: kz_term:api_integer() | '_'