Skip to content

Commit

Permalink
2nd major commit to ACDC includes:
Browse files Browse the repository at this point in the history
1) Convert FSMs from gen_fsm to gen_statem
2) remove playback_stop dependency
3) manually merge Daniel's 4.3 PRs

List of Daniel's PRs merged:
Description                                                           Commit Id    kageds/master-acdc
                                                                                   status
-----------------------------------------------------------------------------------------------------
Actually set last_connect in acdc_agent_listener                      1ad1933    already present
PISTON-188: update agent_listener queue id during originate, no       81d70b4    already present
PISTON-371: handle ringing cancellations better when originate_       662a76c    already present
PISTON-55: acdc distributed member_connect_win (2600hz#6                    51ac54a    OK
PISTON-315: correctly identify endpoint id during load_endpoint       dbbb411    already present
PISTON-375: add dedicated force-restart API for acdc (#               9cead8a    OK
[4.3] PISTON-772: do not start agents in acdc_queue_manager           9eb224e    OK
[4.3] Check if ErrJObj is actually a JObj (2600hz#6563)                     0b5a29a    already present
[4.3] PISTON-183: acdc_agent_fsm state sync using shared events       bf8ea2d    already present
[4.3] PISTON-397: handle agent channels that are destroyed qui        d4afa8f    already present
PISTON-893: acdc_agent_listener doesn't need route.req, don't b       fe86855    OK
PISTON-290: refactor transfer tracking in acdc agent (#               26a67cc    already present
acdc_agent_fsm: don't set ready status until apply_state_update       21769cc    already present
PISTON-371: missed change in paused state 'originate_uuid' (          eb53a39    already present
PISTON-592: clear call state when ending wrapup early (#              475c729    already present
PISTON-976: significant performance improvements for acdc statu       f22e119    needs work
PISTON-625: do not pass pids as start_link args (2600hz#6                   9bd3339    OK
[4.3] PISTON-234: keep modified caller ID on phone after bridge       cb9a8c8    already present
[4.3] PISTON-352: track endpoints in acdc_agent_fsm better (          4dc25e7    already present
[4.3] Misc sync-up changes to acdc (2600hz#6575)                            a028dd8    already present
[4.3] PISTON-773: rework some acdc_agent_util status code (           d7497f2    needs work
[4.3] PISTON-116: acdc_agent_fsm: stop ringing when pausing           6e80559    already present
[4.3] remove unused acdc_agent_listener:maybe_update_presence_s       1914499    OK
[4.3] PISTON-1083: acdc stats: allow some wiggle room on query        ee3382d    OK
[4.3] acdc ignore/cancel/exit overhaul (2600hz#6588)                        08115c9    already present
[4.3] Fix number of tuple elements to acdc_queue_manager's agen       dcfed9f    already present
[4.3] Many dialyzer fixes for acdc (2600hz#6590)                            542e518    OK
[4.3] Support infinity for agent pause time (2600hz#65                      38ac480    OK
[4.3] PISTON-292: support for pause aliases (2600hz#65                      dfa6cb3    OK

NOTE: The 2 PRs identiified as "needs work" are related to ETS improvements and require further study as some ETS tables maybe moved
to a distrubted mnesia table in order to have a cluster view rather than a node view.
  • Loading branch information
danielfinke authored and kageds committed Jul 7, 2020
1 parent d98a322 commit b19e880
Show file tree
Hide file tree
Showing 31 changed files with 1,431 additions and 1,409 deletions.
1 change: 1 addition & 0 deletions applications/acdc/priv/couchdb/views/agents.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
" 'first_name': doc.first_name,",
" 'last_name': doc.last_name,",
" 'queues': doc.queues,",
" 'agent_priority': doc.acdc_agent_priority || 0,",
" 'skills': doc.acdc_skills || []",
" });",
"}"
Expand Down
7 changes: 4 additions & 3 deletions applications/acdc/priv/couchdb/views/queues.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
" for (i in doc.queues) {",
" emit(doc.queues[i], {",
" 'id': doc._id,",
" 'agent_priority': doc.acdc_agent_priority,",
" 'skills': doc.acdc_skills",
" 'agent_priority': doc.acdc_agent_priority || 0,",
" 'skills': doc.acdc_skills || []",
" });",
" }",
"}"
]
],
"reduce": "_count"
},
"crossbar_listing": {
"map": [
Expand Down
12 changes: 5 additions & 7 deletions applications/acdc/src/acdc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@

-define(CACHE_NAME, 'acdc_cache').

-define(ABANDON_TIMEOUT, 'member_timeout').
-define(ABANDON_EXIT, 'member_exit').
-define(ABANDON_HANGUP, 'member_hangup').
-define(ABANDON_EMPTY, 'member_exit_empty').
-define(ABANDON_TIMEOUT, <<"member_timeout">>).
-define(ABANDON_EXIT, <<"member_exit">>).
-define(ABANDON_HANGUP, <<"member_hangup">>).
-define(ABANDON_EMPTY, <<"member_exit_empty">>).
-define(ABANDON_INTERNAL_ERROR, <<"INTERNAL ERROR">>).

-define(PRESENCE_GREEN, <<"terminated">>).
-define(PRESENCE_RED_FLASH, <<"early">>).
Expand All @@ -35,9 +36,6 @@
-define(DESTROYED_CHANNEL_REG(AccountId, User), {'p', 'l', {'destroyed_channel', AccountId, User}}).
-define(DESTROYED_CHANNEL(CallId, HangupCause), {'call_down', CallId, HangupCause}).

-type abandon_reason() :: ?ABANDON_TIMEOUT | ?ABANDON_EXIT |
?ABANDON_HANGUP.

-type deliveries() :: [gen_listener:basic_deliver()].

-type announcements_pids() :: #{kz_term:ne_binary() => pid()}.
Expand Down
1,547 changes: 770 additions & 777 deletions applications/acdc/src/acdc_agent_fsm.erl

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ maybe_stop_agent(AccountId, AgentId, JObj) ->

end.

maybe_pause_agent(AccountId, AgentId, Timeout, Alias, JObj) when is_integer(Timeout) ->
maybe_pause_agent(AccountId, AgentId, <<"infinity">>, Alias, JObj) ->
maybe_pause_agent(AccountId, AgentId, 'infinity', Alias, JObj);
maybe_pause_agent(AccountId, AgentId, Timeout, Alias, JObj) when is_integer(Timeout)
orelse Timeout =:= 'infinity' ->
case acdc_agents_sup:find_agent_supervisor(AccountId, AgentId) of
'undefined' -> lager:debug("agent ~s (~s) not found, nothing to do", [AgentId, AccountId]);
Sup when is_pid(Sup) ->
Expand Down Expand Up @@ -320,13 +323,13 @@ handle_destroyed_channel(JObj, AccountId) ->
get_to_user(JObj) ->
case kz_json:is_defined(<<"To-Uri">>, JObj) of
true -> hd(binary:split(kz_json:get_value(<<"To-Uri">>, JObj), <<"@">>));
false -> get_to_or_destination_number(JObj)
false -> get_username_or_destination_number(JObj)
end.

-spec get_to_or_destination_number(kz_json:object()) -> kz_term:api_binary().
get_to_or_destination_number(JObj) ->
case kz_json:is_defined(<<"To">>, JObj) of
true -> hd(binary:split(kz_json:get_value(<<"To">>, JObj), <<"@">>));
-spec get_username_or_destination_number(kz_json:object()) -> kz_term:api_binary().
get_username_or_destination_number(JObj) ->
case kz_json:is_defined([<<"Custom-Channel-Vars">>,<<"Username">>], JObj) of
true -> kz_json:get_ne_value([<<"Custom-Channel-Vars">>,<<"Username">>], JObj);
false -> kz_json:get_value(<<"Caller-Destination-Number">>, JObj)
end.

Expand Down Expand Up @@ -479,12 +482,9 @@ handle_agent_change(AccountDb, AccountId, AgentId, ?DOC_EDITED) ->
P when is_pid(P) -> acdc_agent_fsm:refresh(acdc_agent_sup:fsm(P), JObj)
end;
handle_agent_change(_, AccountId, AgentId, ?DOC_DELETED) ->
case acdc_agents_sup:find_agent_supervisor(AccountId, AgentId) of
'undefined' -> lager:debug("user ~s has left us, but wasn't started", [AgentId]);
P when is_pid(P) ->
lager:debug("agent ~s(~s) has been deleted, stopping ~p", [AccountId, AgentId, P]),
_ = acdc_agent_sup:stop(P),
acdc_agent_stats:agent_logged_out(AccountId, AgentId)
case acdc_agents_sup:stop_agent(AccountId, AgentId) of
'ok' -> acdc_agent_stats:agent_logged_out(AccountId, AgentId);
_ -> 'ok'
end.

-spec handle_presence_probe(kz_json:object(), kz_term:proplist()) -> 'ok'.
Expand Down
86 changes: 28 additions & 58 deletions applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-behaviour(gen_listener).

%% API
-export([start_link/2, start_link/3, start_link/5
-export([start_link/3, start_link/4, start_link/5
,member_connect_resp/2
,member_connect_retry/2
,member_connect_accepted/1, member_connect_accepted/2, member_connect_accepted/3
Expand Down Expand Up @@ -41,15 +41,12 @@
,add_acdc_queue/3
,rm_acdc_queue/2
,call_status_req/1, call_status_req/2
,stop/1
,fsm_started/2
,add_endpoint_bindings/3, remove_endpoint_bindings/3
,outbound_call_id/2
,remove_cdr_urls/2
,logout_agent/1
,agent_info/2
,maybe_update_presence_id/2
,maybe_update_presence_state/2
,presence_update/2
,update_agent_status/2
]).
Expand Down Expand Up @@ -81,11 +78,11 @@
,original_call :: kapps_call:call()
,acdc_queue_id :: api_kz_term:ne_binary() % the ACDc Queue ID
,msg_queue_id :: api_kz_term:ne_binary() % the AMQP Queue ID of the ACDc Queue process
,agent_id :: api_kz_term:ne_binary()
,agent_id :: kz_term:api_ne_binary()
,agent_priority :: agent_priority()
,skills :: kz_term:ne_binaries() % skills this agent has
,acct_db :: api_kz_term:ne_binary()
,acct_id :: api_kz_term:ne_binary()
,acct_db :: kz_term:api_ne_binary()
,acct_id :: kz_term:api_binary()
,fsm_pid :: kz_term:api_pid()
,agent_queues = [] :: kz_term:ne_binaries()
,last_connect :: kz_term:kz_now() | undefined % last connection
Expand Down Expand Up @@ -178,15 +175,8 @@
%% @doc Starts the server
%% @end
%%------------------------------------------------------------------------------
-spec start_link(pid(), kz_json:object()) -> kz_term:startlink_ret().
start_link(Supervisor, AgentJObj) ->
AgentId = kz_doc:id(AgentJObj),
AccountId = account_id(AgentJObj),
Queues = kz_json:get_value(<<"queues">>, AgentJObj, []),
start_link(Supervisor, AgentJObj, AccountId, AgentId, Queues).

-spec start_link(pid(), kz_json:object(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binaries()) -> kz_term:startlink_ret().
start_link(Supervisor, AgentJObj, AccountId, AgentId, Queues) ->
-spec start_link(pid(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object(), kz_term:ne_binaries()) -> kz_types:startlink_ret().
start_link(Supervisor, AccountId, AgentId, AgentJObj, Queues) ->
lager:debug("start bindings for ~s(~s) in ready", [AccountId, AgentId]),
gen_listener:start_link(?SERVER
,[{'bindings', ?BINDINGS(AccountId, AgentId)}
Expand All @@ -195,6 +185,11 @@ start_link(Supervisor, AgentJObj, AccountId, AgentId, Queues) ->
,[Supervisor, AgentJObj, Queues]
).

-spec start_link(pid(), kz_term:ne_binary(), kz_term:ne_binary(), kz_json:object()) -> kz_types:startlink_ret().
start_link(Supervisor, AccountId, AgentId, AgentJObj) ->
Queues = kz_json:get_value(<<"queues">>, AgentJObj, []),
start_link(Supervisor, AccountId, AgentId, AgentJObj, Queues).

-spec start_link(pid(), kapps_call:call(), kz_term:ne_binary()) -> kz_term:startlink_ret().
start_link(Supervisor, ThiefCall, QueueId) ->
AgentId = kapps_call:owner_id(ThiefCall),
Expand All @@ -208,9 +203,6 @@ start_link(Supervisor, ThiefCall, QueueId) ->
,[Supervisor, ThiefCall, [QueueId]]
).

-spec stop(pid()) -> 'ok'.
stop(Srv) -> gen_listener:cast(Srv, {'stop_agent', self()}).

-spec member_connect_resp(pid(), kz_json:object()) -> 'ok'.
member_connect_resp(Srv, ReqJObj) ->
gen_listener:cast(Srv, {'member_connect_resp', ReqJObj}).
Expand Down Expand Up @@ -347,22 +339,6 @@ call_status_req(Srv, CallId) ->
fsm_started(Srv, FSM) ->
gen_listener:cast(Srv, {'fsm_started', FSM}).

-spec add_endpoint_bindings(pid(), kz_term:ne_binary(), api_kz_term:ne_binary()) -> 'ok'.
add_endpoint_bindings(_Srv, _Realm, 'undefined') ->
lager:debug("ignoring adding endpoint bindings for undefined user @ ~s", [_Realm]);
add_endpoint_bindings(Srv, Realm, User) ->
lager:debug("adding route bindings to ~p for endpoint ~s@~s", [Srv, User, Realm]),
gen_listener:add_binding(Srv, 'route', [{'realm', Realm}
,{'user', User}
]).

-spec remove_endpoint_bindings(pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
remove_endpoint_bindings(Srv, Realm, User) ->
lager:debug("removing route bindings to ~p for endpoint ~s@~s", [Srv, User, Realm]),
gen_listener:rm_binding(Srv, 'route', [{'realm', Realm}
,{'user', User}
]).

-spec remove_cdr_urls(pid(), kz_term:ne_binary()) -> 'ok'.
remove_cdr_urls(Srv, CallId) -> gen_listener:cast(Srv, {'remove_cdr_urls', CallId}).

Expand All @@ -374,11 +350,6 @@ maybe_update_presence_id(_Srv, 'undefined') -> 'ok';
maybe_update_presence_id(Srv, Id) ->
gen_listener:cast(Srv, {'presence_id', Id}).

-spec maybe_update_presence_state(pid(), api_kz_term:ne_binary()) -> 'ok'.
maybe_update_presence_state(_Srv, 'undefined') -> 'ok';
maybe_update_presence_state(Srv, State) ->
presence_update(Srv, State).

-spec presence_update(pid(), api_kz_term:ne_binary()) -> 'ok'.
presence_update(_, 'undefined') -> 'ok';
presence_update(Srv, PresenceState) ->
Expand Down Expand Up @@ -492,10 +463,6 @@ handle_cast({'refresh_config', JObj, StateName}, #state{agent_priority=Priority0
{'noreply', State#state{agent_priority=Priority
,skills=Skills
}};
handle_cast({'stop_agent', Req}, #state{supervisor=Supervisor}=State) ->
lager:debug("stop agent requested by ~p", [Req]),
_ = kz_process:spawn(fun acdc_agent_sup:stop/1, [Supervisor]),
{'noreply', State};

handle_cast({'fsm_started', FSMPid}, State) ->
lager:debug("fsm started: ~p", [FSMPid]),
Expand Down Expand Up @@ -580,8 +547,7 @@ handle_cast({'channel_hungup', CallId}, #state{call=Call
,'hibernate'};
'true' ->
lager:debug("thief is done, going down"),
stop(self()),
{'noreply', State}
{'stop', 'normal', State}
end;
_ ->
case props:get_value(CallId, ACallIds) of
Expand Down Expand Up @@ -852,8 +818,9 @@ handle_cast({'member_callback_accepted', ACall}, #state{msg_queue_id=AmqpQueue

send_member_callback_accepted(AmqpQueue, call_id(Call), AccountId, AgentId, MyId),

ACall1 = kapps_call:set_control_queue(props:get_value(ACallId, ACallIds), ACall),
kapps_call_command:prompt(<<"queue-now_calling_back">>, ACall1),
CtrlQ = props:get_value(ACallId, ACallIds),
ACall1 = kapps_call:set_control_queue(CtrlQ, ACall),
kapps_call_command:audio_macro([{'prompt', <<"queue-now_calling_back">>}], ACall1),

{'noreply', State#state{agent_call_ids=ACallIds1}, 'hibernate'};

Expand Down Expand Up @@ -1096,6 +1063,8 @@ terminate(Reason, #state{agent_queues=Queues
}
) when Reason == 'normal'; Reason == 'shutdown' ->
_ = [rm_queue_binding(AccountId, AgentId, QueueId) || QueueId <- Queues],
Reason =:= 'normal' %% Prevent race condition of supervisor delete_child/restart_child
andalso kz_process:spawn(fun acdc_agents_sup:stop_agent/2, [AccountId, AgentId]),
lager:debug("agent process going down: ~p", [Reason]);
terminate(_Reason, _State) ->
lager:debug("agent process going down: ~p", [_Reason]).
Expand Down Expand Up @@ -1596,28 +1565,28 @@ recording_format() ->

-spec agent_id(agent()) -> kz_term:api_binary().
agent_id(Agent) ->
case kz_json:is_json_object(Agent) of
'true' -> kz_doc:id(Agent);
'false' -> kapps_call:owner_id(Agent)
case is_thief(Agent) of
'true' -> kapps_call:owner_id(Agent);
'false' -> kz_doc:id(Agent)
end.

-spec account_id(agent()) -> kz_term:api_binary().
account_id(Agent) ->
case kz_json:is_json_object(Agent) of
'true' -> find_account_id(Agent);
'false' -> kapps_call:account_id(Agent)
case is_thief(Agent) of
'true' -> kapps_call:account_id(Agent);
'false' -> find_account_id(Agent)
end.

-spec account_db(agent()) -> kz_term:api_binary().
account_db(Agent) ->
case kz_json:is_json_object(Agent) of
'true' -> kz_doc:account_db(Agent);
'false' -> kapps_call:account_db(Agent)
case is_thief(Agent) of
'true' -> kapps_call:account_db(Agent);
'false' -> kz_doc:account_db(Agent)
end.

-spec record_calls(agent()) -> boolean().
record_calls(Agent) ->
kz_json:is_json_object(Agent)
not is_thief(Agent)
andalso kz_json:is_true(<<"record_calls">>, Agent, 'false').

-spec is_thief(agent()) -> boolean().
Expand All @@ -1636,6 +1605,7 @@ stop_agent_leg(ACallId, ACtrlQ) ->
lager:debug("sending hangup to ~s: ~s", [ACallId, ACtrlQ]),
kapi_dialplan:publish_command(ACtrlQ, Command).

-spec find_account_id(kz_json:object()) -> kz_term:api_ne_binary().
find_account_id(JObj) ->
case kz_doc:account_id(JObj) of
'undefined' -> kzs_util:format_account_id(kz_doc:account_db(JObj));
Expand Down
24 changes: 4 additions & 20 deletions applications/acdc/src/acdc_agent_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,15 @@ agent_status(AccountId, AgentId) ->
S -> acdc_agent_sup:status(S)
end.

-spec acct_restart(kz_term:text()) -> 'ok'.
-spec acct_restart(kz_term:text()) -> [kz_types:sup_startchild_ret()].
acct_restart(AccountId) when not is_binary(AccountId) ->
acct_restart(kz_term:to_binary(AccountId));
acct_restart(AccountId) ->
case acdc_agents_sup:find_acct_supervisors(AccountId) of
[] -> lager:info("no agents with account id ~s available", [AccountId]);
As ->
lager:debug("terminating existing agent processes in ~s", [AccountId]),
_ = [exit(Sup, 'kill') || Sup <- As],
lager:info("restarting agents in ~s", [AccountId]),
_ = acdc_init:init_acct_agents(AccountId),
'ok'
end.
acdc_agents_sup:restart_acct(AccountId).

-spec agent_restart(kz_term:text(), kz_term:text()) -> 'ok'.
-spec agent_restart(kz_term:text(), kz_term:text()) -> kz_types:sup_startchild_ret().
agent_restart(AccountId, AgentId) when not is_binary(AccountId);
not is_binary(AgentId) ->
agent_restart(kz_term:to_binary(AccountId), kz_term:to_binary(AgentId));
agent_restart(AccountId, AgentId) ->
case acdc_agents_sup:find_agent_supervisor(AccountId, AgentId) of
'undefined' -> lager:info("no agent ~s in account ~s available", [AgentId, AccountId]);
S ->
lager:info("terminating existing agent process ~p", [S]),
exit(S, 'kill'),
lager:info("restarting agent ~s in ~s", [AgentId, AccountId]),
acdc_agents_sup:new(AccountId, AgentId),
'ok'
end.
acdc_agents_sup:restart_agent(AccountId, AgentId).
Loading

0 comments on commit b19e880

Please sign in to comment.