From 4b05d9d27a1377b1e52799103d94c36a87ab8ffb Mon Sep 17 00:00:00 2001 From: Daniel Finke Date: Thu, 17 Dec 2020 22:41:23 +0000 Subject: [PATCH] PISTON-1177: new approach to determine initial availability for agents of queue - when acdc_queue_manager boots, do not immediately assume all member agents are available - when manager proc is ready, pub msg so that agents send an availability update - any subsequent agent starts/availability changes will already be picked up by existing bindings --- applications/acdc/src/acdc_agent_fsm.erl | 21 +++-- applications/acdc/src/acdc_agent_handler.erl | 14 ++++ applications/acdc/src/acdc_agent_listener.erl | 72 +++++++++--------- applications/acdc/src/acdc_queue_manager.erl | 76 +++++-------------- applications/acdc/src/kapi_acdc_queue.erl | 67 +++++++++++++++- applications/crossbar/priv/api/swagger.json | 30 ++++++++ .../kapi.acdc_queue.started_notif.json | 32 ++++++++ 7 files changed, 208 insertions(+), 104 deletions(-) create mode 100644 applications/crossbar/priv/couchdb/schemas/kapi.acdc_queue.started_notif.json diff --git a/applications/acdc/src/acdc_agent_fsm.erl b/applications/acdc/src/acdc_agent_fsm.erl index c5a2e4e4046..ee37e3b0b53 100644 --- a/applications/acdc/src/acdc_agent_fsm.erl +++ b/applications/acdc/src/acdc_agent_fsm.erl @@ -28,6 +28,7 @@ ,end_wrapup/1 ,add_acdc_queue/2, rm_acdc_queue/2 + ,send_availability_update/2 ,update_presence/3 ,agent_logout/1 ,refresh/2 @@ -308,6 +309,14 @@ add_acdc_queue(ServerRef, QueueId) -> rm_acdc_queue(ServerRef, QueueId) -> gen_statem:cast(ServerRef, {'rm_acdc_queue', QueueId}). +%%------------------------------------------------------------------------------ +%% @doc Send an availability update +%% @end +%%------------------------------------------------------------------------------ +-spec send_availability_update(kz_types:server_ref(), kz_term:ne_binary()) -> 'ok'. +send_availability_update(ServerRef, QueueId) -> + gen_statem:cast(ServerRef, {'send_availability_update', QueueId}). + %%------------------------------------------------------------------------------ %% @doc %% @end @@ -1398,6 +1407,9 @@ handle_event({'add_acdc_queue', QueueId}, StateName, #state{agent_listener=Agent handle_event({'rm_acdc_queue', QueueId}, StateName, #state{agent_listener=AgentListener}=State) -> acdc_agent_listener:rm_acdc_queue(AgentListener, QueueId), {'next_state', StateName, State}; +handle_event({'send_availability_update', QueueId}, StateName, #state{agent_listener=AgentListener}=State) -> + acdc_agent_listener:send_availability_update(AgentListener, StateName, QueueId), + {'next_state', StateName, State}; handle_event({'update_presence', PresenceId, PresenceState}, 'ready', State) -> handle_presence_update(PresenceId, PresenceState, State), {'next_state', 'ready', State}; @@ -1967,14 +1979,11 @@ apply_state_updates_fold({_, StateName, #state{account_id=AccountId ,pause_alias=Alias }}=Acc, []) -> lager:debug("resulting agent state ~s", [StateName]), + acdc_agent_listener:send_availability_update(AgentListener, StateName), case StateName of - 'ready' -> - acdc_agent_listener:send_agent_available(AgentListener), - acdc_agent_stats:agent_ready(AccountId, AgentId); + 'ready' -> acdc_agent_stats:agent_ready(AccountId, AgentId); 'wrapup' -> acdc_agent_stats:agent_wrapup(AccountId, AgentId, time_left(WRef)); - 'paused' -> - acdc_agent_listener:send_agent_busy(AgentListener), - acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias) + 'paused' -> acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias) end, Acc; apply_state_updates_fold({_, _, State}, [{'pause', Timeout, Alias}|Updates]) -> diff --git a/applications/acdc/src/acdc_agent_handler.erl b/applications/acdc/src/acdc_agent_handler.erl index 6a17454311c..5cc4424b771 100644 --- a/applications/acdc/src/acdc_agent_handler.erl +++ b/applications/acdc/src/acdc_agent_handler.erl @@ -19,6 +19,7 @@ ,handle_agent_message/2 ,handle_config_change/2 ,handle_presence_probe/2 + ,handle_queue_started_notif/2 ]). -include("acdc.hrl"). @@ -490,3 +491,16 @@ maybe_update_presence(Sup, JObj, PresenceState) -> APid = acdc_agent_sup:listener(Sup), acdc_agent_listener:maybe_update_presence_id(APid, presence_id(JObj)), acdc_agent_listener:presence_update(APid, presence_state(JObj, PresenceState)). + +%%------------------------------------------------------------------------------ +%% @doc When a queue this agent is bound to (a member of) is started, handle the +%% notif by sending an availability update to the queue so it knows the +%% availability state of this agent for taking calls. +%% @end +%%------------------------------------------------------------------------------ +-spec handle_queue_started_notif(kz_json:object(), kz_term:proplist()) -> 'ok'. +handle_queue_started_notif(JObj, Props) -> + 'true' = kapi_acdc_queue:started_notif_v(JObj), + FSM = props:get_value('fsm_pid', Props), + QueueId = kz_json:get_ne_binary_value(<<"Queue-ID">>, JObj), + acdc_agent_fsm:send_availability_update(FSM, QueueId). diff --git a/applications/acdc/src/acdc_agent_listener.erl b/applications/acdc/src/acdc_agent_listener.erl index e260c4c274e..6d0c1edbd22 100644 --- a/applications/acdc/src/acdc_agent_listener.erl +++ b/applications/acdc/src/acdc_agent_listener.erl @@ -24,8 +24,7 @@ ,originate_execute/2 ,originate_uuid/3 ,outbound_call/2 - ,send_agent_available/1 - ,send_agent_busy/1 + ,send_availability_update/2, send_availability_update/3 ,send_sync_req/1 ,send_sync_resp/3, send_sync_resp/4 ,config/1, refresh_config/3 @@ -154,6 +153,9 @@ ,{{'acdc_agent_handler', 'handle_config_change'} ,[{<<"configuration">>, <<"*">>}] } + ,{{'acdc_agent_handler', 'handle_queue_started_notif'} + ,[{<<"queue">>, <<"started_notif">>}] + } ]). %%%============================================================================= @@ -254,13 +256,13 @@ originate_uuid(Srv, UUID, CtlQ) -> outbound_call(Srv, CallId) -> gen_listener:cast(Srv, {'outbound_call', CallId}). --spec send_agent_available(pid()) -> 'ok'. -send_agent_available(Srv) -> - gen_listener:cast(Srv, 'send_agent_available'). +-spec send_availability_update(pid(), fsm_state_name()) -> 'ok'. +send_availability_update(Srv, StateName) -> + gen_listener:cast(Srv, {'send_availability_update', StateName}). --spec send_agent_busy(pid()) -> 'ok'. -send_agent_busy(Srv) -> - gen_listener:cast(Srv, 'send_agent_busy'). +-spec send_availability_update(pid(), fsm_state_name(), kz_term:ne_binary()) -> 'ok'. +send_availability_update(Srv, StateName, QueueId) -> + gen_listener:cast(Srv, {'send_availability_update', StateName, QueueId}). -spec send_sync_req(pid()) -> 'ok'. send_sync_req(Srv) -> gen_listener:cast(Srv, {'send_sync_req'}). @@ -410,16 +412,14 @@ handle_cast({'fsm_started', FSMPid}, State) -> handle_cast({'gen_listener', {'created_queue', Q}}, State) -> {'noreply', State#state{my_q=Q}, 'hibernate'}; -handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs - ,acct_id=AcctId - ,agent_id=AgentId - }=State) when is_binary(Q) -> +handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs}=State) when is_binary(Q) -> case lists:member(Q, Qs) of 'true' -> lager:debug("queue ~s already added", [Q]), + do_send_availability_update(Q, StateName, State), {'noreply', State}; 'false' -> - add_queue_binding(AcctId, AgentId, Q, StateName), + add_queue_binding(Q, StateName, State), {'noreply', State#state{agent_queues=[Q|Qs]}} end; @@ -445,11 +445,8 @@ handle_cast({'rm_acdc_queue', Q}, #state{agent_queues=Qs {'noreply', State} end; -handle_cast('bind_to_member_reqs', #state{agent_queues=Qs - ,acct_id=AcctId - ,agent_id=AgentId - }=State) -> - _ = [add_queue_binding(AcctId, AgentId, Q, 'ready') || Q <- Qs], +handle_cast('bind_to_member_reqs', #state{agent_queues=Qs}=State) -> + _ = [add_queue_binding(Q, 'ready', State) || Q <- Qs], {'noreply', State}; handle_cast({'rebind_events', OldCallId, NewCallId}, State) -> @@ -739,18 +736,12 @@ handle_cast({'outbound_call', CallId}, #state{agent_id=AgentId lager:debug("bound to agent's outbound call ~s", [CallId]), {'noreply', State#state{call=kapps_call:set_call_id(CallId, kapps_call:new())}, 'hibernate'}; -handle_cast('send_agent_available', #state{agent_id=AgentId - ,acct_id=AcctId - ,agent_queues=Qs - }=State) -> - [send_agent_available(AcctId, AgentId, QueueId) || QueueId <- Qs], +handle_cast({'send_availability_update', StateName}, #state{agent_queues=Qs}=State) -> + [do_send_availability_update(QueueId, StateName, State) || QueueId <- Qs], {'noreply', State}; -handle_cast('send_agent_busy', #state{agent_id=AgentId - ,acct_id=AcctId - ,agent_queues=Qs - }=State) -> - [send_agent_busy(AcctId, AgentId, QueueId) || QueueId <- Qs], +handle_cast({'send_availability_update', StateName, QueueId}, State) -> + do_send_availability_update(QueueId, StateName, State), {'noreply', State}; handle_cast({'send_sync_req'}, #state{my_id=MyId @@ -1093,9 +1084,11 @@ outbound_call_id(CallId, AgentId) when is_binary(CallId) -> outbound_call_id(Call, AgentId) -> outbound_call_id(kapps_call:call_id(Call), AgentId). --spec add_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) -> +-spec add_queue_binding(kz_term:ne_binary(), fsm_state_name(), state()) -> 'ok'. -add_queue_binding(AcctId, AgentId, QueueId, StateName) -> +add_queue_binding(QueueId, StateName, #state{agent_id=AgentId + ,acct_id=AcctId + }=State) -> lager:debug("adding queue binding for ~s", [QueueId]), Body = kz_json:from_list([{<<"agent_id">>, AgentId} ,{<<"queue_id">>, QueueId} @@ -1104,11 +1097,11 @@ add_queue_binding(AcctId, AgentId, QueueId, StateName) -> kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId), gen_listener:add_binding(self() ,'acdc_queue' - ,[{'restrict_to', ['member_connect_req']} + ,[{'restrict_to', ['member_connect_req', 'started_notif']} ,{'queue_id', QueueId} ,{'account_id', AcctId} ]), - send_availability_update(AcctId, AgentId, QueueId, StateName). + do_send_availability_update(QueueId, StateName, State). -spec rm_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'. rm_queue_binding(AcctId, AgentId, QueueId) -> @@ -1120,17 +1113,20 @@ rm_queue_binding(AcctId, AgentId, QueueId) -> kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId), gen_listener:rm_binding(self() ,'acdc_queue' - ,[{'restrict_to', ['member_connect_req']} + ,[{'restrict_to', ['member_connect_req', 'started_notif']} ,{'queue_id', QueueId} ,{'account_id', AcctId} ]), send_agent_unavailable(AcctId, AgentId, QueueId). --spec send_availability_update(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) -> - 'ok'. -send_availability_update(AcctId, AgentId, QueueId, 'ready') -> +-spec do_send_availability_update(kz_term:ne_binary(), fsm_state_name(), state()) -> 'ok'. +do_send_availability_update(QueueId, 'ready', #state{agent_id=AgentId + ,acct_id=AcctId + }) -> send_agent_available(AcctId, AgentId, QueueId); -send_availability_update(AcctId, AgentId, QueueId, _) -> +do_send_availability_update(QueueId, _, #state{agent_id=AgentId + ,acct_id=AcctId + }) -> send_agent_busy(AcctId, AgentId, QueueId). -spec send_agent_available(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'. @@ -1238,7 +1234,7 @@ stop_agent_leg(ACallId, ACtrlQ) -> Command = [{<<"Application-Name">>, <<"hangup">>} ,{<<"Insert-At">>, <<"now">>} ,{<<"Call-ID">>, ACallId} - | kz_api:default_headers(<<>>, <<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION) + | kz_api:default_headers(<<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION) ], lager:debug("sending hangup to ~s: ~s", [ACallId, ACtrlQ]), kapi_dialplan:publish_command(ACtrlQ, Command). diff --git a/applications/acdc/src/acdc_queue_manager.erl b/applications/acdc/src/acdc_queue_manager.erl index 5e0c557dbf0..934fd3fcf84 100644 --- a/applications/acdc/src/acdc_queue_manager.erl +++ b/applications/acdc/src/acdc_queue_manager.erl @@ -314,9 +314,7 @@ init(Super, AccountId, QueueId, QueueJObj) -> gen_listener:cast(self(), {'start_workers'}), Strategy = get_strategy(kz_json:get_value(<<"strategy">>, QueueJObj)), - StrategyState = create_strategy_state(Strategy, AccountDb, QueueId), - - _ = update_strategy_state(self(), Strategy, StrategyState), + StrategyState = create_strategy_state(Strategy), lager:debug("queue mgr started for ~s", [QueueId]), {'ok', update_properties(QueueJObj, #state{account_id=AccountId @@ -524,15 +522,18 @@ handle_cast({'reject_member_call', Call, JObj}, #state{account_id=AccountId publish_member_call_failure(Q, AccountId, QueueId, kapps_call:call_id(Call), <<"no agents">>), {'noreply', State}; -handle_cast({'sync_with_agent', A}, #state{account_id=AccountId}=State) -> - {'ok', Status} = acdc_agent_util:most_recent_status(AccountId, A), - case acdc_agent_util:status_should_auto_start(Status) of - 'true' -> 'ok'; - 'false' -> gen_listener:cast(self(), {'agent_unavailable', A}) - end, +handle_cast({'gen_listener', {'created_queue', ?SECONDARY_QUEUE_NAME(QueueId)}}, #state{queue_id=QueueId}=State) -> {'noreply', State}; -handle_cast({'gen_listener', {'created_queue', _}}, State) -> +handle_cast({'gen_listener', {'created_queue', _}}, #state{account_id=AccountId + ,queue_id=QueueId + }=State) -> + kapi_acdc_queue:publish_started_notif( + kz_json:from_list([{<<"Account-ID">>, AccountId} + ,{<<"Queue-ID">>, QueueId} + | kz_api:default_headers(?APP_NAME, ?APP_VERSION) + ]) + ), {'noreply', State}; handle_cast({'refresh', QueueJObj}, State) -> @@ -855,55 +856,14 @@ get_strategy(<<"round_robin">>) -> 'rr'; get_strategy(<<"most_idle">>) -> 'mi'; get_strategy(_) -> 'rr'. --spec create_strategy_state(queue_strategy(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state(). -create_strategy_state(Strategy, AcctDb, QueueId) -> - create_strategy_state(Strategy, #strategy_state{}, AcctDb, QueueId). - --spec create_strategy_state(queue_strategy(), strategy_state(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state(). -create_strategy_state('rr', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) -> - create_strategy_state('rr', SS#strategy_state{agents=queue:new()}, AcctDb, QueueId); -create_strategy_state('rr', #strategy_state{agents=AgentQ}=SS, AcctDb, QueueId) -> - case acdc_util:agents_in_queue(AcctDb, QueueId) of - [] -> lager:debug("no agents around"), SS; - JObjs -> - Q = queue:from_list([Id || JObj <- JObjs, - not queue:member((Id = kz_doc:id(JObj)), AgentQ) - ]), - Details = lists:foldl(fun(JObj, Acc) -> - dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc) - end, dict:new(), JObjs), - SS#strategy_state{agents=queue:join(AgentQ, Q) - ,details=Details - } - end; -create_strategy_state('mi', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) -> - create_strategy_state('mi', SS#strategy_state{agents=[]}, AcctDb, QueueId); -create_strategy_state('mi', #strategy_state{agents=AgentL}=SS, AcctDb, QueueId) -> - case acdc_util:agents_in_queue(AcctDb, QueueId) of - [] -> lager:debug("no agents around"), SS; - JObjs -> - AgentL1 = lists:foldl(fun(JObj, Acc) -> - Id = kz_doc:id(JObj), - case lists:member(Id, Acc) of - 'true' -> Acc; - 'false' -> [Id | Acc] - end - end, AgentL, JObjs), - Details = lists:foldl(fun(JObj, Acc) -> - dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc) - end, dict:new(), JObjs), - SS#strategy_state{agents=AgentL1 - ,details=Details - } - end. +-spec create_strategy_state(queue_strategy()) -> strategy_state(). +create_strategy_state(Strategy) -> + Agents = create_ss_agents(Strategy), + #strategy_state{agents=Agents}. -update_strategy_state(Srv, 'rr', #strategy_state{agents=AgentQueue}) -> - L = queue:to_list(AgentQueue), - update_strategy_state(Srv, L); -update_strategy_state(Srv, 'mi', #strategy_state{agents=AgentL}) -> - update_strategy_state(Srv, AgentL). -update_strategy_state(Srv, L) -> - [gen_listener:cast(Srv, {'sync_with_agent', A}) || A <- L]. +-spec create_ss_agents(queue_strategy()) -> queue_strategy_state(). +create_ss_agents('rr') -> queue:new(); +create_ss_agents('mi') -> []. -spec call_position(kz_term:ne_binary(), [kapps_call:call()]) -> kz_term:api_integer(). call_position(CallId, Calls) -> diff --git a/applications/acdc/src/kapi_acdc_queue.erl b/applications/acdc/src/kapi_acdc_queue.erl index 2da6f210362..c05597200c2 100644 --- a/applications/acdc/src/kapi_acdc_queue.erl +++ b/applications/acdc/src/kapi_acdc_queue.erl @@ -22,6 +22,7 @@ ,sync_req/1, sync_req_v/1 ,sync_resp/1, sync_resp_v/1 ,agent_change/1, agent_change_v/1 + ,started_notif/1, started_notif_v/1 ,queue_member_add/1, queue_member_add_v/1 ,queue_member_remove/1, queue_member_remove_v/1 ]). @@ -51,6 +52,7 @@ ,publish_sync_req/1, publish_sync_req/2 ,publish_sync_resp/2, publish_sync_resp/3 ,publish_agent_change/1, publish_agent_change/2 + ,publish_started_notif/1, publish_started_notif/2 ,publish_queue_member_add/1, publish_queue_member_add/2 ,publish_queue_member_remove/1, publish_queue_member_remove/2 ]). @@ -513,6 +515,46 @@ agent_change_v(Prop) when is_list(Prop) -> agent_change_v(JObj) -> agent_change_v(kz_json:to_proplist(JObj)). %%------------------------------------------------------------------------------ +%% Event for announcing that a queue has been started so that agents that are +%% members of the queue can inform the queue of their availability +%%------------------------------------------------------------------------------ +-spec started_notif_routing_key(kz_term:api_terms()) -> kz_term:ne_binary(). +started_notif_routing_key(Prop) when is_list(Prop) -> + started_notif_routing_key(props:get_value(<<"Account-ID">>, Prop) + ,props:get_value(<<"Queue-ID">>, Prop) + ); +started_notif_routing_key(JObj) -> + started_notif_routing_key(kz_json:get_value(<<"Account-ID">>, JObj) + ,kz_json:get_value(<<"Queue-ID">>, JObj) + ). + +-spec started_notif_routing_key(kz_term:ne_binary(), kz_term:ne_binary()) -> kz_term:ne_binary(). +started_notif_routing_key(AccountId, QueueId) -> + <<"acdc.queue.started_notif.", AccountId/binary, ".", QueueId/binary>>. + +-define(STARTED_NOTIF_HEADERS, [<<"Account-ID">>, <<"Queue-ID">>]). +-define(OPTIONAL_STARTED_NOTIF_HEADERS, []). +-define(STARTED_NOTIF_VALUES, [{<<"Event-Category">>, <<"queue">>} + ,{<<"Event-Name">>, <<"started_notif">>} + ]). +-define(STARTED_NOTIF_TYPES, [{<<"Account-ID">>, fun kz_term:is_ne_binary/1} + ,{<<"Queue-ID">>, fun kz_term:is_ne_binary/1} + ]). + +-spec started_notif(kz_term:api_terms()) -> + {'ok', iolist()} | + {'error', string()}. +started_notif(Prop) when is_list(Prop) -> + case started_notif_v(Prop) of + 'true' -> kz_api:build_message(Prop, ?STARTED_NOTIF_HEADERS, ?OPTIONAL_STARTED_NOTIF_HEADERS); + 'false' -> {'error', "proplist failed validation for started_notif"} + end; +started_notif(JObj) -> started_notif(kz_json:to_proplist(JObj)). + +-spec started_notif_v(kz_term:api_terms()) -> boolean(). +started_notif_v(Prop) when is_list(Prop) -> + kz_api:validate(Prop, ?STARTED_NOTIF_HEADERS, ?STARTED_NOTIF_VALUES, ?STARTED_NOTIF_TYPES); +started_notif_v(JObj) -> started_notif_v(kz_json:to_proplist(JObj)). %%------------------------------------------------------------------------------ %% Queue Position tracking @@ -619,7 +661,8 @@ bind_q(Q, AcctId, QID, CallId, 'undefined') -> kz_amqp_util:bind_q_to_callmgr(Q, member_call_routing_key(AcctId, QID)), kz_amqp_util:bind_q_to_callmgr(Q, member_call_result_routing_key(AcctId, QID, CallId)), kz_amqp_util:bind_q_to_callmgr(Q, member_connect_req_routing_key(AcctId, QID)), - kz_amqp_util:bind_q_to_kapps(Q, queue_member_routing_key(AcctId, QID)); + kz_amqp_util:bind_q_to_kapps(Q, queue_member_routing_key(AcctId, QID)), + kz_amqp_util:bind_q_to_kapps(Q, started_notif_routing_key(AcctId, QID)); bind_q(Q, AcctId, QID, CallId, ['member_call'|T]) -> kz_amqp_util:bind_q_to_callmgr(Q, member_call_routing_key(AcctId, QID)), bind_q(Q, AcctId, QID, CallId, T); @@ -638,6 +681,9 @@ bind_q(Q, AcctId, QID, CallId, ['agent_change'|T]) -> bind_q(Q, AcctId, QID, CallId, ['member_addremove'|T]) -> kz_amqp_util:bind_q_to_kapps(Q, queue_member_routing_key(AcctId, QID)), bind_q(Q, AcctId, QID, CallId, T); +bind_q(Q, AcctId, QID, CallId, ['started_notif'|T]) -> + kz_amqp_util:bind_q_to_kapps(Q, started_notif_routing_key(AcctId, QID)), + bind_q(Q, AcctId, QID, CallId, T); bind_q(Q, AcctId, QID, CallId, [_|T]) -> bind_q(Q, AcctId, QID, CallId, T); bind_q(_, _, _, _, []) -> 'ok'. @@ -655,7 +701,8 @@ unbind_q(Q, AcctId, QID, CallId, 'undefined') -> _ = kz_amqp_util:unbind_q_from_callmgr(Q, member_call_routing_key(AcctId, QID)), _ = kz_amqp_util:unbind_q_from_callmgr(Q, member_call_result_routing_key(AcctId, QID, CallId)), _ = kz_amqp_util:unbind_q_from_callmgr(Q, member_connect_req_routing_key(AcctId, QID)), - _ = kz_amqp_util:unbind_q_from_kapps(Q, queue_member_routing_key(AcctId, QID)); + _ = kz_amqp_util:unbind_q_from_kapps(Q, queue_member_routing_key(AcctId, QID)), + _ = kz_amqp_util:unbind_q_from_kapps(Q, started_notif_routing_key(AcctId, QID)); unbind_q(Q, AcctId, QID, CallId, ['member_call'|T]) -> _ = kz_amqp_util:unbind_q_from_callmgr(Q, member_call_routing_key(AcctId, QID)), unbind_q(Q, AcctId, QID, CallId, T); @@ -674,6 +721,9 @@ unbind_q(Q, AcctId, QID, CallId, ['agent_change'|T]) -> unbind_q(Q, AcctId, QID, CallId, ['member_addremove'|T]) -> _ = kz_amqp_util:unbind_q_from_kapps(Q, queue_member_routing_key(AcctId, QID)), unbind_q(Q, AcctId, QID, CallId, T); +unbind_q(Q, AcctId, QID, CallId, ['started_notif'|T]) -> + _ = kz_amqp_util:unbind_q_from_kapps(Q, started_notif_routing_key(AcctId, QID)), + unbind_q(Q, AcctId, QID, CallId, T); unbind_q(Q, AcctId, QID, CallId, [_|T]) -> unbind_q(Q, AcctId, QID, CallId, T); unbind_q(_, _, _, _, []) -> 'ok'. @@ -836,6 +886,19 @@ publish_agent_change(API, ContentType) -> {'ok', Payload} = kz_api:prepare_api_payload(API, ?AGENT_CHANGE_VALUES, fun agent_change/1), kz_amqp_util:kapps_publish(agent_change_publish_key(API), Payload, ContentType). +%%------------------------------------------------------------------------------ +%% Event for announcing that a queue has been started so that agents that are +%% members of the queue can inform the queue of their availability +%%------------------------------------------------------------------------------ +-spec publish_started_notif(kz_term:api_terms()) -> 'ok'. +publish_started_notif(JObj) -> + publish_started_notif(JObj, ?DEFAULT_CONTENT_TYPE). + +-spec publish_started_notif(kz_term:api_terms(), kz_term:ne_binary()) -> 'ok'. +publish_started_notif(API, ContentType) -> + {'ok', Payload} = kz_api:prepare_api_payload(API, ?STARTED_NOTIF_VALUES, fun started_notif/1), + kz_amqp_util:kapps_publish(started_notif_routing_key(API), Payload, ContentType). + -spec publish_queue_member_add(kz_term:api_terms()) -> 'ok'. publish_queue_member_add(JObj) -> publish_queue_member_add(JObj, ?DEFAULT_CONTENT_TYPE). diff --git a/applications/crossbar/priv/api/swagger.json b/applications/crossbar/priv/api/swagger.json index c695d105e99..155a59a31f9 100644 --- a/applications/crossbar/priv/api/swagger.json +++ b/applications/crossbar/priv/api/swagger.json @@ -7751,6 +7751,36 @@ ], "type": "object" }, + "kapi.acdc_queue.started_notif": { + "description": "AMQP API for acdc_queue.started_notif", + "properties": { + "Account-ID": { + "minLength": 1, + "type": "string" + }, + "Event-Category": { + "enum": [ + "queue" + ], + "type": "string" + }, + "Event-Name": { + "enum": [ + "started_notif" + ], + "type": "string" + }, + "Queue-ID": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "Account-ID", + "Queue-ID" + ], + "type": "object" + }, "kapi.acdc_queue.sync_req": { "description": "AMQP API for acdc_queue.sync_req", "properties": { diff --git a/applications/crossbar/priv/couchdb/schemas/kapi.acdc_queue.started_notif.json b/applications/crossbar/priv/couchdb/schemas/kapi.acdc_queue.started_notif.json new file mode 100644 index 00000000000..c77e850d45a --- /dev/null +++ b/applications/crossbar/priv/couchdb/schemas/kapi.acdc_queue.started_notif.json @@ -0,0 +1,32 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "_id": "kapi.acdc_queue.started_notif", + "description": "AMQP API for acdc_queue.started_notif", + "properties": { + "Account-ID": { + "minLength": 1, + "type": "string" + }, + "Event-Category": { + "enum": [ + "queue" + ], + "type": "string" + }, + "Event-Name": { + "enum": [ + "started_notif" + ], + "type": "string" + }, + "Queue-ID": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "Account-ID", + "Queue-ID" + ], + "type": "object" +}