Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.3] PISTON-1177: new approach to determine initial availability for agents of queue #6688

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
,end_wrapup/1

,add_acdc_queue/2, rm_acdc_queue/2
,send_availability_update/2
,update_presence/3
,agent_logout/1
,refresh/2
Expand Down Expand Up @@ -308,6 +309,14 @@ add_acdc_queue(ServerRef, QueueId) ->
rm_acdc_queue(ServerRef, QueueId) ->
gen_statem:cast(ServerRef, {'rm_acdc_queue', QueueId}).

%%------------------------------------------------------------------------------
%% @doc Send an availability update
%% @end
%%------------------------------------------------------------------------------
-spec send_availability_update(kz_types:server_ref(), kz_term:ne_binary()) -> 'ok'.
send_availability_update(ServerRef, QueueId) ->
gen_statem:cast(ServerRef, {'send_availability_update', QueueId}).

%%------------------------------------------------------------------------------
%% @doc
%% @end
Expand Down Expand Up @@ -1398,6 +1407,9 @@ handle_event({'add_acdc_queue', QueueId}, StateName, #state{agent_listener=Agent
handle_event({'rm_acdc_queue', QueueId}, StateName, #state{agent_listener=AgentListener}=State) ->
acdc_agent_listener:rm_acdc_queue(AgentListener, QueueId),
{'next_state', StateName, State};
handle_event({'send_availability_update', QueueId}, StateName, #state{agent_listener=AgentListener}=State) ->
acdc_agent_listener:send_availability_update(AgentListener, StateName, QueueId),
{'next_state', StateName, State};
handle_event({'update_presence', PresenceId, PresenceState}, 'ready', State) ->
handle_presence_update(PresenceId, PresenceState, State),
{'next_state', 'ready', State};
Expand Down Expand Up @@ -1967,14 +1979,11 @@ apply_state_updates_fold({_, StateName, #state{account_id=AccountId
,pause_alias=Alias
}}=Acc, []) ->
lager:debug("resulting agent state ~s", [StateName]),
acdc_agent_listener:send_availability_update(AgentListener, StateName),
case StateName of
'ready' ->
acdc_agent_listener:send_agent_available(AgentListener),
acdc_agent_stats:agent_ready(AccountId, AgentId);
'ready' -> acdc_agent_stats:agent_ready(AccountId, AgentId);
'wrapup' -> acdc_agent_stats:agent_wrapup(AccountId, AgentId, time_left(WRef));
'paused' ->
acdc_agent_listener:send_agent_busy(AgentListener),
acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias)
'paused' -> acdc_agent_stats:agent_paused(AccountId, AgentId, time_left(PRef), Alias)
end,
Acc;
apply_state_updates_fold({_, _, State}, [{'pause', Timeout, Alias}|Updates]) ->
Expand Down
14 changes: 14 additions & 0 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
,handle_agent_message/2
,handle_config_change/2
,handle_presence_probe/2
,handle_queue_started_notif/2
]).

-include("acdc.hrl").
Expand Down Expand Up @@ -490,3 +491,16 @@ maybe_update_presence(Sup, JObj, PresenceState) ->
APid = acdc_agent_sup:listener(Sup),
acdc_agent_listener:maybe_update_presence_id(APid, presence_id(JObj)),
acdc_agent_listener:presence_update(APid, presence_state(JObj, PresenceState)).

%%------------------------------------------------------------------------------
%% @doc When a queue this agent is bound to (a member of) is started, handle the
%% notif by sending an availability update to the queue so it knows the
%% availability state of this agent for taking calls.
%% @end
%%------------------------------------------------------------------------------
-spec handle_queue_started_notif(kz_json:object(), kz_term:proplist()) -> 'ok'.
handle_queue_started_notif(JObj, Props) ->
'true' = kapi_acdc_queue:started_notif_v(JObj),
FSM = props:get_value('fsm_pid', Props),
QueueId = kz_json:get_ne_binary_value(<<"Queue-ID">>, JObj),
acdc_agent_fsm:send_availability_update(FSM, QueueId).
72 changes: 34 additions & 38 deletions applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
,originate_execute/2
,originate_uuid/3
,outbound_call/2
,send_agent_available/1
,send_agent_busy/1
,send_availability_update/2, send_availability_update/3
,send_sync_req/1
,send_sync_resp/3, send_sync_resp/4
,config/1, refresh_config/3
Expand Down Expand Up @@ -154,6 +153,9 @@
,{{'acdc_agent_handler', 'handle_config_change'}
,[{<<"configuration">>, <<"*">>}]
}
,{{'acdc_agent_handler', 'handle_queue_started_notif'}
,[{<<"queue">>, <<"started_notif">>}]
}
]).

%%%=============================================================================
Expand Down Expand Up @@ -254,13 +256,13 @@ originate_uuid(Srv, UUID, CtlQ) ->
outbound_call(Srv, CallId) ->
gen_listener:cast(Srv, {'outbound_call', CallId}).

-spec send_agent_available(pid()) -> 'ok'.
send_agent_available(Srv) ->
gen_listener:cast(Srv, 'send_agent_available').
-spec send_availability_update(pid(), fsm_state_name()) -> 'ok'.
send_availability_update(Srv, StateName) ->
gen_listener:cast(Srv, {'send_availability_update', StateName}).

-spec send_agent_busy(pid()) -> 'ok'.
send_agent_busy(Srv) ->
gen_listener:cast(Srv, 'send_agent_busy').
-spec send_availability_update(pid(), fsm_state_name(), kz_term:ne_binary()) -> 'ok'.
send_availability_update(Srv, StateName, QueueId) ->
gen_listener:cast(Srv, {'send_availability_update', StateName, QueueId}).

-spec send_sync_req(pid()) -> 'ok'.
send_sync_req(Srv) -> gen_listener:cast(Srv, {'send_sync_req'}).
Expand Down Expand Up @@ -410,16 +412,14 @@ handle_cast({'fsm_started', FSMPid}, State) ->
handle_cast({'gen_listener', {'created_queue', Q}}, State) ->
{'noreply', State#state{my_q=Q}, 'hibernate'};

handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs
,acct_id=AcctId
,agent_id=AgentId
}=State) when is_binary(Q) ->
handle_cast({'add_acdc_queue', Q, StateName}, #state{agent_queues=Qs}=State) when is_binary(Q) ->
case lists:member(Q, Qs) of
'true' ->
lager:debug("queue ~s already added", [Q]),
do_send_availability_update(Q, StateName, State),
{'noreply', State};
'false' ->
add_queue_binding(AcctId, AgentId, Q, StateName),
add_queue_binding(Q, StateName, State),
{'noreply', State#state{agent_queues=[Q|Qs]}}
end;

Expand All @@ -445,11 +445,8 @@ handle_cast({'rm_acdc_queue', Q}, #state{agent_queues=Qs
{'noreply', State}
end;

handle_cast('bind_to_member_reqs', #state{agent_queues=Qs
,acct_id=AcctId
,agent_id=AgentId
}=State) ->
_ = [add_queue_binding(AcctId, AgentId, Q, 'ready') || Q <- Qs],
handle_cast('bind_to_member_reqs', #state{agent_queues=Qs}=State) ->
_ = [add_queue_binding(Q, 'ready', State) || Q <- Qs],
{'noreply', State};

handle_cast({'rebind_events', OldCallId, NewCallId}, State) ->
Expand Down Expand Up @@ -739,18 +736,12 @@ handle_cast({'outbound_call', CallId}, #state{agent_id=AgentId
lager:debug("bound to agent's outbound call ~s", [CallId]),
{'noreply', State#state{call=kapps_call:set_call_id(CallId, kapps_call:new())}, 'hibernate'};

handle_cast('send_agent_available', #state{agent_id=AgentId
,acct_id=AcctId
,agent_queues=Qs
}=State) ->
[send_agent_available(AcctId, AgentId, QueueId) || QueueId <- Qs],
handle_cast({'send_availability_update', StateName}, #state{agent_queues=Qs}=State) ->
[do_send_availability_update(QueueId, StateName, State) || QueueId <- Qs],
{'noreply', State};

handle_cast('send_agent_busy', #state{agent_id=AgentId
,acct_id=AcctId
,agent_queues=Qs
}=State) ->
[send_agent_busy(AcctId, AgentId, QueueId) || QueueId <- Qs],
handle_cast({'send_availability_update', StateName, QueueId}, State) ->
do_send_availability_update(QueueId, StateName, State),
{'noreply', State};

handle_cast({'send_sync_req'}, #state{my_id=MyId
Expand Down Expand Up @@ -1093,9 +1084,11 @@ outbound_call_id(CallId, AgentId) when is_binary(CallId) ->
outbound_call_id(Call, AgentId) ->
outbound_call_id(kapps_call:call_id(Call), AgentId).

-spec add_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) ->
-spec add_queue_binding(kz_term:ne_binary(), fsm_state_name(), state()) ->
'ok'.
add_queue_binding(AcctId, AgentId, QueueId, StateName) ->
add_queue_binding(QueueId, StateName, #state{agent_id=AgentId
,acct_id=AcctId
}=State) ->
lager:debug("adding queue binding for ~s", [QueueId]),
Body = kz_json:from_list([{<<"agent_id">>, AgentId}
,{<<"queue_id">>, QueueId}
Expand All @@ -1104,11 +1097,11 @@ add_queue_binding(AcctId, AgentId, QueueId, StateName) ->
kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId),
gen_listener:add_binding(self()
,'acdc_queue'
,[{'restrict_to', ['member_connect_req']}
,[{'restrict_to', ['member_connect_req', 'started_notif']}
,{'queue_id', QueueId}
,{'account_id', AcctId}
]),
send_availability_update(AcctId, AgentId, QueueId, StateName).
do_send_availability_update(QueueId, StateName, State).

-spec rm_queue_binding(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
rm_queue_binding(AcctId, AgentId, QueueId) ->
Expand All @@ -1120,17 +1113,20 @@ rm_queue_binding(AcctId, AgentId, QueueId) ->
kz_edr:event(?APP_NAME, ?APP_VERSION, 'ok', 'info', Body, AcctId),
gen_listener:rm_binding(self()
,'acdc_queue'
,[{'restrict_to', ['member_connect_req']}
,[{'restrict_to', ['member_connect_req', 'started_notif']}
,{'queue_id', QueueId}
,{'account_id', AcctId}
]),
send_agent_unavailable(AcctId, AgentId, QueueId).

-spec send_availability_update(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary(), fsm_state_name()) ->
'ok'.
send_availability_update(AcctId, AgentId, QueueId, 'ready') ->
-spec do_send_availability_update(kz_term:ne_binary(), fsm_state_name(), state()) -> 'ok'.
do_send_availability_update(QueueId, 'ready', #state{agent_id=AgentId
,acct_id=AcctId
}) ->
send_agent_available(AcctId, AgentId, QueueId);
send_availability_update(AcctId, AgentId, QueueId, _) ->
do_send_availability_update(QueueId, _, #state{agent_id=AgentId
,acct_id=AcctId
}) ->
send_agent_busy(AcctId, AgentId, QueueId).

-spec send_agent_available(kz_term:ne_binary(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
Expand Down Expand Up @@ -1238,7 +1234,7 @@ stop_agent_leg(ACallId, ACtrlQ) ->
Command = [{<<"Application-Name">>, <<"hangup">>}
,{<<"Insert-At">>, <<"now">>}
,{<<"Call-ID">>, ACallId}
| kz_api:default_headers(<<>>, <<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION)
| kz_api:default_headers(<<"call">>, <<"command">>, ?APP_NAME, ?APP_VERSION)
],
lager:debug("sending hangup to ~s: ~s", [ACallId, ACtrlQ]),
kapi_dialplan:publish_command(ACtrlQ, Command).
Expand Down
76 changes: 18 additions & 58 deletions applications/acdc/src/acdc_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ init(Super, AccountId, QueueId, QueueJObj) ->

gen_listener:cast(self(), {'start_workers'}),
Strategy = get_strategy(kz_json:get_value(<<"strategy">>, QueueJObj)),
StrategyState = create_strategy_state(Strategy, AccountDb, QueueId),

_ = update_strategy_state(self(), Strategy, StrategyState),
StrategyState = create_strategy_state(Strategy),

lager:debug("queue mgr started for ~s", [QueueId]),
{'ok', update_properties(QueueJObj, #state{account_id=AccountId
Expand Down Expand Up @@ -524,15 +522,18 @@ handle_cast({'reject_member_call', Call, JObj}, #state{account_id=AccountId
publish_member_call_failure(Q, AccountId, QueueId, kapps_call:call_id(Call), <<"no agents">>),
{'noreply', State};

handle_cast({'sync_with_agent', A}, #state{account_id=AccountId}=State) ->
{'ok', Status} = acdc_agent_util:most_recent_status(AccountId, A),
case acdc_agent_util:status_should_auto_start(Status) of
'true' -> 'ok';
'false' -> gen_listener:cast(self(), {'agent_unavailable', A})
end,
handle_cast({'gen_listener', {'created_queue', ?SECONDARY_QUEUE_NAME(QueueId)}}, #state{queue_id=QueueId}=State) ->
{'noreply', State};

handle_cast({'gen_listener', {'created_queue', _}}, State) ->
handle_cast({'gen_listener', {'created_queue', _}}, #state{account_id=AccountId
,queue_id=QueueId
}=State) ->
kapi_acdc_queue:publish_started_notif(
kz_json:from_list([{<<"Account-ID">>, AccountId}
,{<<"Queue-ID">>, QueueId}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
])
),
{'noreply', State};

handle_cast({'refresh', QueueJObj}, State) ->
Expand Down Expand Up @@ -855,55 +856,14 @@ get_strategy(<<"round_robin">>) -> 'rr';
get_strategy(<<"most_idle">>) -> 'mi';
get_strategy(_) -> 'rr'.

-spec create_strategy_state(queue_strategy(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state().
create_strategy_state(Strategy, AcctDb, QueueId) ->
create_strategy_state(Strategy, #strategy_state{}, AcctDb, QueueId).

-spec create_strategy_state(queue_strategy(), strategy_state(), kz_term:ne_binary(), kz_term:ne_binary()) -> strategy_state().
create_strategy_state('rr', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) ->
create_strategy_state('rr', SS#strategy_state{agents=queue:new()}, AcctDb, QueueId);
create_strategy_state('rr', #strategy_state{agents=AgentQ}=SS, AcctDb, QueueId) ->
case acdc_util:agents_in_queue(AcctDb, QueueId) of
[] -> lager:debug("no agents around"), SS;
JObjs ->
Q = queue:from_list([Id || JObj <- JObjs,
not queue:member((Id = kz_doc:id(JObj)), AgentQ)
]),
Details = lists:foldl(fun(JObj, Acc) ->
dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc)
end, dict:new(), JObjs),
SS#strategy_state{agents=queue:join(AgentQ, Q)
,details=Details
}
end;
create_strategy_state('mi', #strategy_state{agents='undefined'}=SS, AcctDb, QueueId) ->
create_strategy_state('mi', SS#strategy_state{agents=[]}, AcctDb, QueueId);
create_strategy_state('mi', #strategy_state{agents=AgentL}=SS, AcctDb, QueueId) ->
case acdc_util:agents_in_queue(AcctDb, QueueId) of
[] -> lager:debug("no agents around"), SS;
JObjs ->
AgentL1 = lists:foldl(fun(JObj, Acc) ->
Id = kz_doc:id(JObj),
case lists:member(Id, Acc) of
'true' -> Acc;
'false' -> [Id | Acc]
end
end, AgentL, JObjs),
Details = lists:foldl(fun(JObj, Acc) ->
dict:store(kz_doc:id(JObj), {1, 'undefined'}, Acc)
end, dict:new(), JObjs),
SS#strategy_state{agents=AgentL1
,details=Details
}
end.
-spec create_strategy_state(queue_strategy()) -> strategy_state().
create_strategy_state(Strategy) ->
Agents = create_ss_agents(Strategy),
#strategy_state{agents=Agents}.

update_strategy_state(Srv, 'rr', #strategy_state{agents=AgentQueue}) ->
L = queue:to_list(AgentQueue),
update_strategy_state(Srv, L);
update_strategy_state(Srv, 'mi', #strategy_state{agents=AgentL}) ->
update_strategy_state(Srv, AgentL).
update_strategy_state(Srv, L) ->
[gen_listener:cast(Srv, {'sync_with_agent', A}) || A <- L].
-spec create_ss_agents(queue_strategy()) -> queue_strategy_state().
create_ss_agents('rr') -> queue:new();
create_ss_agents('mi') -> [].

-spec call_position(kz_term:ne_binary(), [kapps_call:call()]) -> kz_term:api_integer().
call_position(CallId, Calls) ->
Expand Down
Loading