Skip to content

Commit

Permalink
[4.3] HELP-16635: deduct tokens on all API requests (#6628)
Browse files Browse the repository at this point in the history
* [4.3] HELP-16635: deduct tokens on all API requests

Each time an API request is made, deduct the token cost
from the associated bucket.

As part of the changes, some property tests needed updating because
they exceed the token bucket. These tests disable the cost while they
run and replace the prior value when they end.

Another issue resolved is that bindings in blackhole, when using the
kz_hooks facility, were done asynchronously. This meant that a speedy
response to the websocket client followed by a generated event could
occur faster than the kz_hook listener could bind its AMQP queue to
the new binding.

The blackhole response to a subscription now blocks until the AMQP
queue is established.

* edoc
  • Loading branch information
jamesaimonetti authored Sep 3, 2020
1 parent 2bfbf8b commit 3806bff
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 42 deletions.
10 changes: 8 additions & 2 deletions applications/blackhole/src/blackhole_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ send_error_module_resp(EventJObj, Error) ->

-spec add_binding(bh_event_binding()) -> 'ok'.
add_binding(Binding) ->
gen_listener:cast(?SERVER, {'add_bh_binding', Binding}).
gen_listener:call(?SERVER, {'add_bh_binding', Binding}).

-spec add_bindings(bh_event_bindings()) -> 'ok'.
add_bindings(Bindings) ->
gen_listener:cast(?SERVER, {'add_bh_bindings', Bindings}).
gen_listener:call(?SERVER, {'add_bh_bindings', Bindings}).

-spec remove_binding(bh_event_binding()) -> 'ok'.
remove_binding(Binding) ->
Expand All @@ -207,6 +207,12 @@ init([]) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call({'add_bh_bindings', Bindings}, _From, #state{bindings=ETS}=State) ->
_ = add_bh_bindings(ETS, Bindings),
{'reply', 'ok', State};
handle_call({'add_bh_binding', Binding}, _From, #state{bindings=ETS}=State) ->
_ = add_bh_binding(ETS, Binding),
{'noreply', 'ok', State};
handle_call(_Request, _From, State) ->
{'reply', {'error', 'not_implemented'}, State}.

Expand Down
55 changes: 36 additions & 19 deletions applications/crossbar/src/modules/cb_token_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec init() -> ok.
-spec init() -> 'ok'.
init() ->
_ = crossbar_bindings:bind(<<"*.authenticate">>, ?MODULE, 'authenticate'),
_ = crossbar_bindings:bind(<<"*.early_authenticate">>, ?MODULE, 'early_authenticate'),
Expand All @@ -39,7 +39,7 @@ init() ->
_ = crossbar_bindings:bind(<<"*.resource_exists.token_auth">>, ?MODULE, 'resource_exists'),
_ = crossbar_bindings:bind(<<"*.validate.token_auth">>, ?MODULE, 'validate'),
_ = crossbar_bindings:bind(<<"*.execute.delete.token_auth">>, ?MODULE, 'delete'),
ok.
'ok'.

-spec allowed_methods() -> http_methods().
allowed_methods() -> [?HTTP_DELETE, ?HTTP_GET].
Expand Down Expand Up @@ -116,27 +116,35 @@ authenticate(Context) ->
-spec authenticate(cb_context:context(), kz_term:api_ne_binary(), atom()) ->
boolean() |
{'true' | 'stop', cb_context:context()}.
authenticate(_Context, ?NE_BINARY = _AccountId, 'x-auth-token') -> 'true';
authenticate(Context, <<_AccountId/binary>>, 'x-auth-token') ->
%% if the auth account id is known already, early-auth validated it
%% just deduct tokens in that case
case deduct_tokens(Context) of
'ok' -> 'true';
Error -> Error
end;
authenticate(Context, 'undefined', 'x-auth-token') ->
_ = cb_context:put_reqid(Context),
case deduct_tokens(Context) of
'ok' ->
lager:info("checking for x-auth-token"),
check_auth_token(Context);
Error ->
lager:info("rate limiting threshold hit for ~s!", [cb_context:client_ip(Context)]),
Error
end;
authenticate(_Context, _AccountId, _TokenType) -> 'false'.

deduct_tokens(Context) ->
Bucket = cb_modules_util:bucket_name(Context),
Cost = cb_modules_util:token_cost(Context),

case kz_buckets:consume_tokens(?APP_NAME, Bucket, Cost) of
'true' ->
lager:info("checking for x-auth-token"),
check_auth_token(Context
,cb_context:auth_token(Context)
,cb_context:magic_pathed(Context)
);
'true' -> 'ok';
'false' ->
lager:warning("bucket ~s does not have enough tokens(~b needed) for this request"
,[Bucket, Cost]
),
lager:info("rate limiting threshold hit for ~s!", [cb_context:client_ip(Context)]),
{'stop', cb_context:add_system_error('too_many_requests', Context)}
end;
authenticate(_Context, _AccountId, _TokenType) -> 'false'.
end.

-spec early_authenticate(cb_context:context()) ->
boolean() |
Expand All @@ -155,9 +163,18 @@ early_authenticate(_Context, _TokenType) -> 'false'.
-spec early_authenticate_token(cb_context:context(), kz_term:api_binary()) ->
boolean() |
{'true', cb_context:context()}.
early_authenticate_token(Context, AuthToken) when is_binary(AuthToken) ->
validate_auth_token(Context, AuthToken);
early_authenticate_token(_Context, 'undefined') -> 'true'.
early_authenticate_token(_Context, 'undefined') -> 'true';
early_authenticate_token(Context, <<AuthToken/binary>>) ->
validate_auth_token(Context, AuthToken).

-spec check_auth_token(cb_context:context()) ->
boolean() |
{'true', cb_context:context()}.
check_auth_token(Context) ->
check_auth_token(Context
,cb_context:auth_token(Context)
,cb_context:magic_pathed(Context)
).

-spec check_auth_token(cb_context:context(), kz_term:api_binary(), boolean()) ->
boolean() |
Expand All @@ -174,7 +191,7 @@ check_auth_token(Context, AuthToken, _MagicPathed) ->
-spec validate_auth_token(cb_context:context(), kz_term:ne_binary()) ->
boolean() |
{'true', cb_context:context()}.
validate_auth_token(Context, ?NE_BINARY = AuthToken) ->
validate_auth_token(Context, AuthToken) ->
Options = [{<<"account_id">>, cb_context:req_header(Context, <<"x-auth-account-id">>)}],
lager:debug("checking auth token"),
case crossbar_auth:validate_auth_token(AuthToken, props:filter_undefined(Options)) of
Expand Down Expand Up @@ -238,7 +255,7 @@ check_as_payload(Context, JObj, AccountId) ->
{'true', cb_context:context()}.
check_descendants(Context, JObj, AccountId, AsAccountId, AsOwnerId) ->
case kapps_util:account_descendants(AccountId) of
[] -> false;
[] -> 'false';
Descendants ->
case lists:member(AsAccountId, Descendants) of
'false' -> 'false';
Expand Down
54 changes: 35 additions & 19 deletions core/kazoo_events/src/kz_hooks_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
-module(kz_hooks_listener).
-behaviour(gen_listener).

-export([start_link/0]).
-export([start_link/0
,maybe_add_binding/1
]).
-export([init/1
,handle_call/3
,handle_cast/2
Expand All @@ -22,8 +24,6 @@
-include("kazoo_events.hrl").
-include("kz_hooks.hrl").

-define(SERVER, ?MODULE).

%% Three main call events
-define(ALL_EVENTS, [<<"CHANNEL_CREATE">>
,<<"CHANNEL_ANSWER">>
Expand Down Expand Up @@ -56,7 +56,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_listener:start_link({'local', ?SERVER}
gen_listener:start_link({'local', ?MODULE}
,?MODULE
,[{'bindings', ?BINDINGS}
,{'responders', ?RESPONDERS}
Expand All @@ -67,6 +67,11 @@ start_link() ->
,[]
).

-spec maybe_add_binding('all' | kz_term:ne_binary()) -> 'ok'.
maybe_add_binding(EventName) ->
Events = gen_listener:call(?MODULE, 'current_events'),
maybe_add_binding(EventName, Events).

%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
Expand All @@ -87,6 +92,8 @@ init([]) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call('current_events', _From, #state{call_events=Events}=State) ->
{'reply', Events, State};
handle_call(_Request, _From, State) ->
{'reply', {'error', 'not_implemented'}, State}.

Expand All @@ -95,22 +102,10 @@ handle_call(_Request, _From, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast({'maybe_add_binding', 'all'}, #state{call_events=Events}=State) ->
case [E || E <- ?ALL_EVENTS, not lists:member(E, Events)] of
[] -> {'noreply', State};
Es ->
lager:debug("adding bindings for ~p", [Es]),
gen_listener:add_binding(self(), ?CALL_BINDING(Es)),
{'noreply', State#state{call_events=Es ++ Events}}
end;
handle_cast({'maybe_add_binding', Event}, #state{call_events=Events}=State) ->
case lists:member(Event, Events) of
'true' -> {'noreply', State};
'false' ->
lager:debug("adding bindings for ~s", [Event]),
gen_listener:add_binding(self(), ?CALL_BINDING([Event])),
{'noreply', State#state{call_events=[Event | Events]}}
end;
NewEvents = maybe_add_binding(Events, Event),
{'noreply', State#state{call_events=NewEvents}};

handle_cast({'maybe_remove_binding', 'all'}, #state{call_events=Events}=State) ->
case [E || E <- ?ALL_EVENTS, lists:member(E, Events)] of
[] -> {'noreply', State};
Expand Down Expand Up @@ -175,3 +170,24 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @doc
%% @end
%%------------------------------------------------------------------------------
maybe_add_binding('all', Events) ->
case [E || E <- ?ALL_EVENTS, not lists:member(E, Events)] of
[] -> Events;
Es ->
lager:info("adding bindings for ~p", [Es]),
gen_listener:b_add_binding(?MODULE, ?CALL_BINDING(Es)),
Es ++ Events
end;
maybe_add_binding(Event, Events) ->
case lists:member(Event, Events) of
'true' -> Events;
'false' ->
lager:info("adding bindings for ~s", [Event]),
gen_listener:b_add_binding(?MODULE, ?CALL_BINDING([Event])),
[Event | Events]
end.
32 changes: 31 additions & 1 deletion core/kazoo_events/src/kz_hooks_shared_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

-behaviour(gen_listener).

-export([start_link/0]).
-export([start_link/0
,maybe_add_binding/1
]).
-export([init/1
,handle_call/3
,handle_cast/2
Expand Down Expand Up @@ -69,6 +71,11 @@ start_link() ->
,[]
).

-spec maybe_add_binding('all' | kz_term:ne_binary()) -> 'ok'.
maybe_add_binding(EventName) ->
Events = gen_listener:call(?MODULE, 'current_events'),
maybe_add_binding(EventName, Events).

%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
Expand All @@ -88,6 +95,8 @@ init([]) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call('current_events', _From, #state{call_events=Events}=State) ->
{'reply', Events, State};
handle_call(_Request, _From, State) ->
{'reply', {'error', 'not_implemented'}, State}.

Expand Down Expand Up @@ -176,3 +185,24 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================

%%------------------------------------------------------------------------------
%% @doc
%% @end
%%------------------------------------------------------------------------------
maybe_add_binding('all', Events) ->
case [E || E <- ?ALL_EVENTS, not lists:member(E, Events)] of
[] -> Events;
Es ->
lager:debug("adding bindings for ~p", [Es]),
gen_listener:b_add_binding(?MODULE, ?CALL_BINDING(Es)),
Es ++ Events
end;
maybe_add_binding(Event, Events) ->
case lists:member(Event, Events) of
'true' -> Events;
'false' ->
lager:debug("adding bindings for ~s", [Event]),
gen_listener:b_add_binding(?MODULE, ?CALL_BINDING([Event])),
[Event | Events]
end.
3 changes: 2 additions & 1 deletion core/kazoo_events/src/kz_hooks_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ maybe_add_binding_to_listener(ServerName) ->

-spec maybe_add_binding_to_listener(atom(), kz_term:ne_binary() | 'all') -> 'ok'.
maybe_add_binding_to_listener(ServerName, EventName) ->
gen_listener:cast(ServerName, {'maybe_add_binding', EventName}).
lager:info("adding event ~s to ~s", [EventName, ServerName]),
ServerName:maybe_add_binding(EventName).

-spec maybe_remove_hook(tuple()) -> 'true'.
maybe_remove_hook(Hook) ->
Expand Down
14 changes: 14 additions & 0 deletions core/kazoo_proper/src/pqc_cb_directories.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ seq() ->
Model = initial_state(),
API = pqc_kazoo_model:api(Model),

OldCost = kapps_config:get(<<"crossbar">>, <<"token_costs">>),
_ = kapps_config:set_default(<<"crossbar">>, <<"token_costs">>, 0),

Result = try seq(API)
catch E:R -> {E, R}
after
kapps_config:set_default(<<"crossbar">>, <<"token_costs">>, OldCost)
end,
return_result(Result).

return_result({E, R}) when is_function(E, 1) -> E(R);
return_result(Result) -> Result.

seq(API) ->
AccountResp = pqc_cb_accounts:create_account(API, hd(?ACCOUNT_NAMES)),
lager:info("created account: ~s", [AccountResp]),

Expand Down

0 comments on commit 3806bff

Please sign in to comment.