Skip to content

Commit

Permalink
[4.3] PISTON-183: acdc_agent_fsm state sync using shared events (2600…
Browse files Browse the repository at this point in the history
…hz#6565)

PISTON-183: make apis
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jun 2, 2020
1 parent d950c9d commit bf8ea2d
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 68 deletions.
165 changes: 98 additions & 67 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
,member_connect_req/2
,member_connect_win/3
,agent_timeout/2
,shared_failure/2
,shared_call_id/2
,originate_ready/2
,originate_resp/2, originate_started/2, originate_uuid/2
,originate_failed/2
Expand Down Expand Up @@ -116,6 +118,7 @@
,max_connect_failures :: timeout()
,connect_failures = 0 :: non_neg_integer()
,agent_state_updates = [] :: list()
,monitoring = 'false' :: boolean() % process is not handling call, but following state transitions
}).
-type state() :: #state{}.

Expand Down Expand Up @@ -150,6 +153,14 @@ member_connect_win(ServerRef, JObj, Node) ->
agent_timeout(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'agent_timeout', JObj}).

-spec shared_failure(pid(), kz_json:object()) -> 'ok'.
shared_failure(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'shared_failure', JObj}).

-spec shared_call_id(pid(), kz_json:object()) -> 'ok'.
shared_call_id(ServerRef, JObj) ->
gen_statem:cast(ServerRef, {'shared_call_id', JObj}).

%%------------------------------------------------------------------------------
%% @doc When an agent is involved in a call, it will receive call events.
%% Pass the call event to the `statem' to see if action is needed (usually
Expand All @@ -170,7 +181,7 @@ call_event(ServerRef, <<"call_event">>, <<"LEG_CREATED">>, JObj) ->
call_event(ServerRef, <<"call_event">>, <<"LEG_DESTROYED">>, JObj) ->
gen_statem:cast(ServerRef, {'leg_destroyed', call_id(JObj)});
call_event(ServerRef, <<"call_event">>, <<"CHANNEL_ANSWER">>, JObj) ->
gen_statem:cast(ServerRef, {'channel_answered', call_id(JObj)});
gen_statem:cast(ServerRef, {'channel_answered', JObj});
call_event(ServerRef, <<"call_event">>, <<"DTMF">>, EvtJObj) ->
gen_statem:cast(ServerRef, {'dtmf_pressed', kz_json:get_value(<<"DTMF-Digit">>, EvtJObj)});
call_event(ServerRef, <<"call_event">>, <<"CHANNEL_EXECUTE_COMPLETE">>, JObj) ->
Expand Down Expand Up @@ -637,6 +648,7 @@ ready('cast', {'member_connect_win', JObj, 'different_node'}, #state{agent_liste
,caller_exit_key=CallerExitKey
,endpoints=UpdatedEPs
,queue_notifications=kz_json:get_value(<<"Notifications">>, JObj)
,monitoring='true'
}}
end;
ready('cast', {'member_connect_req', _}, #state{max_connect_failures=Max
Expand All @@ -654,7 +666,8 @@ ready('cast', {'member_connect_req', JObj}, #state{agent_listener=AgentListener}
ready('cast', {'originate_uuid', ACallId, ACtrlQ}, #state{agent_listener=AgentListener}=State) ->
acdc_agent_listener:originate_uuid(AgentListener, ACallId, ACtrlQ),
{'next_state', 'ready', State};
ready('cast', {'channel_answered', CallId}, #state{outbound_call_ids=OutboundCallIds}=State) ->
ready('cast', {'channel_answered', JObj}, #state{outbound_call_ids=OutboundCallIds}=State) ->
CallId = call_id(JObj),
case lists:member(CallId, OutboundCallIds) of
'true' ->
lager:debug("agent picked up outbound call ~s", [CallId]),
Expand Down Expand Up @@ -751,44 +764,41 @@ ringing('cast', {'originate_failed', E}, #state{agent_listener=AgentListener
,agent_id=AgentId
,member_call_queue_id=QueueId
,member_call_id=CallId
,connect_failures=Fails
,max_connect_failures=MaxFails
}=State) ->
acdc_agent_listener:member_connect_retry(AgentListener, CallId),

ErrReason = missed_reason(kz_json:get_value(<<"Error-Message">>, E)),
lager:debug("originate failed (~s), broadcasting", [ErrReason]),
kapi_acdc_agent:publish_shared_originate_failure([{<<"Account-ID">>, AccountId}
,{<<"Agent-ID">>, AgentId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
]),

lager:debug("ringing agent failed: ~s", [ErrReason]),
acdc_agent_listener:member_connect_retry(AgentListener, CallId),

acdc_stats:call_missed(AccountId, QueueId, AgentId, CallId, ErrReason),

acdc_agent_listener:presence_update(AgentListener, ?PRESENCE_GREEN),

State1 = clear_call(State, 'failed'),
StateName1 = return_to_state(Fails+1, MaxFails),
case StateName1 of
'paused' -> {'next_state', 'paused', State1};
'ready' -> apply_state_updates(State1)
end;
{'next_state', 'ringing', State};
ringing('cast', {'agent_timeout', _JObj}, #state{agent_listener=AgentListener
,account_id=AccountId
,agent_id=AgentId
,member_call_queue_id=QueueId
,member_call_id=CallId
,connect_failures=Fails
,max_connect_failures=MaxFails
}=State) ->
ErrReason = <<"timeout">>,
lager:debug("agent timeout, publishing originate failed"),
kapi_acdc_agent:publish_shared_originate_failure([{<<"Account-ID">>, AccountId}
,{<<"Agent-ID">>, AgentId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
]),

acdc_agent_listener:agent_timeout(AgentListener),
lager:debug("recv timeout from queue process"),
acdc_stats:call_missed(AccountId, QueueId, AgentId, CallId, <<"timeout">>),

acdc_stats:call_missed(AccountId, QueueId, AgentId, CallId, ErrReason),

acdc_agent_listener:presence_update(AgentListener, ?PRESENCE_GREEN),
State1 = clear_call(State, 'failed'),
StateName1 = return_to_state(Fails+1, MaxFails),
case StateName1 of
'paused' -> {'next_state', 'paused', State1};
'ready' -> apply_state_updates(State1)
end;

{'next_state', 'ringing', State};
ringing('cast', {'channel_bridged', MemberCallId}, #state{member_call_id=MemberCallId
,member_call=MemberCall
,agent_listener=AgentListener
Expand Down Expand Up @@ -823,35 +833,24 @@ ringing('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF
ringing('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=_ExitKey}=State) ->
lager:debug("caller pressed ~s, exit key is ~s", [DTMF, _ExitKey]),
{'next_state', 'ringing', State};
ringing('cast', {'channel_answered', MemberCallId}, #state{member_call_id=MemberCallId}=State) ->
lager:debug("caller's channel answered"),
{'next_state', 'ringing', State};
ringing('cast', {'channel_answered', OtherCallId}, #state{account_id=AccountId
,agent_id=AgentId
,member_call=MemberCall
,member_call_id=MemberCallId
,agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
,member_call_queue_id=QueueId
}=State) ->
case lists:member(OtherCallId, OutboundCallIds) of
'true' ->
lager:debug("agent picked up outbound call ~s instead of the queue call ~s", [OtherCallId, MemberCallId]),
acdc_agent_listener:hangup_call(AgentListener),
{'next_state', 'outbound', start_outbound_call_handling(OtherCallId, clear_call(State, 'ready')), 'hibernate'};
'false' ->
lager:debug("recv answer for ~s, probably the agent's call", [OtherCallId]),

CIDName = kapps_call:caller_id_name(MemberCall),
CIDNum = kapps_call:caller_id_number(MemberCall),

acdc_agent_stats:agent_connected(AccountId, AgentId, MemberCallId, CIDName, CIDNum, QueueId),

acdc_agent_listener:presence_update(AgentListener, ?PRESENCE_RED_SOLID),

{'next_state', 'answered', State#state{agent_call_id=OtherCallId
,connect_failures=0
}}
ringing('cast', {'channel_answered', JObj}, #state{member_call_id=MemberCallId
,agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
}=State) ->
case call_id(JObj) of
MemberCallId ->
lager:debug("caller's channel answered"),
{'next_state', 'ringing', State};
OtherCallId ->
case lists:member(OtherCallId, OutboundCallIds) of
'true' ->
lager:debug("agent picked up outbound call ~s instead of the queue call ~s", [OtherCallId, MemberCallId]),
acdc_agent_listener:hangup_call(AgentListener),
{'next_state', 'outbound', start_outbound_call_handling(OtherCallId, clear_call(State, 'ready')), 'hibernate'};
'false' ->
lager:debug("recv answer for ~s, probably the agent's call", [OtherCallId]),
{'next_state', 'ringing', State#state{agent_call_id=OtherCallId}}
end
end;
ringing('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener}=State) ->
lager:debug("recv sync_req from ~s", [kz_json:get_value(<<"Process-ID">>, JObj)]),
Expand All @@ -867,16 +866,41 @@ ringing('cast', {'originate_resp', ACallId}, #state{agent_listener=AgentListener
,queue_notifications=Ns
,member_call_queue_id=QueueId
}=State) ->
lager:debug("originate resp on ~s, connecting to caller", [ACallId]),
acdc_agent_listener:member_connect_accepted(AgentListener, ACallId),
lager:debug("originate resp on ~s, broadcasting", [ACallId]),
kapi_acdc_agent:publish_shared_call_id([{<<"Account-ID">>, AccountId}
,{<<"Agent-ID">>, AgentId}
,{<<"Agent-Call-ID">>, ACallId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
]),

maybe_notify(Ns, ?NOTIFY_PICKUP, State),

CIDName = kapps_call:caller_id_name(MemberCall),
CIDNum = kapps_call:caller_id_number(MemberCall),

acdc_agent_listener:member_connect_accepted(AgentListener, ACallId),
acdc_agent_stats:agent_connected(AccountId, AgentId, MemberCallId, CIDName, CIDNum, QueueId),

{'next_state', 'ringing', State};
ringing('cast', {'shared_failure', _JObj}, #state{connect_failures=Fails
,max_connect_failures=MaxFails
}=State) ->
lager:debug("shared originate failure"),

NewFSMState = clear_call(State, 'failed'),
NextState = return_to_state(Fails+1, MaxFails),
case NextState of
'paused' -> {'next_state', 'paused', NewFSMState};
'ready' -> apply_state_updates(NewFSMState)
end;
ringing('cast', {'shared_call_id', JObj}, #state{agent_listener=AgentListener}=State) ->
ACallId = kz_json:get_value(<<"Agent-Call-ID">>, JObj),

lager:debug("shared call id ~s acquired, connecting to caller", [ACallId]),

acdc_util:bind_to_call_events(ACallId, AgentListener),
acdc_agent_listener:monitor_connect_accepted(AgentListener, ACallId),

{'next_state', 'answered', State#state{agent_call_id=ACallId
,connect_failures=0
}};
Expand Down Expand Up @@ -1028,20 +1052,26 @@ answered('cast', {'channel_unbridged', CallId}, #state{member_call_id=CallId}=St
answered('cast', {'channel_unbridged', CallId}, #state{agent_call_id=CallId}=State) ->
lager:info("agent channel unbridged"),
{'next_state', 'answered', State};
answered('cast', {'channel_answered', MemberCallId}, #state{member_call_id=MemberCallId}=State) ->
lager:debug("member's channel has answered"),
{'next_state', 'answered', State};
answered('cast', {'channel_answered', AgentCallId}, #state{agent_call_id=AgentCallId}=State) ->
lager:debug("agent's channel ~s has answered", [AgentCallId]),
{'next_state', 'answered', State};
answered('cast', {'channel_answered', OtherCallId}=Evt, #state{outbound_call_ids=OutboundCallIds}=State) ->
case lists:member(OtherCallId, OutboundCallIds) of
'true' ->
lager:debug("agent answered outbound call ~s", [OtherCallId]),
answered('cast', {'channel_answered', JObj}=Evt, #state{agent_call_id=AgentCallId
,member_call_id=MemberCallId
,outbound_call_ids=OutboundCallIds
}=State) ->
case call_id(JObj) of
AgentCallId ->
lager:debug("agent's channel ~s has answered", [AgentCallId]),
{'next_state', 'answered', State};
'false' ->
lager:debug("unexpected event while answered: ~p", [Evt]),
{'next_state', 'answered', State}
MemberCallId ->
lager:debug("member's channel has answered"),
{'next_state', 'answered', State};
OtherCallId ->
case lists:member(OtherCallId, OutboundCallIds) of
'true' ->
lager:debug("agent answered outbound call ~s", [OtherCallId]),
{'next_state', 'answered', State};
'false' ->
lager:debug("unexpected event while answered: ~p", [Evt]),
{'next_state', 'answered', State}
end
end;
answered('cast', {'originate_started', _CallId}, State) ->
{'next_state', 'answered', State};
Expand Down Expand Up @@ -1618,6 +1648,7 @@ clear_call(#state{statem_call_id=StateMCallId
,member_call_queue_id = 'undefined'
,agent_call_id = 'undefined'
,caller_exit_key = <<"#">>
,monitoring = 'false'
}.

-spec current_call(kapps_call:call() | 'undefined', atom(), kz_term:ne_binary(), 'undefined' | kz_time:now()) ->
Expand Down
6 changes: 6 additions & 0 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ handle_agent_message(JObj, Props) ->
handle_agent_message(JObj, Props, <<"connect_timeout">>) ->
'true' = kapi_acdc_queue:agent_timeout_v(JObj),
acdc_agent_fsm:agent_timeout(props:get_value('fsm_pid', Props), JObj);
handle_agent_message(JObj, Props, <<"shared_failure">>) ->
'true' = kapi_acdc_agent:shared_originate_failure_v(JObj),
acdc_agent_fsm:shared_failure(props:get_value('fsm_pid', Props), JObj);
handle_agent_message(JObj, Props, <<"shared_call_id">>) ->
'true' = kapi_acdc_agent:shared_call_id_v(JObj),
acdc_agent_fsm:shared_call_id(props:get_value('fsm_pid', Props), JObj);
handle_agent_message(_, _, _EvtName) ->
lager:debug("not handling agent event ~s", [_EvtName]).

Expand Down
11 changes: 10 additions & 1 deletion applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
,member_connect_resp/2
,member_connect_retry/2
,member_connect_accepted/1, member_connect_accepted/2
,monitor_connect_accepted/2
,agent_timeout/1
,bridge_to_member/6
,hangup_call/1
Expand Down Expand Up @@ -119,7 +120,7 @@
-define(BINDINGS(AcctId, AgentId), [{'self', []}
,{'acdc_agent', [{'account_id', AcctId}
,{'agent_id', AgentId}
,{'restrict_to', ['member_connect_win', 'sync', 'stats_req']}
,{'restrict_to', ['fsm_shared', 'member_connect_win', 'sync', 'stats_req']}
]}
,{'conf', [{'action', <<"*">>}
,{'db', kz_util:format_account_id(AcctId, 'encoded')}
Expand Down Expand Up @@ -215,6 +216,10 @@ member_connect_accepted(Srv) ->
member_connect_accepted(Srv, ACallId) ->
gen_listener:cast(Srv, {'member_connect_accepted', ACallId}).

-spec monitor_connect_accepted(pid(), kz_term:ne_binary()) -> 'ok'.
monitor_connect_accepted(Srv, ACallId) ->
gen_listener:cast(Srv, {'monitor_connect_accepted', ACallId}).

-spec hangup_call(pid()) -> 'ok'.
hangup_call(Srv) ->
gen_listener:cast(Srv, {'hangup_call'}).
Expand Down Expand Up @@ -681,6 +686,10 @@ handle_cast({'member_connect_accepted', ACallId}, #state{msg_queue_id=AmqpQueue
[send_agent_busy(AcctId, AgentId, QueueId) || QueueId <- Qs],
{'noreply', State#state{agent_call_ids=ACallIds1}, 'hibernate'};

handle_cast({'monitor_connect_accepted', ACallId}, #state{agent_call_ids=ACallIds}=State) ->
lager:debug("monitoring ~s", [ACallId]),
{'noreply', State#state{agent_call_ids=[ACallId | ACallIds]}, 'hibernate'};

handle_cast({'member_connect_resp', ReqJObj}, #state{agent_id=AgentId
,last_connect=LastConn
,agent_queues=Qs
Expand Down
Loading

0 comments on commit bf8ea2d

Please sign in to comment.