Skip to content

Commit

Permalink
PISTON-625: do not pass pids as start_link args (2600hz#6571)
Browse files Browse the repository at this point in the history
They get re-used on restart
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jun 4, 2020
1 parent f22e119 commit 9bd3339
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 120 deletions.
92 changes: 49 additions & 43 deletions applications/acdc/src/acdc_queue_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
-behaviour(gen_statem).

%% API
-export([start_link/3]).
-export([start_link/4]).

%% Event injectors
-export([member_call/3
Expand Down Expand Up @@ -55,7 +55,7 @@
-define(AGENT_RING_TIMEOUT, 5).
-define(AGENT_RING_TIMEOUT_MESSAGE, 'agent_timer_expired').

-record(state, {queue_proc :: pid()
-record(state, {listener_proc :: kz_term:api_pid()
,manager_proc :: pid()
,connect_resps = [] :: kz_json:objects()
,collect_ref :: kz_term:api_reference()
Expand Down Expand Up @@ -103,9 +103,9 @@
%% function does not return until Module:init/1 has returned.
%% @end
%%------------------------------------------------------------------------------
-spec start_link(pid(), pid(), kz_json:object()) -> kz_types:startlink_ret().
start_link(MgrPid, ListenerPid, QueueJObj) ->
gen_statem:start_link(?SERVER, [MgrPid, ListenerPid, QueueJObj], []).
-spec start_link(pid(), pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> kz_types:startlink_ret().
start_link(WorkerSup, MgrPid, AccountId, QueueId) ->
gen_statem:start_link(?SERVER, [WorkerSup, MgrPid, AccountId, QueueId], []).

-spec refresh(pid(), kz_json:object()) -> 'ok'.
refresh(ServerRef, QueueJObj) ->
Expand Down Expand Up @@ -189,19 +189,21 @@ cdr_url(ServerRef) ->
%% @end
%%------------------------------------------------------------------------------
-spec init(list()) -> {'ok', atom(), state()}.
init([MgrPid, ListenerPid, QueueJObj]) ->
QueueId = kz_doc:id(QueueJObj),
init([WorkerSup, MgrPid, AccountId, QueueId]) ->
kz_util:put_callid(<<"statem_", QueueId/binary, "_", (kz_term:to_binary(self()))/binary>>),

webseq:start(?WSD_ID),
webseq:reg_who(?WSD_ID, self(), iolist_to_binary([<<"qFSM">>, pid_to_list(self())])),

AccountDb = kz_util:format_account_db(AccountId),
{'ok', QueueJObj} = kz_datamgr:open_cache_doc(AccountDb, QueueId),

gen_statem:cast(self(), {'get_listener_proc', WorkerSup}),
{'ok'
,'ready'
,#state{queue_proc = ListenerPid
,manager_proc = MgrPid
,account_id = kz_doc:account_id(QueueJObj)
,account_db = kz_doc:account_db(QueueJObj)
,#state{manager_proc = MgrPid
,account_id = AccountId
,account_db = AccountDb
,queue_id = QueueId

,name = kz_json:get_value(<<"name">>, QueueJObj)
Expand Down Expand Up @@ -235,7 +237,11 @@ callback_mode() ->
%% @end
%%------------------------------------------------------------------------------
-spec ready(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
ready('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=QueueSrv
ready('cast', {'get_listener_proc', WorkerSup}, State) ->
ListenerSrv = acdc_queue_worker_sup:listener(WorkerSup),
lager:debug("got listener proc: ~p", [ListenerSrv]),
{'next_state', 'ready', State#state{listener_proc=ListenerSrv}};
ready('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv
,manager_proc=MgrSrv
}=State) ->
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, CallJObj)),
Expand All @@ -247,7 +253,7 @@ ready('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=QueueSrv
maybe_delay_connect_req(Call, CallJObj, Delivery, State);
'true' ->
lager:debug("queue mgr said to ignore this call: ~s", [CallId]),
acdc_queue_listener:ignore_member_call(QueueSrv, Call, Delivery),
acdc_queue_listener:ignore_member_call(ListenerSrv, Call, Delivery),
{'next_state', 'ready', State}
end;
ready('cast', {'agent_resp', _Resp}, State) ->
Expand Down Expand Up @@ -288,11 +294,11 @@ ready({'call', From}, Event, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec connect_req(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
connect_req('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=Srv}=State) ->
connect_req('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv}=State) ->
lager:debug("recv a member_call while processing a different member"),
CallId = kz_json:get_value(<<"Call-ID">>, CallJObj),
webseq:evt(?WSD_ID, CallId, self(), <<"member call recv while busy">>),
acdc_queue_listener:cancel_member_call(Srv, CallJObj, Delivery),
acdc_queue_listener:cancel_member_call(ListenerSrv, CallJObj, Delivery),
{'next_state', 'connect_req', State};

connect_req('cast', {'agent_resp', Resp}, #state{connect_resps=CRs
Expand Down Expand Up @@ -320,7 +326,7 @@ connect_req('cast', {'retry', _RetryJObj}, State) ->
lager:debug("recv retry response before win sent"),
{'next_state', 'connect_req', State};

connect_req('cast', {'member_hungup', JObj}, #state{queue_proc=Srv
connect_req('cast', {'member_hungup', JObj}, #state{listener_proc=ListenerSrv
,member_call=Call
,account_id=AccountId
,queue_id=QueueId
Expand All @@ -332,7 +338,7 @@ connect_req('cast', {'member_hungup', JObj}, #state{queue_proc=Srv

webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - abandon">>),

acdc_queue_listener:cancel_member_call(Srv, JObj),
acdc_queue_listener:cancel_member_call(ListenerSrv, JObj),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_HANGUP),
{'next_state', 'ready', clear_member_call(State), 'hibernate'};
'false' ->
Expand All @@ -353,7 +359,7 @@ connect_req('cast', {'member_finished'}, #state{member_call=Call}=State) ->
{'next_state', 'ready', clear_member_call(State), 'hibernate'};

connect_req('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF
,queue_proc=Srv
,listener_proc=ListenerSrv
,account_id=AccountId
,queue_id=QueueId
,member_call=Call
Expand All @@ -362,7 +368,7 @@ connect_req('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF
CallId = kapps_call:call_id(Call),
webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - DTMF">>),

acdc_queue_listener:exit_member_call(Srv),
acdc_queue_listener:exit_member_call(ListenerSrv),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_EXIT),
{'next_state', 'ready', clear_member_call(State), 'hibernate'};

Expand Down Expand Up @@ -401,23 +407,23 @@ connect_req('info', {'timeout', Ref, ?COLLECT_RESP_MESSAGE}, #state{collect_ref=
,connect_resps=[]
,manager_proc=MgrSrv
,member_call=Call
,queue_proc=Srv
,listener_proc=ListenerSrv
,account_id=AccountId
,queue_id=QueueId
}=State) ->
maybe_stop_timer(Ref),
case acdc_queue_manager:should_ignore_member_call(MgrSrv, Call, AccountId, QueueId) of
'true' ->
lager:debug("queue mgr said to ignore this call: ~s, not retrying agents", [kapps_call:call_id(Call)]),
acdc_queue_listener:finish_member_call(Srv),
acdc_queue_listener:finish_member_call(ListenerSrv),
{'next_state', 'ready', State};
'false' ->
maybe_connect_re_req(MgrSrv, Srv, State)
maybe_connect_re_req(MgrSrv, ListenerSrv, State)
end;
connect_req('info', {'timeout', Ref, ?COLLECT_RESP_MESSAGE}, #state{collect_ref=Ref}=State) ->
{NextState, State1} = handle_agent_responses(State),
{'next_state', NextState, State1};
connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{queue_proc=Srv
connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{listener_proc=ListenerSrv
,connection_timer_ref=ConnRef
,account_id=AccountId
,queue_id=QueueId
Expand All @@ -427,7 +433,7 @@ connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{qu
CallId = kapps_call:call_id(Call),
webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - timeout">>),

acdc_queue_listener:timeout_member_call(Srv),
acdc_queue_listener:timeout_member_call(ListenerSrv),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_TIMEOUT),
{'next_state', 'ready', clear_member_call(State), 'hibernate'}.

Expand All @@ -436,16 +442,16 @@ connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{qu
%% @end
%%------------------------------------------------------------------------------
-spec connecting(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()).
connecting('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=Srv}=State) ->
connecting('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv}=State) ->
lager:debug("recv a member_call while connecting"),
acdc_queue_listener:cancel_member_call(Srv, CallJObj, Delivery),
acdc_queue_listener:cancel_member_call(ListenerSrv, CallJObj, Delivery),
{'next_state', 'connecting', State};

connecting('cast', {'agent_resp', _Resp}, State) ->
lager:debug("agent resp must have just missed cutoff"),
{'next_state', 'connecting', State};

connecting('cast', {'accepted', AcceptJObj}, #state{queue_proc=Srv
connecting('cast', {'accepted', AcceptJObj}, #state{listener_proc=ListenerSrv
,member_call=Call
,account_id=AccountId
,queue_id=QueueId
Expand All @@ -456,7 +462,7 @@ connecting('cast', {'accepted', AcceptJObj}, #state{queue_proc=Srv
CallId = kapps_call:call_id(Call),
webseq:evt(?WSD_ID, self(), CallId, <<"member call - agent acceptance">>),

acdc_queue_listener:finish_member_call(Srv, AcceptJObj),
acdc_queue_listener:finish_member_call(ListenerSrv, AcceptJObj),
acdc_stats:call_handled(AccountId, QueueId, CallId
,kz_json:get_value(<<"Agent-ID">>, AcceptJObj)
),
Expand Down Expand Up @@ -497,13 +503,13 @@ connecting('cast', {'retry', RetryJObj}, #state{agent_ring_timer_ref=AgentRef
{'next_state', 'connecting', State}
end;

connecting('cast', {'member_hungup', CallEvt}, #state{queue_proc=Srv
connecting('cast', {'member_hungup', CallEvt}, #state{listener_proc=ListenerSrv
,account_id=AccountId
,queue_id=QueueId
,member_call=Call
}=State) ->
lager:debug("caller hungup while we waited for the agent to connect"),
acdc_queue_listener:cancel_member_call(Srv, CallEvt),
acdc_queue_listener:cancel_member_call(ListenerSrv, CallEvt),
CallId = kapps_call:call_id(Call),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_HANGUP),

Expand All @@ -521,13 +527,13 @@ connecting('cast', {'member_finished'}, #state{member_call=Call}=State) ->
end,
{'next_state', 'ready', clear_member_call(State), 'hibernate'};
connecting('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF
,queue_proc=Srv
,listener_proc=ListenerSrv
,account_id=AccountId
,queue_id=QueueId
,member_call=Call
}=State) when is_binary(DTMF) ->
lager:debug("member pressed the exit key (~s)", [DTMF]),
acdc_queue_listener:exit_member_call(Srv),
acdc_queue_listener:exit_member_call(ListenerSrv),
CallId = kapps_call:call_id(Call),
webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - DTMF">>),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_EXIT),
Expand Down Expand Up @@ -572,21 +578,21 @@ connecting({'call', From}, Event, State) ->

connecting('info', {'timeout', AgentRef, ?AGENT_RING_TIMEOUT_MESSAGE}, #state{agent_ring_timer_ref=AgentRef
,member_call_winner=Winner
,queue_proc=Srv
,listener_proc=ListenerSrv
}=State) ->
lager:debug("timed out waiting for agent to pick up"),
lager:debug("let's try another agent"),
erlang:send(self(), {'timeout', 'undefined', ?COLLECT_RESP_MESSAGE}),

acdc_queue_listener:timeout_agent(Srv, Winner),
acdc_queue_listener:timeout_agent(ListenerSrv, Winner),

{'next_state', 'connect_req', State#state{agent_ring_timer_ref='undefined'
,member_call_winner='undefined'
}};
connecting('info', {'timeout', _OtherAgentRef, ?AGENT_RING_TIMEOUT_MESSAGE}, #state{agent_ring_timer_ref=_AgentRef}=State) ->
lager:debug("unknown agent ref: ~p known: ~p", [_OtherAgentRef, _AgentRef]),
{'next_state', 'connect_req', State};
connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{queue_proc=Srv
connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{listener_proc=ListenerSrv
,connection_timer_ref=ConnRef
,account_id=AccountId
,queue_id=QueueId
Expand All @@ -595,7 +601,7 @@ connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{que
}=State) ->
lager:debug("connection timeout occurred, bounce the caller out of the queue"),

maybe_timeout_winner(Srv, Winner),
maybe_timeout_winner(ListenerSrv, Winner),
CallId = kapps_call:call_id(Call),
acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_TIMEOUT),

Expand Down Expand Up @@ -759,7 +765,7 @@ elapsed(Time) -> kz_time:elapsed_s(Time).
%%------------------------------------------------------------------------------
-spec maybe_delay_connect_req(kapps_call:call(), kz_json:object(), gen_listener:basic_deliver(), state()) ->
{'next_state', 'ready' | 'connect_req', state()}.
maybe_delay_connect_req(Call, CallJObj, Delivery, #state{queue_proc=QueueSrv
maybe_delay_connect_req(Call, CallJObj, Delivery, #state{listener_proc=ListenerSrv
,manager_proc=MgrSrv
,connection_timeout=ConnTimeout
,connection_timer_ref=ConnRef
Expand All @@ -773,7 +779,7 @@ maybe_delay_connect_req(Call, CallJObj, Delivery, #state{queue_proc=QueueSrv
webseq:note(?WSD_ID, self(), 'right', [CallId, <<": member call">>]),
webseq:evt(?WSD_ID, CallId, self(), <<"member call received">>),

acdc_queue_listener:member_connect_req(QueueSrv, CallJObj, Delivery, Url),
acdc_queue_listener:member_connect_req(ListenerSrv, CallJObj, Delivery, Url),

maybe_stop_timer(ConnRef), % stop the old one, maybe

Expand Down Expand Up @@ -837,7 +843,7 @@ update_agent(Agent, Winner) ->
-spec handle_agent_responses(state()) -> {atom(), state()}.
handle_agent_responses(#state{collect_ref=Ref
,manager_proc=MgrSrv
,queue_proc=Srv
,listener_proc=ListenerSrv
,member_call=Call
,account_id=AccountId
,queue_id=QueueId
Expand All @@ -846,7 +852,7 @@ handle_agent_responses(#state{collect_ref=Ref
case acdc_queue_manager:should_ignore_member_call(MgrSrv, Call, AccountId, QueueId) of
'true' ->
lager:debug("queue mgr said to ignore this call: ~s, not connecting to agents", [kapps_call:call_id(Call)]),
acdc_queue_listener:finish_member_call(Srv),
acdc_queue_listener:finish_member_call(ListenerSrv),
{'ready', State};
'false' ->
lager:debug("done waiting for agents to respond, picking a winner"),
Expand All @@ -855,7 +861,7 @@ handle_agent_responses(#state{collect_ref=Ref

-spec maybe_pick_winner(state()) -> {atom(), state()}.
maybe_pick_winner(#state{connect_resps=CRs
,queue_proc=Srv
,listener_proc=ListenerSrv
,manager_proc=Mgr
,agent_ring_timeout=RingTimeout
,agent_wrapup_time=AgentWrapup
Expand All @@ -876,7 +882,7 @@ maybe_pick_winner(#state{connect_resps=CRs
,{<<"Notifications">>, Notifications}
],

_ = [acdc_queue_listener:member_connect_win(Srv, update_agent(Agent, Winner), QueueOpts)
_ = [acdc_queue_listener:member_connect_win(ListenerSrv, update_agent(Agent, Winner), QueueOpts)
|| Agent <- Agents
],

Expand All @@ -891,7 +897,7 @@ maybe_pick_winner(#state{connect_resps=CRs
'undefined' ->
lager:debug("no more responses to choose from"),

acdc_queue_listener:cancel_member_call(Srv),
acdc_queue_listener:cancel_member_call(ListenerSrv),
{'ready', clear_member_call(State)}
end.

Expand Down
Loading

0 comments on commit 9bd3339

Please sign in to comment.