Skip to content

Commit

Permalink
PISTON-55: acdc distributed member_connect_win (2600hz#6558)
Browse files Browse the repository at this point in the history
- better synchronizes states of acdc_agent_fsm per agent
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jun 2, 2020
1 parent 662a76c commit 51ac54a
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 162 deletions.
151 changes: 94 additions & 57 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-export([start_link/2, start_link/3, start_link/4, start_link/5
,call_event/4
,member_connect_req/2
,member_connect_win/2
,member_connect_win/3
,agent_timeout/2
,originate_ready/2
,originate_resp/2, originate_started/2, originate_uuid/2
Expand Down Expand Up @@ -134,14 +134,17 @@ member_connect_req(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'member_connect_req', JObj}).

%%------------------------------------------------------------------------------
%% @doc When a queue receives a call and needs an agent, it will send a
%% `member_connect_req'. The agent will respond (if possible) with a
%% `member_connect_resp' payload or ignore the request
%% @doc When an agent has been selected to handle the queue call, each process
%% for the agent will receive a `member_connect_win' event. The event will
%% include a flag of whether the winner is on the current node - if true, the
%% agent process will handle call control. Otherwise, the agent process will
%% just follow along through state transitions.
%% @end
%%------------------------------------------------------------------------------
-spec member_connect_win(pid(), kz_json:object()) -> 'ok'.
member_connect_win(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'member_connect_win', JObj}).
-type member_connect_win_node() :: 'same_node' | 'different_node'.
-spec member_connect_win(pid(), kz_json:object(), member_connect_win_node()) -> 'ok'.
member_connect_win(ServerRef, JObj, Node) ->
gen_statem:cast(ServerRef, {'member_connect_win', JObj, Node}).

-spec agent_timeout(pid(), kz_json:object()) -> 'ok'.
agent_timeout(ServerRef, JObj) ->
Expand Down Expand Up @@ -561,13 +564,12 @@ ready('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener}=State) ->
{'next_state', 'ready', State};
ready('cast', {'sync_resp', _}, State) ->
{'next_state', 'ready', State};
ready('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,agent_listener_id=MyId
,account_id=AccountId
,agent_id=AgentId
,connect_failures=CF
}=State) ->
ready('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,account_id=AccountId
,agent_id=AgentId
,connect_failures=CF
}=State) ->
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)),
CallId = kapps_call:call_id(Call),

Expand All @@ -580,51 +582,80 @@ ready('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener
CDRUrl = cdr_url(JObj),
RecordingUrl = recording_url(JObj),

case kz_json:get_value(<<"Agent-Process-ID">>, JObj) of
MyId ->
lager:debug("trying to ring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),

case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent_stats:agent_logged_out(AccountId, AgentId),
agent_logout(self()),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_agent_listener:bridge_to_member(AgentListener, Call, JObj, UpdatedEPs, CDRUrl, RecordingUrl),

CIDName = kapps_call:caller_id_name(Call),
CIDNum = kapps_call:caller_id_number(Call),

acdc_agent_stats:agent_connecting(AccountId, AgentId, CallId, CIDName, CIDNum, QueueId),
lager:info("trying to ring agent endpoints(~p)", [length(UpdatedEPs)]),
lager:debug("notifications for the queue: ~p", [kz_json:get_value(<<"Notifications">>, JObj)]),
{'next_state', 'ringing', State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
_OtherId ->
lager:debug("monitoring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),
lager:debug("trying to ring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),

case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent_stats:agent_logged_out(AccountId, AgentId),
agent_logout(self()),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_util:bind_to_call_events(Call, AgentListener),

acdc_agent_listener:monitor_call(AgentListener, Call, JObj, RecordingUrl),
acdc_agent_listener:bridge_to_member(AgentListener, Call, JObj, UpdatedEPs, CDRUrl, RecordingUrl),

CIDName = kapps_call:caller_id_name(Call),
CIDNum = kapps_call:caller_id_number(Call),

acdc_agent_stats:agent_connecting(AccountId, AgentId, CallId, CIDName, CIDNum, QueueId),
lager:info("trying to ring agent endpoints(~p)", [length(UpdatedEPs)]),
lager:debug("notifications for the queue: ~p", [kz_json:get_value(<<"Notifications">>, JObj)]),
{'next_state', 'ringing', State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
ready('cast', {'member_connect_win', JObj, 'different_node'}, #state{agent_listener=AgentListener
,endpoints=OrigEPs
,agent_id=AgentId
,connect_failures=CF
}=State) ->
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)),
CallId = kapps_call:call_id(Call),

kz_util:put_callid(CallId),

WrapupTimer = kz_json:get_integer_value(<<"Wrapup-Timeout">>, JObj, 0),
CallerExitKey = kz_json:get_value(<<"Caller-Exit-Key">>, JObj, <<"#">>),
QueueId = kz_json:get_value(<<"Queue-ID">>, JObj),

CDRUrl = cdr_url(JObj),
RecordingUrl = recording_url(JObj),

%% Only start monitoring if the agent can actually take the call
case get_endpoints(OrigEPs, AgentListener, Call, AgentId, QueueId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
{'next_state', 'paused', State};
{'error', _E} ->
lager:debug("can't take the call, skip me: ~p", [_E]),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', UpdatedEPs} ->
acdc_util:bind_to_call_events(Call, AgentListener),

acdc_agent_listener:monitor_call(AgentListener, Call, CDRUrl, RecordingUrl),
NextState = 'ringing',

lager:debug("monitoring agent ~s to connect to caller in queue ~s", [AgentId, QueueId]),
{'next_state', NextState, State#state{wrapup_timeout=WrapupTimer
,member_call=Call
,member_call_id=CallId
,member_call_start=kz_time:now()
,member_call_queue_id=QueueId
,caller_exit_key=CallerExitKey
,agent_call_id='undefined'
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
}}
end;
ready('cast', {'member_connect_req', _}, #state{max_connect_failures=Max
Expand Down Expand Up @@ -698,7 +729,7 @@ ready('info', Evt, State) ->
-spec ringing(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
ringing('cast', {'member_connect_req', _}, State) ->
{'next_state', 'ringing', State};
ringing('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
ringing('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (already ringing)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -959,7 +990,7 @@ ringing('info', Evt, State) ->
-spec answered(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
answered('cast', {'member_connect_req', _}, State) ->
{'next_state', 'answered', State};
answered('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
answered('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (on the phone with someone)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -1138,10 +1169,13 @@ wrapup('cast', {'pause', Timeout}, #state{account_id=AccountId
{'next_state', 'paused', State#state{pause_ref=Ref}};
wrapup('cast', {'member_connect_req', _}, State) ->
{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
wrapup('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (in wrapup)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'member_connect_win', _, 'different_node'}, State) ->
lager:debug("received member_connect_win for different node (wrapup)"),
{'next_state', 'wrapup', State#state{wrapup_timeout=0}};
wrapup('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener
,wrapup_ref=Ref
Expand Down Expand Up @@ -1204,7 +1238,7 @@ paused('cast', {'sync_resp', _}, State) ->
{'next_state', 'paused', State};
paused('cast', {'member_connect_req', _}, State) ->
{'next_state', 'paused', State};
paused('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
paused('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),

Expand Down Expand Up @@ -1251,7 +1285,7 @@ paused('info', Evt, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec outbound(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
outbound('cast', {'member_connect_win', JObj}, #state{agent_listener=AgentListener}=State) ->
outbound('cast', {'member_connect_win', JObj, 'same_node'}, #state{agent_listener=AgentListener}=State) ->
lager:debug("agent won, but can't process this right now (on outbound call)"),
acdc_agent_listener:member_connect_retry(AgentListener, JObj),
{'next_state', 'outbound', State};
Expand Down Expand Up @@ -1416,6 +1450,9 @@ handle_event(Event, StateName, State) ->
-spec handle_info(any(), atom(), state()) -> kz_types:handle_fsm_ret(state()).
handle_info({'timeout', _Ref, ?SYNC_RESPONSE_MESSAGE}, StateName, State) ->
{'next_state', StateName, State};
handle_info({'member_connect_win', _, 'different_node'}, StateName, State) ->
lager:debug("received member_connect_win for different node (~s)", [StateName]),
{'next_state', StateName, State};
handle_info({'endpoint_edited', EP}, StateName, #state{endpoints=EPs
,account_id=AccountId
,agent_id=AgentId
Expand Down
9 changes: 7 additions & 2 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,13 @@ handle_member_message(JObj, Props, <<"connect_req">>) ->
'true' = kapi_acdc_queue:member_connect_req_v(JObj),
acdc_agent_fsm:member_connect_req(props:get_value('fsm_pid', Props), JObj);
handle_member_message(JObj, Props, <<"connect_win">>) ->
'true' = kapi_acdc_queue:member_connect_win_v(JObj),
acdc_agent_fsm:member_connect_win(props:get_value('fsm_pid', Props), JObj);
'true' = kapi_acdc_agent:member_connect_win_v(JObj),
FSMPid = props:get_value('fsm_pid', Props),
MyId = acdc_util:proc_id(FSMPid),
case kz_json:get_value(<<"Agent-Process-ID">>, JObj) of
MyId -> acdc_agent_fsm:member_connect_win(FSMPid, JObj, 'same_node');
_ -> acdc_agent_fsm:member_connect_win(FSMPid, JObj, 'different_node')
end;
handle_member_message(_, _, EvtName) ->
lager:debug("not handling member event ~s", [EvtName]).

Expand Down
2 changes: 1 addition & 1 deletion applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
-define(BINDINGS(AcctId, AgentId), [{'self', []}
,{'acdc_agent', [{'account_id', AcctId}
,{'agent_id', AgentId}
,{'restrict_to', ['sync', 'stats_req']}
,{'restrict_to', ['member_connect_win', 'sync', 'stats_req']}
]}
,{'conf', [{'action', <<"*">>}
,{'db', kz_util:format_account_id(AcctId, 'encoded')}
Expand Down
4 changes: 2 additions & 2 deletions applications/acdc/src/acdc_queue_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,15 @@ send_member_connect_req(CallId, AccountId, QueueId, MyQ, MyId) ->
-spec send_member_connect_win(kz_json:object(), kapps_call:call(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:proplist()) -> 'ok'.
send_member_connect_win(RespJObj, Call, QueueId, MyQ, MyId, QueueOpts) ->
CallJSON = kapps_call:to_json(Call),
Q = kz_json:get_value(<<"Server-ID">>, RespJObj),
Win = props:filter_undefined(
[{<<"Call">>, CallJSON}
,{<<"Process-ID">>, MyId}
,{<<"Agent-Process-ID">>, kz_json:get_value(<<"Agent-Process-ID">>, RespJObj)}
,{<<"Queue-ID">>, QueueId}
,{<<"Agent-ID">>, kz_json:get_value(<<"Agent-ID">>, RespJObj)}
| QueueOpts ++ kz_api:default_headers(MyQ, ?APP_NAME, ?APP_VERSION)
]),
publish(Q, Win, fun kapi_acdc_queue:publish_member_connect_win/2).
publish(Win, fun kapi_acdc_agent:publish_member_connect_win/1).

-spec send_agent_timeout(kz_json:object(), kapps_call:call(), kz_term:ne_binary()) -> 'ok'.
send_agent_timeout(RespJObj, Call, QueueId) ->
Expand Down
Loading

0 comments on commit 51ac54a

Please sign in to comment.