Skip to content

Commit

Permalink
3.19: get list of buckets to stop
Browse files Browse the repository at this point in the history
don't iterate over the table, as it can crash during concurrent updates.
  • Loading branch information
jamesaimonetti committed Mar 6, 2015
1 parent a0873e9 commit 558372a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 58 deletions.
83 changes: 41 additions & 42 deletions core/kazoo_token_buckets-1.0.0/src/kz_buckets.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2014, 2600Hz INC
%%% @copyright (C) 2014-2015, 2600Hz INC
%%% @doc
%%% API interface for buckets
%%% ETS writer for table
Expand Down Expand Up @@ -55,7 +55,7 @@
-record(bucket, {key :: {ne_binary(), ne_binary()} | '_'
,srv :: pid() | '$1' | '_'
,ref :: reference() | '$2' | '_'
,accessed = os:timestamp() :: wh_now() | '_'
,accessed = wh_util:now_s(os:timestamp()) :: gregorian_seconds() | '$1' | '_'
}).
-type bucket() :: #bucket{}.

Expand Down Expand Up @@ -207,20 +207,38 @@ start_bucket(App, Name, MaxTokens, FillRate, FillTime) ->
gen_server:call(?MODULE, {'start', App, Name, MaxTokens, FillRate, FillTime})
end.

-define(TOKEN_FORMAT_STRING, " ~20s | ~50.50s | ~15.15s | ~6.6s | ~20.20s |~n").
-spec tokens() -> 'ok'.
tokens() ->
io:format("~60.60s | ~20.20s | ~10.10s | ~20.20s |~n", [<<"Key">>, <<"Pid">>, <<"Tokens">>, <<"Last Accessed">>]),
tokens_traverse(ets:first(table_id())).
tokens_traverse('$end_of_table') ->
io:format("~s~n", [<<"No more token servers">>]);
tokens_traverse(Key) ->
[#bucket{key=K, srv=P, accessed=Accessed}] = ets:lookup(table_id(), Key),
io:format("~60.60s | ~20.20s | ~10.10s | ~20.20s |~n"
,[K, pid_to_list(P), integer_to_list(kz_token_bucket:tokens(P))
,integer_to_list(wh_util:elapsed_s(Accessed))
io:format(?TOKEN_FORMAT_STRING
,[<<"Application">>, <<"Key">>, <<"Pid">>, <<"Tokens">>, <<"Last Accessed">>]
),

lists:foldl(fun print_bucket_info/2
,'undefined'
,lists:keysort(#bucket.key, ets:tab2list(table_id()))
),
'ok'.

print_bucket_info(#bucket{key={CurrentApp, Name}
,srv=P
,accessed=Accessed
}
,CurrentApp) ->
io:format(?TOKEN_FORMAT_STRING
,[""
,Name
,pid_to_list(P)
,integer_to_list(kz_token_bucket:tokens(P))
,wh_util:pretty_print_elapsed_s(wh_util:elapsed_s(Accessed))
]
),
tokens_traverse(ets:next(table_id(), Key)).
CurrentApp;
print_bucket_info(#bucket{key={App, _}}=Bucket, _OldApp) ->
io:format(?TOKEN_FORMAT_STRING
,[App, "", "", "" ,""]
),
print_bucket_info(Bucket, App).

%%%===================================================================
%%% ETS
Expand Down Expand Up @@ -306,7 +324,7 @@ handle_cast(_Req, #state{table_id='undefined'}=State) ->
lager:debug("ignoring req: ~p", [_Req]),
{'noreply', State};
handle_cast({'bucket_accessed', Key}, State) ->
ets:update_element(table_id(), Key, {#bucket.accessed, os:timestamp()}),
ets:update_element(table_id(), Key, {#bucket.accessed, wh_util:now_s(os:timestamp())}),
{'noreply', State};
handle_cast(_Msg, State) ->
{'noreply', State}.
Expand Down Expand Up @@ -387,35 +405,16 @@ start_inactivity_timer() ->
erlang:send_after(?MILLISECONDS_IN_MINUTE, self(), ?INACTIVITY_MSG).

-spec check_for_inactive_buckets() -> 'ok'.
-spec check_for_inactive_buckets(wh_now(), pos_integer(), {ne_binary(), ne_binary()} | '$end_of_table') -> 'ok'.

check_for_inactive_buckets() ->
wh_util:put_callid(?MODULE),
Now = os:timestamp(),
InactivityTimeout = ?INACTIVITY_TIMEOUT_MS,
check_for_inactive_buckets(Now, InactivityTimeout, ets:first(table_id())).

check_for_inactive_buckets(_Now, _InactivityTimeout, '$end_of_table') -> 'ok';
check_for_inactive_buckets(Now, InactivityTimeout, {App, Key}) ->
case get_bucket(App, Key, 'record') of
'undefined' -> 'ok';
Bucket ->
maybe_stop_bucket(Now, InactivityTimeout, Bucket)
end,
check_for_inactive_buckets(Now, InactivityTimeout, ets:next(table_id(), {App, Key})).

-spec maybe_stop_bucket(wh_now(), pos_integer(), bucket()) -> 'ok'.
maybe_stop_bucket(Now
,InactivityTimeout
,#bucket{accessed=Accessed
,srv=Srv
,key=Key
}
) ->
case wh_util:elapsed_ms(Accessed, Now) > InactivityTimeout of
'false' -> 'ok';
'true' ->
lager:debug("bucket ~p(~p) hasn't been accessed recently, stopping", [Key, Srv]),
kz_token_bucket:stop(Srv),
'ok'
Now = wh_util:now_s(os:timestamp()),
InactivityTimeout = ?INACTIVITY_TIMEOUT_S,

MS = [{#bucket{accessed='$1', srv='$2', _='_'}
,[{'<', '$1', {'const', Now-InactivityTimeout}}]
,['$2']
}],
case [kz_token_bucket:stop(Srv) || Srv <- ets:select(?MODULE:table_id(), MS)] of
[] -> 'ok';
L -> lager:debug("stopped ~p servers", [length(L)])
end.
3 changes: 1 addition & 2 deletions core/kazoo_token_buckets-1.0.0/src/kz_buckets.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
-define(APP_NAME, <<"token_buckets">>).
-define(DEFAULT_APP, <<"default">>).

-define(INACTIVITY_TIMEOUT_MS
-define(INACTIVITY_TIMEOUT_S
,whapps_config:get_integer(?APP_NAME, <<"inactivity_timeout_s">>, ?SECONDS_IN_MINUTE * 10)
* ?MILLISECONDS_IN_SECOND
).
-define(INACTIVITY_MSG, 'inactivity_timeout').

Expand Down
28 changes: 14 additions & 14 deletions core/kazoo_token_buckets-1.0.0/src/kz_token_bucket.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2013-2014, 2600Hz
%%% @copyright (C) 2013-2015, 2600Hz
%%% @doc
%%% Implementation of a token bucket as gen_server
%%% https://en.wikipedia.org/wiki/Token_bucket#The_token_bucket_algorithm
Expand Down Expand Up @@ -149,7 +149,7 @@ init([Max, FillRate, FillAsBlock, FillTime]) ->
,fill_as_block=FillAsBlock
,tokens=Max
}
,?INACTIVITY_TIMEOUT_MS
,?INACTIVITY_TIMEOUT_S * 1000
}.

%%--------------------------------------------------------------------
Expand All @@ -170,25 +170,25 @@ handle_call({'consume', Req}, _From, #state{tokens=Current}=State) ->
case Current - Req of
N when N >= 0 ->
lager:debug("consumed ~p, ~p left", [Req, N]),
{'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS};
{'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000};
_ ->
lager:debug("not enough tokens (~p) to consume ~p", [Current, Req]),
{'reply', 'false', State, ?INACTIVITY_TIMEOUT_MS}
{'reply', 'false', State, ?INACTIVITY_TIMEOUT_S * 1000}
end;
handle_call({'consume_until', Req}, _From, #state{tokens=Current}=State) ->
case Current - Req of
N when N >= 0 ->
lager:debug("consumed ~p, ~p left", [Req, N]),
{'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS};
{'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000};
_N ->
lager:debug("not enough tokens (~p) to consume ~p, zeroing out tokens", [Current, Req]),
{'reply', 'false', State#state{tokens=0}, ?INACTIVITY_TIMEOUT_MS}
{'reply', 'false', State#state{tokens=0}, ?INACTIVITY_TIMEOUT_S * 1000}
end;
handle_call({'tokens'}, _From, #state{tokens=Current}=State) ->
{'reply', Current, State, ?INACTIVITY_TIMEOUT_MS};
{'reply', Current, State, ?INACTIVITY_TIMEOUT_S * 1000};
handle_call(_Request, _From, State) ->
lager:debug("unhandled call: ~p", [_Request]),
{'reply', 'ok', State, ?INACTIVITY_TIMEOUT_MS}.
{'reply', 'ok', State, ?INACTIVITY_TIMEOUT_S * 1000}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -206,21 +206,21 @@ handle_cast({'credit', Req}, #state{tokens=Current
case Current + Req of
N when N > Max ->
lager:debug("credit of ~p tokens overfills, setting to ~p", [Req, Max]),
{'noreply', State#state{tokens=Max}, ?INACTIVITY_TIMEOUT_MS};
{'noreply', State#state{tokens=Max}, ?INACTIVITY_TIMEOUT_S * 1000};
N ->
lager:debug("crediting ~p tokens, now at ~p", [Req, N]),
{'noreply', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS}
{'noreply', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000}
end;
handle_cast({'name', Name}, State) ->
wh_util:put_callid(Name),
lager:debug("updated name to ~p", [Name]),
{'noreply', State, ?INACTIVITY_TIMEOUT_MS};
{'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000};
handle_cast('stop', State) ->
lager:debug("asked to stop"),
{'stop', 'normal', State};
handle_cast(_Msg, State) ->
lager:debug("unhandled cast: ~p", [_Msg]),
{'noreply', State, ?INACTIVITY_TIMEOUT_MS}.
{'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -246,11 +246,11 @@ handle_info({'timeout', Ref, ?TOKEN_FILL_TIME}, #state{max_tokens=Max
,State#state{tokens=add_tokens(Max, Current, FillRate, FillAsBlock, FillTime)
,fill_ref=start_fill_timer(FillRate, FillAsBlock, FillTime)
}
,?INACTIVITY_TIMEOUT_MS
,?INACTIVITY_TIMEOUT_S * 1000
};
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{'noreply', State, ?INACTIVITY_TIMEOUT_MS}.
{'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000}.

%%--------------------------------------------------------------------
%% @private
Expand Down

0 comments on commit 558372a

Please sign in to comment.