diff --git a/core/kazoo_token_buckets-1.0.0/src/kz_buckets.erl b/core/kazoo_token_buckets-1.0.0/src/kz_buckets.erl index 0fe86946a7a..2f8bd689803 100644 --- a/core/kazoo_token_buckets-1.0.0/src/kz_buckets.erl +++ b/core/kazoo_token_buckets-1.0.0/src/kz_buckets.erl @@ -1,5 +1,5 @@ %%%------------------------------------------------------------------- -%%% @copyright (C) 2014, 2600Hz INC +%%% @copyright (C) 2014-2015, 2600Hz INC %%% @doc %%% API interface for buckets %%% ETS writer for table @@ -55,7 +55,7 @@ -record(bucket, {key :: {ne_binary(), ne_binary()} | '_' ,srv :: pid() | '$1' | '_' ,ref :: reference() | '$2' | '_' - ,accessed = os:timestamp() :: wh_now() | '_' + ,accessed = wh_util:now_s(os:timestamp()) :: gregorian_seconds() | '$1' | '_' }). -type bucket() :: #bucket{}. @@ -207,20 +207,38 @@ start_bucket(App, Name, MaxTokens, FillRate, FillTime) -> gen_server:call(?MODULE, {'start', App, Name, MaxTokens, FillRate, FillTime}) end. +-define(TOKEN_FORMAT_STRING, " ~20s | ~50.50s | ~15.15s | ~6.6s | ~20.20s |~n"). -spec tokens() -> 'ok'. tokens() -> - io:format("~60.60s | ~20.20s | ~10.10s | ~20.20s |~n", [<<"Key">>, <<"Pid">>, <<"Tokens">>, <<"Last Accessed">>]), - tokens_traverse(ets:first(table_id())). -tokens_traverse('$end_of_table') -> - io:format("~s~n", [<<"No more token servers">>]); -tokens_traverse(Key) -> - [#bucket{key=K, srv=P, accessed=Accessed}] = ets:lookup(table_id(), Key), - io:format("~60.60s | ~20.20s | ~10.10s | ~20.20s |~n" - ,[K, pid_to_list(P), integer_to_list(kz_token_bucket:tokens(P)) - ,integer_to_list(wh_util:elapsed_s(Accessed)) + io:format(?TOKEN_FORMAT_STRING + ,[<<"Application">>, <<"Key">>, <<"Pid">>, <<"Tokens">>, <<"Last Accessed">>] + ), + + lists:foldl(fun print_bucket_info/2 + ,'undefined' + ,lists:keysort(#bucket.key, ets:tab2list(table_id())) + ), + 'ok'. + +print_bucket_info(#bucket{key={CurrentApp, Name} + ,srv=P + ,accessed=Accessed + } + ,CurrentApp) -> + io:format(?TOKEN_FORMAT_STRING + ,["" + ,Name + ,pid_to_list(P) + ,integer_to_list(kz_token_bucket:tokens(P)) + ,wh_util:pretty_print_elapsed_s(wh_util:elapsed_s(Accessed)) ] ), - tokens_traverse(ets:next(table_id(), Key)). + CurrentApp; +print_bucket_info(#bucket{key={App, _}}=Bucket, _OldApp) -> + io:format(?TOKEN_FORMAT_STRING + ,[App, "", "", "" ,""] + ), + print_bucket_info(Bucket, App). %%%=================================================================== %%% ETS @@ -306,7 +324,7 @@ handle_cast(_Req, #state{table_id='undefined'}=State) -> lager:debug("ignoring req: ~p", [_Req]), {'noreply', State}; handle_cast({'bucket_accessed', Key}, State) -> - ets:update_element(table_id(), Key, {#bucket.accessed, os:timestamp()}), + ets:update_element(table_id(), Key, {#bucket.accessed, wh_util:now_s(os:timestamp())}), {'noreply', State}; handle_cast(_Msg, State) -> {'noreply', State}. @@ -387,35 +405,16 @@ start_inactivity_timer() -> erlang:send_after(?MILLISECONDS_IN_MINUTE, self(), ?INACTIVITY_MSG). -spec check_for_inactive_buckets() -> 'ok'. --spec check_for_inactive_buckets(wh_now(), pos_integer(), {ne_binary(), ne_binary()} | '$end_of_table') -> 'ok'. - check_for_inactive_buckets() -> wh_util:put_callid(?MODULE), - Now = os:timestamp(), - InactivityTimeout = ?INACTIVITY_TIMEOUT_MS, - check_for_inactive_buckets(Now, InactivityTimeout, ets:first(table_id())). - -check_for_inactive_buckets(_Now, _InactivityTimeout, '$end_of_table') -> 'ok'; -check_for_inactive_buckets(Now, InactivityTimeout, {App, Key}) -> - case get_bucket(App, Key, 'record') of - 'undefined' -> 'ok'; - Bucket -> - maybe_stop_bucket(Now, InactivityTimeout, Bucket) - end, - check_for_inactive_buckets(Now, InactivityTimeout, ets:next(table_id(), {App, Key})). - --spec maybe_stop_bucket(wh_now(), pos_integer(), bucket()) -> 'ok'. -maybe_stop_bucket(Now - ,InactivityTimeout - ,#bucket{accessed=Accessed - ,srv=Srv - ,key=Key - } - ) -> - case wh_util:elapsed_ms(Accessed, Now) > InactivityTimeout of - 'false' -> 'ok'; - 'true' -> - lager:debug("bucket ~p(~p) hasn't been accessed recently, stopping", [Key, Srv]), - kz_token_bucket:stop(Srv), - 'ok' + Now = wh_util:now_s(os:timestamp()), + InactivityTimeout = ?INACTIVITY_TIMEOUT_S, + + MS = [{#bucket{accessed='$1', srv='$2', _='_'} + ,[{'<', '$1', {'const', Now-InactivityTimeout}}] + ,['$2'] + }], + case [kz_token_bucket:stop(Srv) || Srv <- ets:select(?MODULE:table_id(), MS)] of + [] -> 'ok'; + L -> lager:debug("stopped ~p servers", [length(L)]) end. diff --git a/core/kazoo_token_buckets-1.0.0/src/kz_buckets.hrl b/core/kazoo_token_buckets-1.0.0/src/kz_buckets.hrl index 0ded76ed2a9..ef37203ff9b 100644 --- a/core/kazoo_token_buckets-1.0.0/src/kz_buckets.hrl +++ b/core/kazoo_token_buckets-1.0.0/src/kz_buckets.hrl @@ -6,9 +6,8 @@ -define(APP_NAME, <<"token_buckets">>). -define(DEFAULT_APP, <<"default">>). --define(INACTIVITY_TIMEOUT_MS +-define(INACTIVITY_TIMEOUT_S ,whapps_config:get_integer(?APP_NAME, <<"inactivity_timeout_s">>, ?SECONDS_IN_MINUTE * 10) - * ?MILLISECONDS_IN_SECOND ). -define(INACTIVITY_MSG, 'inactivity_timeout'). diff --git a/core/kazoo_token_buckets-1.0.0/src/kz_token_bucket.erl b/core/kazoo_token_buckets-1.0.0/src/kz_token_bucket.erl index 263747496a6..c48579df122 100644 --- a/core/kazoo_token_buckets-1.0.0/src/kz_token_bucket.erl +++ b/core/kazoo_token_buckets-1.0.0/src/kz_token_bucket.erl @@ -1,5 +1,5 @@ %%%------------------------------------------------------------------- -%%% @copyright (C) 2013-2014, 2600Hz +%%% @copyright (C) 2013-2015, 2600Hz %%% @doc %%% Implementation of a token bucket as gen_server %%% https://en.wikipedia.org/wiki/Token_bucket#The_token_bucket_algorithm @@ -149,7 +149,7 @@ init([Max, FillRate, FillAsBlock, FillTime]) -> ,fill_as_block=FillAsBlock ,tokens=Max } - ,?INACTIVITY_TIMEOUT_MS + ,?INACTIVITY_TIMEOUT_S * 1000 }. %%-------------------------------------------------------------------- @@ -170,25 +170,25 @@ handle_call({'consume', Req}, _From, #state{tokens=Current}=State) -> case Current - Req of N when N >= 0 -> lager:debug("consumed ~p, ~p left", [Req, N]), - {'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS}; + {'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000}; _ -> lager:debug("not enough tokens (~p) to consume ~p", [Current, Req]), - {'reply', 'false', State, ?INACTIVITY_TIMEOUT_MS} + {'reply', 'false', State, ?INACTIVITY_TIMEOUT_S * 1000} end; handle_call({'consume_until', Req}, _From, #state{tokens=Current}=State) -> case Current - Req of N when N >= 0 -> lager:debug("consumed ~p, ~p left", [Req, N]), - {'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS}; + {'reply', 'true', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000}; _N -> lager:debug("not enough tokens (~p) to consume ~p, zeroing out tokens", [Current, Req]), - {'reply', 'false', State#state{tokens=0}, ?INACTIVITY_TIMEOUT_MS} + {'reply', 'false', State#state{tokens=0}, ?INACTIVITY_TIMEOUT_S * 1000} end; handle_call({'tokens'}, _From, #state{tokens=Current}=State) -> - {'reply', Current, State, ?INACTIVITY_TIMEOUT_MS}; + {'reply', Current, State, ?INACTIVITY_TIMEOUT_S * 1000}; handle_call(_Request, _From, State) -> lager:debug("unhandled call: ~p", [_Request]), - {'reply', 'ok', State, ?INACTIVITY_TIMEOUT_MS}. + {'reply', 'ok', State, ?INACTIVITY_TIMEOUT_S * 1000}. %%-------------------------------------------------------------------- %% @private @@ -206,21 +206,21 @@ handle_cast({'credit', Req}, #state{tokens=Current case Current + Req of N when N > Max -> lager:debug("credit of ~p tokens overfills, setting to ~p", [Req, Max]), - {'noreply', State#state{tokens=Max}, ?INACTIVITY_TIMEOUT_MS}; + {'noreply', State#state{tokens=Max}, ?INACTIVITY_TIMEOUT_S * 1000}; N -> lager:debug("crediting ~p tokens, now at ~p", [Req, N]), - {'noreply', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_MS} + {'noreply', State#state{tokens=N}, ?INACTIVITY_TIMEOUT_S * 1000} end; handle_cast({'name', Name}, State) -> wh_util:put_callid(Name), lager:debug("updated name to ~p", [Name]), - {'noreply', State, ?INACTIVITY_TIMEOUT_MS}; + {'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000}; handle_cast('stop', State) -> lager:debug("asked to stop"), {'stop', 'normal', State}; handle_cast(_Msg, State) -> lager:debug("unhandled cast: ~p", [_Msg]), - {'noreply', State, ?INACTIVITY_TIMEOUT_MS}. + {'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000}. %%-------------------------------------------------------------------- %% @private @@ -246,11 +246,11 @@ handle_info({'timeout', Ref, ?TOKEN_FILL_TIME}, #state{max_tokens=Max ,State#state{tokens=add_tokens(Max, Current, FillRate, FillAsBlock, FillTime) ,fill_ref=start_fill_timer(FillRate, FillAsBlock, FillTime) } - ,?INACTIVITY_TIMEOUT_MS + ,?INACTIVITY_TIMEOUT_S * 1000 }; handle_info(_Info, State) -> lager:debug("unhandled message: ~p", [_Info]), - {'noreply', State, ?INACTIVITY_TIMEOUT_MS}. + {'noreply', State, ?INACTIVITY_TIMEOUT_S * 1000}. %%-------------------------------------------------------------------- %% @private