diff --git a/applications/acdc/src/acdc_queue_fsm.erl b/applications/acdc/src/acdc_queue_fsm.erl index 2ca792b0280..57a5bdf871a 100644 --- a/applications/acdc/src/acdc_queue_fsm.erl +++ b/applications/acdc/src/acdc_queue_fsm.erl @@ -9,7 +9,7 @@ -behaviour(gen_statem). %% API --export([start_link/3]). +-export([start_link/4]). %% Event injectors -export([member_call/3 @@ -55,7 +55,7 @@ -define(AGENT_RING_TIMEOUT, 5). -define(AGENT_RING_TIMEOUT_MESSAGE, 'agent_timer_expired'). --record(state, {queue_proc :: pid() +-record(state, {listener_proc :: kz_term:api_pid() ,manager_proc :: pid() ,connect_resps = [] :: kz_json:objects() ,collect_ref :: kz_term:api_reference() @@ -103,9 +103,9 @@ %% function does not return until Module:init/1 has returned. %% @end %%------------------------------------------------------------------------------ --spec start_link(pid(), pid(), kz_json:object()) -> kz_types:startlink_ret(). -start_link(MgrPid, ListenerPid, QueueJObj) -> - gen_statem:start_link(?SERVER, [MgrPid, ListenerPid, QueueJObj], []). +-spec start_link(pid(), pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> kz_types:startlink_ret(). +start_link(WorkerSup, MgrPid, AccountId, QueueId) -> + gen_statem:start_link(?SERVER, [WorkerSup, MgrPid, AccountId, QueueId], []). -spec refresh(pid(), kz_json:object()) -> 'ok'. refresh(ServerRef, QueueJObj) -> @@ -189,19 +189,21 @@ cdr_url(ServerRef) -> %% @end %%------------------------------------------------------------------------------ -spec init(list()) -> {'ok', atom(), state()}. -init([MgrPid, ListenerPid, QueueJObj]) -> - QueueId = kz_doc:id(QueueJObj), +init([WorkerSup, MgrPid, AccountId, QueueId]) -> kz_util:put_callid(<<"statem_", QueueId/binary, "_", (kz_term:to_binary(self()))/binary>>), webseq:start(?WSD_ID), webseq:reg_who(?WSD_ID, self(), iolist_to_binary([<<"qFSM">>, pid_to_list(self())])), + AccountDb = kz_util:format_account_db(AccountId), + {'ok', QueueJObj} = kz_datamgr:open_cache_doc(AccountDb, QueueId), + + gen_statem:cast(self(), {'get_listener_proc', WorkerSup}), {'ok' ,'ready' - ,#state{queue_proc = ListenerPid - ,manager_proc = MgrPid - ,account_id = kz_doc:account_id(QueueJObj) - ,account_db = kz_doc:account_db(QueueJObj) + ,#state{manager_proc = MgrPid + ,account_id = AccountId + ,account_db = AccountDb ,queue_id = QueueId ,name = kz_json:get_value(<<"name">>, QueueJObj) @@ -235,7 +237,11 @@ callback_mode() -> %% @end %%------------------------------------------------------------------------------ -spec ready(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()). -ready('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=QueueSrv +ready('cast', {'get_listener_proc', WorkerSup}, State) -> + ListenerSrv = acdc_queue_worker_sup:listener(WorkerSup), + lager:debug("got listener proc: ~p", [ListenerSrv]), + {'next_state', 'ready', State#state{listener_proc=ListenerSrv}}; +ready('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv ,manager_proc=MgrSrv }=State) -> Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, CallJObj)), @@ -247,7 +253,7 @@ ready('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=QueueSrv maybe_delay_connect_req(Call, CallJObj, Delivery, State); 'true' -> lager:debug("queue mgr said to ignore this call: ~s", [CallId]), - acdc_queue_listener:ignore_member_call(QueueSrv, Call, Delivery), + acdc_queue_listener:ignore_member_call(ListenerSrv, Call, Delivery), {'next_state', 'ready', State} end; ready('cast', {'agent_resp', _Resp}, State) -> @@ -288,11 +294,11 @@ ready({'call', From}, Event, State) -> %% @end %%------------------------------------------------------------------------------ -spec connect_req(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()). -connect_req('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=Srv}=State) -> +connect_req('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv}=State) -> lager:debug("recv a member_call while processing a different member"), CallId = kz_json:get_value(<<"Call-ID">>, CallJObj), webseq:evt(?WSD_ID, CallId, self(), <<"member call recv while busy">>), - acdc_queue_listener:cancel_member_call(Srv, CallJObj, Delivery), + acdc_queue_listener:cancel_member_call(ListenerSrv, CallJObj, Delivery), {'next_state', 'connect_req', State}; connect_req('cast', {'agent_resp', Resp}, #state{connect_resps=CRs @@ -320,7 +326,7 @@ connect_req('cast', {'retry', _RetryJObj}, State) -> lager:debug("recv retry response before win sent"), {'next_state', 'connect_req', State}; -connect_req('cast', {'member_hungup', JObj}, #state{queue_proc=Srv +connect_req('cast', {'member_hungup', JObj}, #state{listener_proc=ListenerSrv ,member_call=Call ,account_id=AccountId ,queue_id=QueueId @@ -332,7 +338,7 @@ connect_req('cast', {'member_hungup', JObj}, #state{queue_proc=Srv webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - abandon">>), - acdc_queue_listener:cancel_member_call(Srv, JObj), + acdc_queue_listener:cancel_member_call(ListenerSrv, JObj), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_HANGUP), {'next_state', 'ready', clear_member_call(State), 'hibernate'}; 'false' -> @@ -353,7 +359,7 @@ connect_req('cast', {'member_finished'}, #state{member_call=Call}=State) -> {'next_state', 'ready', clear_member_call(State), 'hibernate'}; connect_req('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF - ,queue_proc=Srv + ,listener_proc=ListenerSrv ,account_id=AccountId ,queue_id=QueueId ,member_call=Call @@ -362,7 +368,7 @@ connect_req('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF CallId = kapps_call:call_id(Call), webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - DTMF">>), - acdc_queue_listener:exit_member_call(Srv), + acdc_queue_listener:exit_member_call(ListenerSrv), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_EXIT), {'next_state', 'ready', clear_member_call(State), 'hibernate'}; @@ -401,7 +407,7 @@ connect_req('info', {'timeout', Ref, ?COLLECT_RESP_MESSAGE}, #state{collect_ref= ,connect_resps=[] ,manager_proc=MgrSrv ,member_call=Call - ,queue_proc=Srv + ,listener_proc=ListenerSrv ,account_id=AccountId ,queue_id=QueueId }=State) -> @@ -409,15 +415,15 @@ connect_req('info', {'timeout', Ref, ?COLLECT_RESP_MESSAGE}, #state{collect_ref= case acdc_queue_manager:should_ignore_member_call(MgrSrv, Call, AccountId, QueueId) of 'true' -> lager:debug("queue mgr said to ignore this call: ~s, not retrying agents", [kapps_call:call_id(Call)]), - acdc_queue_listener:finish_member_call(Srv), + acdc_queue_listener:finish_member_call(ListenerSrv), {'next_state', 'ready', State}; 'false' -> - maybe_connect_re_req(MgrSrv, Srv, State) + maybe_connect_re_req(MgrSrv, ListenerSrv, State) end; connect_req('info', {'timeout', Ref, ?COLLECT_RESP_MESSAGE}, #state{collect_ref=Ref}=State) -> {NextState, State1} = handle_agent_responses(State), {'next_state', NextState, State1}; -connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{queue_proc=Srv +connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{listener_proc=ListenerSrv ,connection_timer_ref=ConnRef ,account_id=AccountId ,queue_id=QueueId @@ -427,7 +433,7 @@ connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{qu CallId = kapps_call:call_id(Call), webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - timeout">>), - acdc_queue_listener:timeout_member_call(Srv), + acdc_queue_listener:timeout_member_call(ListenerSrv), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_TIMEOUT), {'next_state', 'ready', clear_member_call(State), 'hibernate'}. @@ -436,16 +442,16 @@ connect_req('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{qu %% @end %%------------------------------------------------------------------------------ -spec connecting(gen_statem:event_type(), any(), state()) -> kz_types:handle_fsm_ret(state()). -connecting('cast', {'member_call', CallJObj, Delivery}, #state{queue_proc=Srv}=State) -> +connecting('cast', {'member_call', CallJObj, Delivery}, #state{listener_proc=ListenerSrv}=State) -> lager:debug("recv a member_call while connecting"), - acdc_queue_listener:cancel_member_call(Srv, CallJObj, Delivery), + acdc_queue_listener:cancel_member_call(ListenerSrv, CallJObj, Delivery), {'next_state', 'connecting', State}; connecting('cast', {'agent_resp', _Resp}, State) -> lager:debug("agent resp must have just missed cutoff"), {'next_state', 'connecting', State}; -connecting('cast', {'accepted', AcceptJObj}, #state{queue_proc=Srv +connecting('cast', {'accepted', AcceptJObj}, #state{listener_proc=ListenerSrv ,member_call=Call ,account_id=AccountId ,queue_id=QueueId @@ -456,7 +462,7 @@ connecting('cast', {'accepted', AcceptJObj}, #state{queue_proc=Srv CallId = kapps_call:call_id(Call), webseq:evt(?WSD_ID, self(), CallId, <<"member call - agent acceptance">>), - acdc_queue_listener:finish_member_call(Srv, AcceptJObj), + acdc_queue_listener:finish_member_call(ListenerSrv, AcceptJObj), acdc_stats:call_handled(AccountId, QueueId, CallId ,kz_json:get_value(<<"Agent-ID">>, AcceptJObj) ), @@ -497,13 +503,13 @@ connecting('cast', {'retry', RetryJObj}, #state{agent_ring_timer_ref=AgentRef {'next_state', 'connecting', State} end; -connecting('cast', {'member_hungup', CallEvt}, #state{queue_proc=Srv +connecting('cast', {'member_hungup', CallEvt}, #state{listener_proc=ListenerSrv ,account_id=AccountId ,queue_id=QueueId ,member_call=Call }=State) -> lager:debug("caller hungup while we waited for the agent to connect"), - acdc_queue_listener:cancel_member_call(Srv, CallEvt), + acdc_queue_listener:cancel_member_call(ListenerSrv, CallEvt), CallId = kapps_call:call_id(Call), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_HANGUP), @@ -521,13 +527,13 @@ connecting('cast', {'member_finished'}, #state{member_call=Call}=State) -> end, {'next_state', 'ready', clear_member_call(State), 'hibernate'}; connecting('cast', {'dtmf_pressed', DTMF}, #state{caller_exit_key=DTMF - ,queue_proc=Srv + ,listener_proc=ListenerSrv ,account_id=AccountId ,queue_id=QueueId ,member_call=Call }=State) when is_binary(DTMF) -> lager:debug("member pressed the exit key (~s)", [DTMF]), - acdc_queue_listener:exit_member_call(Srv), + acdc_queue_listener:exit_member_call(ListenerSrv), CallId = kapps_call:call_id(Call), webseq:evt(?WSD_ID, self(), CallId, <<"member call finish - DTMF">>), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_EXIT), @@ -572,13 +578,13 @@ connecting({'call', From}, Event, State) -> connecting('info', {'timeout', AgentRef, ?AGENT_RING_TIMEOUT_MESSAGE}, #state{agent_ring_timer_ref=AgentRef ,member_call_winner=Winner - ,queue_proc=Srv + ,listener_proc=ListenerSrv }=State) -> lager:debug("timed out waiting for agent to pick up"), lager:debug("let's try another agent"), erlang:send(self(), {'timeout', 'undefined', ?COLLECT_RESP_MESSAGE}), - acdc_queue_listener:timeout_agent(Srv, Winner), + acdc_queue_listener:timeout_agent(ListenerSrv, Winner), {'next_state', 'connect_req', State#state{agent_ring_timer_ref='undefined' ,member_call_winner='undefined' @@ -586,7 +592,7 @@ connecting('info', {'timeout', AgentRef, ?AGENT_RING_TIMEOUT_MESSAGE}, #state{ag connecting('info', {'timeout', _OtherAgentRef, ?AGENT_RING_TIMEOUT_MESSAGE}, #state{agent_ring_timer_ref=_AgentRef}=State) -> lager:debug("unknown agent ref: ~p known: ~p", [_OtherAgentRef, _AgentRef]), {'next_state', 'connect_req', State}; -connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{queue_proc=Srv +connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{listener_proc=ListenerSrv ,connection_timer_ref=ConnRef ,account_id=AccountId ,queue_id=QueueId @@ -595,7 +601,7 @@ connecting('info', {'timeout', ConnRef, ?CONNECTION_TIMEOUT_MESSAGE}, #state{que }=State) -> lager:debug("connection timeout occurred, bounce the caller out of the queue"), - maybe_timeout_winner(Srv, Winner), + maybe_timeout_winner(ListenerSrv, Winner), CallId = kapps_call:call_id(Call), acdc_stats:call_abandoned(AccountId, QueueId, CallId, ?ABANDON_TIMEOUT), @@ -759,7 +765,7 @@ elapsed(Time) -> kz_time:elapsed_s(Time). %%------------------------------------------------------------------------------ -spec maybe_delay_connect_req(kapps_call:call(), kz_json:object(), gen_listener:basic_deliver(), state()) -> {'next_state', 'ready' | 'connect_req', state()}. -maybe_delay_connect_req(Call, CallJObj, Delivery, #state{queue_proc=QueueSrv +maybe_delay_connect_req(Call, CallJObj, Delivery, #state{listener_proc=ListenerSrv ,manager_proc=MgrSrv ,connection_timeout=ConnTimeout ,connection_timer_ref=ConnRef @@ -773,7 +779,7 @@ maybe_delay_connect_req(Call, CallJObj, Delivery, #state{queue_proc=QueueSrv webseq:note(?WSD_ID, self(), 'right', [CallId, <<": member call">>]), webseq:evt(?WSD_ID, CallId, self(), <<"member call received">>), - acdc_queue_listener:member_connect_req(QueueSrv, CallJObj, Delivery, Url), + acdc_queue_listener:member_connect_req(ListenerSrv, CallJObj, Delivery, Url), maybe_stop_timer(ConnRef), % stop the old one, maybe @@ -837,7 +843,7 @@ update_agent(Agent, Winner) -> -spec handle_agent_responses(state()) -> {atom(), state()}. handle_agent_responses(#state{collect_ref=Ref ,manager_proc=MgrSrv - ,queue_proc=Srv + ,listener_proc=ListenerSrv ,member_call=Call ,account_id=AccountId ,queue_id=QueueId @@ -846,7 +852,7 @@ handle_agent_responses(#state{collect_ref=Ref case acdc_queue_manager:should_ignore_member_call(MgrSrv, Call, AccountId, QueueId) of 'true' -> lager:debug("queue mgr said to ignore this call: ~s, not connecting to agents", [kapps_call:call_id(Call)]), - acdc_queue_listener:finish_member_call(Srv), + acdc_queue_listener:finish_member_call(ListenerSrv), {'ready', State}; 'false' -> lager:debug("done waiting for agents to respond, picking a winner"), @@ -855,7 +861,7 @@ handle_agent_responses(#state{collect_ref=Ref -spec maybe_pick_winner(state()) -> {atom(), state()}. maybe_pick_winner(#state{connect_resps=CRs - ,queue_proc=Srv + ,listener_proc=ListenerSrv ,manager_proc=Mgr ,agent_ring_timeout=RingTimeout ,agent_wrapup_time=AgentWrapup @@ -876,7 +882,7 @@ maybe_pick_winner(#state{connect_resps=CRs ,{<<"Notifications">>, Notifications} ], - _ = [acdc_queue_listener:member_connect_win(Srv, update_agent(Agent, Winner), QueueOpts) + _ = [acdc_queue_listener:member_connect_win(ListenerSrv, update_agent(Agent, Winner), QueueOpts) || Agent <- Agents ], @@ -891,7 +897,7 @@ maybe_pick_winner(#state{connect_resps=CRs 'undefined' -> lager:debug("no more responses to choose from"), - acdc_queue_listener:cancel_member_call(Srv), + acdc_queue_listener:cancel_member_call(ListenerSrv), {'ready', clear_member_call(State)} end. diff --git a/applications/acdc/src/acdc_queue_listener.erl b/applications/acdc/src/acdc_queue_listener.erl index a66fcb436c0..781dcb7d796 100644 --- a/applications/acdc/src/acdc_queue_listener.erl +++ b/applications/acdc/src/acdc_queue_listener.erl @@ -54,8 +54,6 @@ -record(state, {queue_id :: kz_term:ne_binary() ,account_id :: kz_term:ne_binary() - %% PIDs of the gang - ,worker_sup :: pid() ,mgr_pid :: pid() ,fsm_pid :: kz_term:api_pid() ,shared_pid :: kz_term:api_pid() @@ -204,13 +202,10 @@ delivery(Srv) -> init([WorkerSup, MgrPid, AccountId, QueueId]) -> kz_util:put_callid(QueueId), lager:debug("starting queue ~s", [QueueId]), - AccountDb = kz_util:format_account_id(AccountId, 'encoded'), - {'ok', QueueJObj} = kz_datamgr:open_cache_doc(AccountDb, QueueId), - gen_listener:cast(self(), {'start_friends', QueueJObj}), + gen_listener:cast(self(), {'get_friends', WorkerSup}), {'ok', #state{queue_id = QueueId ,account_id = AccountId ,my_id = acdc_util:proc_id() - ,worker_sup = WorkerSup ,mgr_pid = MgrPid }}. @@ -233,49 +228,15 @@ handle_call(_Request, _From, State) -> %% @doc Handling cast messages. %% @end %%------------------------------------------------------------------------------ -find_pid_from_supervisor({'ok', P}) when is_pid(P) -> - {'ok', P}; -find_pid_from_supervisor({'error', {'already_started', P}}) when is_pid(P) -> - {'ok', P}; -find_pid_from_supervisor(E) -> E. - --spec start_shared_queue(state(), pid(), kz_term:api_integer()) -> {'noreply', state()}. -start_shared_queue(#state{account_id=AccountId - ,queue_id=QueueId - ,worker_sup=WorkerSup - }=State, FSMPid, Priority) -> - {'ok', SharedPid} = - find_pid_from_supervisor( - acdc_queue_worker_sup:start_shared_queue(WorkerSup, FSMPid, AccountId, QueueId, Priority) - ), - lager:debug("started shared queue listener: ~p", [SharedPid]), - - {'noreply', State#state{fsm_pid = FSMPid - ,shared_pid = SharedPid - ,my_id = acdc_util:proc_id(FSMPid) - } - }. - -spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()). -handle_cast({'start_friends', QueueJObj}, #state{worker_sup=WorkerSup - ,mgr_pid=MgrPid - }=State) -> - Priority = kz_json:get_integer_value(<<"max_priority">>, QueueJObj), - case find_pid_from_supervisor(acdc_queue_worker_sup:start_fsm(WorkerSup, MgrPid, QueueJObj)) of - {'ok', FSMPid} -> - lager:debug("started queue FSM: ~p", [FSMPid]), - start_shared_queue(State, FSMPid, Priority); - {'error', 'already_present'} -> - lager:debug("queue FSM is already present"), - case acdc_queue_worker_sup:fsm(WorkerSup) of - FSMPid when is_pid(FSMPid) -> - lager:debug("found queue FSM pid: ~p", [FSMPid]), - start_shared_queue(State, FSMPid, Priority); - 'undefined' -> - lager:debug("no queue FSM pid found"), - {'stop', 'failed_fsm', State} - end - end; +handle_cast({'get_friends', WorkerSup}, State) -> + FSMPid = acdc_queue_worker_sup:fsm(WorkerSup), + lager:debug("got queue FSM: ~p", [FSMPid]), + SharedPid = acdc_queue_worker_sup:shared_queue(WorkerSup), + lager:debug("got shared queue listener: ~p", [SharedPid]), + {'noreply', State#state{fsm_pid=FSMPid + ,shared_pid=SharedPid + }}; handle_cast({'gen_listener', {'created_queue', Q}}, #state{my_q='undefined'}=State) -> {'noreply', State#state{my_q=Q}, 'hibernate'}; diff --git a/applications/acdc/src/acdc_queue_manager.erl b/applications/acdc/src/acdc_queue_manager.erl index 3bf599d442b..a344eb7d71e 100644 --- a/applications/acdc/src/acdc_queue_manager.erl +++ b/applications/acdc/src/acdc_queue_manager.erl @@ -637,7 +637,7 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ start_secondary_queue(AccountId, QueueId) -> AccountDb = kz_util:format_account_db(AccountId), - Priority = lookup_priority_levels(AccountDb, QueueId), + Priority = acdc_util:max_priority(AccountDb, QueueId), kz_util:spawn(fun gen_listener:add_queue/4 ,[self() ,?SECONDARY_QUEUE_NAME(QueueId) @@ -647,13 +647,6 @@ start_secondary_queue(AccountId, QueueId) -> ,?SECONDARY_BINDINGS(AccountId, QueueId) ]). --spec lookup_priority_levels(kz_term:ne_binary(), kz_term:ne_binary()) -> kz_term:api_integer(). -lookup_priority_levels(AccountDB, QueueId) -> - case kz_datamgr:open_cache_doc(AccountDB, QueueId) of - {'ok', JObj} -> kz_json:get_value(<<"max_priority">>, JObj); - _ -> 'undefined' - end. - make_ignore_key(AccountId, QueueId, CallId) -> {AccountId, QueueId, CallId}. diff --git a/applications/acdc/src/acdc_queue_shared.erl b/applications/acdc/src/acdc_queue_shared.erl index 3c5b34905cf..293b1c974e0 100644 --- a/applications/acdc/src/acdc_queue_shared.erl +++ b/applications/acdc/src/acdc_queue_shared.erl @@ -29,7 +29,7 @@ -define(SERVER, ?MODULE). --record(state, {fsm_pid :: pid() +-record(state, {fsm_pid :: kz_term:api_pid() ,deliveries = [] :: deliveries() }). -type state() :: #state{}. @@ -64,15 +64,16 @@ %% @doc Starts the server. %% @end %%------------------------------------------------------------------------------ --spec start_link(kz_types:server_ref(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:api_integer()) -> kz_types:startlink_ret(). -start_link(FSMPid, AcctId, QueueId, Priority) -> +-spec start_link(pid(), pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> kz_types:startlink_ret(). +start_link(WorkerSup, _, AccountId, QueueId) -> + Priority = acdc_util:max_priority(kz_util:format_account_db(AccountId), QueueId), gen_listener:start_link(?SERVER - ,[{'bindings', ?SHARED_QUEUE_BINDINGS(AcctId, QueueId)} + ,[{'bindings', ?SHARED_QUEUE_BINDINGS(AccountId, QueueId)} ,{'responders', ?RESPONDERS} - ,{'queue_name', kapi_acdc_queue:shared_queue_name(AcctId, QueueId)} + ,{'queue_name', kapi_acdc_queue:shared_queue_name(AccountId, QueueId)} | ?SHARED_BINDING_OPTIONS(Priority) ] - ,[FSMPid] + ,[WorkerSup] ). -spec ack(kz_types:server_ref(), gen_listener:basic_deliver()) -> 'ok'. @@ -98,11 +99,12 @@ deliveries(Srv) -> %% @end %%------------------------------------------------------------------------------ -spec init([pid()]) -> {'ok', state()}. -init([FSMPid]) -> +init([WorkerSup]) -> kz_util:put_callid(?DEFAULT_LOG_SYSTEM_ID), - lager:debug("shared queue proc started, sending messages to FSM ~p", [FSMPid]), - {'ok', #state{fsm_pid=FSMPid}}. + lager:debug("shared queue proc started"), + gen_listener:cast(self(), {'get_fsm_proc', WorkerSup}), + {'ok', #state{}}. %%------------------------------------------------------------------------------ %% @doc Handling call messages. @@ -119,6 +121,10 @@ handle_call(_Request, _From, State) -> %% @end %%------------------------------------------------------------------------------ -spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()). +handle_cast({'get_fsm_proc', WorkerSup}, State) -> + FSMPid = acdc_queue_worker_sup:fsm(WorkerSup), + lager:debug("sending messages to FSM ~p", [FSMPid]), + {'noreply', State#state{fsm_pid=FSMPid}}; handle_cast({'delivery', Delivery}, #state{deliveries=Ds}=State) -> {'noreply', State#state{deliveries=[Delivery|Ds]}}; handle_cast({'ack', Delivery}, #state{deliveries=Ds}=State) -> diff --git a/applications/acdc/src/acdc_queue_worker_sup.erl b/applications/acdc/src/acdc_queue_worker_sup.erl index 89e72d972b4..73e4a2f5a8e 100644 --- a/applications/acdc/src/acdc_queue_worker_sup.erl +++ b/applications/acdc/src/acdc_queue_worker_sup.erl @@ -16,15 +16,18 @@ -export([start_link/3 ,stop/1 ,listener/1 - ,shared_queue/1, start_shared_queue/5 - ,fsm/1, start_fsm/3 + ,shared_queue/1 + ,fsm/1 ,status/1 ]). %% Supervisor callbacks -export([init/1]). --define(CHILDREN, [?WORKER_ARGS('acdc_queue_listener', [self() | Args])]). +-define(CHILDREN, [?WORKER_ARGS('acdc_queue_listener', [self() | Args]) + ,?WORKER_ARGS('acdc_queue_shared', [self() | Args]) + ,?WORKER_ARGS('acdc_queue_fsm', [self() | Args]) + ]). %%%============================================================================= %%% API functions @@ -55,10 +58,6 @@ shared_queue(WorkerSup) -> [P] -> P end. --spec start_shared_queue(pid(), pid(), kz_term:ne_binary(), kz_term:ne_binary(), kz_term:api_integer()) -> kz_types:sup_startchild_ret(). -start_shared_queue(WorkerSup, FSMPid, AcctId, QueueId, Priority) -> - supervisor:start_child(WorkerSup, ?WORKER_ARGS('acdc_queue_shared', [FSMPid, AcctId, QueueId, Priority])). - -spec fsm(pid()) -> kz_term:api_pid(). fsm(WorkerSup) -> case child_of_type(WorkerSup, 'acdc_queue_fsm') of @@ -66,11 +65,6 @@ fsm(WorkerSup) -> [P] -> P end. --spec start_fsm(pid(), pid(), kz_json:object()) -> kz_types:sup_startchild_ret(). -start_fsm(WorkerSup, MgrPid, QueueJObj) -> - ListenerPid = self(), - supervisor:start_child(WorkerSup, ?WORKER_ARGS('acdc_queue_fsm', [MgrPid, ListenerPid, QueueJObj])). - -spec child_of_type(pid(), atom()) -> [pid()]. child_of_type(WSup, T) -> [P || {Type, P,'worker', [_]} <- supervisor:which_children(WSup), T =:= Type]. diff --git a/applications/acdc/src/acdc_util.erl b/applications/acdc/src/acdc_util.erl index 4f798f60d01..683ee041c8e 100644 --- a/applications/acdc/src/acdc_util.erl +++ b/applications/acdc/src/acdc_util.erl @@ -18,6 +18,7 @@ ,presence_update/3, presence_update/4 ,send_cdr/2 ,hangup_cause/1 + ,max_priority/2 ]). -include("acdc.hrl"). @@ -146,3 +147,14 @@ hangup_cause(JObj) -> 'undefined' -> <<"unknown">>; Cause -> Cause end. + +-spec max_priority(kz_term:ne_binary(), kz_term:ne_binary()) -> kz_term:api_integer(). +max_priority(AccountDb, QueueId) -> + case kz_datamgr:open_cache_doc(AccountDb, QueueId) of + {'ok', QueueJObj} -> max_priority(QueueJObj); + _ -> 'undefined' + end. + +-spec max_priority(kz_json:object()) -> kz_term:api_integer(). +max_priority(QueueJObj) -> + kz_json:get_integer_value(<<"max_priority">>, QueueJObj).