Skip to content

Commit

Permalink
Support of sharing load balancing config for subdomains
Browse files Browse the repository at this point in the history
  • Loading branch information
Chandru Mullaparthi committed Jun 21, 2016
1 parent b28542d commit 3fc7e78
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 68 deletions.
61 changes: 38 additions & 23 deletions src/ibrowse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,10 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
#url{host = Host,
port = Port,
protocol = Protocol} = Parsed_url ->
Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
{Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options),
Lb_pid = case ets:lookup(ibrowse_lb, {Lb_host, Lb_port}) of
[] ->
get_lb_pid(Parsed_url);
get_lb_pid({Lb_host, Lb_port});
[#lb_pid{pid = Lb_pid_1}] ->
Lb_pid_1
end,
Expand Down Expand Up @@ -420,17 +421,27 @@ merge_options(Host, Port, Options) ->
end
end, Options, Config_options).

get_lb_pid(Url) ->
gen_server:call(?MODULE, {get_lb_pid, Url}).
get_lb_pid(Key) ->
gen_server:call(?MODULE, {get_lb_pid, Key}).

get_host_port_for_lb(Host, Port, Options) ->
case get_value(use_subdomain_lb_config, Options, undefined) of
undefined ->
{Host, Port};
{Sub_h, Sub_p} ->
{Sub_h, Sub_p}
end.

get_max_sessions(Host, Port, Options) ->
{Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options),
get_value(max_sessions, Options,
get_config_value({max_sessions, Host, Port},
get_config_value({max_sessions, Lb_host, Lb_port},
default_max_sessions())).

get_max_pipeline_size(Host, Port, Options) ->
{Lb_host, Lb_port} = get_host_port_for_lb(Host, Port, Options),
get_value(max_pipeline_size, Options,
get_config_value({max_pipeline_size, Host, Port},
get_config_value({max_pipeline_size, Lb_host, Lb_port},
default_max_pipeline_size())).

get_max_attempts(Host, Port, Options) ->
Expand Down Expand Up @@ -685,7 +696,7 @@ show_dest_status() ->
Metrics = get_metrics(),
lists:foreach(
fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
io:format("~40.40s | ~-5.5s | ~-10.10s | ~p~n",
[Host ++ ":" ++ integer_to_list(Port),
integer_to_list(Tid),
integer_to_list(Size),
Expand Down Expand Up @@ -891,8 +902,8 @@ set_config_value(Key, Val) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
Pid = do_get_connection(Url, ets:lookup(ibrowse_lb, {Host, Port})),
handle_call({get_lb_pid, Key}, _From, State) ->
Pid = do_get_connection(Key, ets:lookup(ibrowse_lb, Key)),
{reply, Pid, State};

handle_call(stop, _From, State) ->
Expand Down Expand Up @@ -948,37 +959,41 @@ handle_cast(_Msg, State) ->
handle_info(all_trace_off, State) ->
Mspec = [{{ibrowse_conf,{trace,'$1','$2'},true},[],[{{'$1','$2'}}]}],
Trace_on_dests = ets:select(ibrowse_conf, Mspec),
Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) ->
Fun = fun(#lb_pid{host_port = {H, P}, ets_tid = Tid}, _) ->
case lists:member({H, P}, Trace_on_dests) of
false ->
ok;
true ->
catch Pid ! {trace, false}
Fun2 = fun({{_, _, Pid}, _}) ->
catch Pid ! {trace, false}
end,
ets:foldl(Fun2, undefined, Tid)
end;
(_, Acc) ->
Acc
end,
ets:foldl(Fun, undefined, ibrowse_lb),
ets:select_delete(ibrowse_conf, [{{ibrowse_conf,{trace,'$1','$2'},true},[],['true']}]),
{noreply, State};

handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};

handle_info({trace, Bool, Host, Port}, State) ->
Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _)
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};
(_, Acc) ->
Acc
end,
ets:foldl(Fun, undefined, ibrowse_lb),
case ets:lookup(ibrowse_lb, {Host, Port}) of
[#lb_pid{ets_tid = Tid}] ->
Fun = fun({{_, _, Pid}, _}) ->
catch Pid ! {trace, Bool}
end,
ets:foldl(Fun, undefined, Tid);
_ ->
ok
end,
ets:insert(ibrowse_conf, #ibrowse_conf{key = {trace, Host, Port},
value = Bool}),
{noreply, State};

handle_info(_Info, State) ->
{noreply, State}.

Expand All @@ -1001,8 +1016,8 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
do_get_connection(#url{host = Host, port = Port}, []) ->
{ok, Pid} = ibrowse_lb:start_link([Host, Port]),
do_get_connection(Key, []) ->
{ok, Pid} = ibrowse_lb:start_link([Key]),
Pid;
do_get_connection(_Url, [#lb_pid{pid = Pid}]) ->
Pid.
82 changes: 37 additions & 45 deletions src/ibrowse_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
]).

-record(state, {parent_pid,
ets_tid,
ets_tids = [],
host,
port,
max_sessions,
Expand Down Expand Up @@ -63,18 +63,18 @@ start_link(Args) ->
%% ignore |
%% {stop, Reason}
%%--------------------------------------------------------------------
init([Host, Port]) ->
init([{Host, Port}]) ->
process_flag(trap_exit, true),
Max_sessions = ibrowse:get_config_value({max_sessions, Host, Port}, 10),
Max_pipe_sz = ibrowse:get_config_value({max_pipeline_size, Host, Port}, 10),
put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
put(ibrowse_trace_token, ["LB: ", Host, $:, integer_to_list(Port)]),
State = #state{parent_pid = whereis(ibrowse),
host = Host,
port = Port,
max_pipeline_size = Max_pipe_sz,
max_sessions = Max_sessions},
State_1 = maybe_create_ets(State),
host = Host,
port = Port,
max_pipeline_size = Max_pipe_sz,
max_sessions = Max_sessions},
{ok, State_1, _} = maybe_create_ets(State),
{ok, State_1}.

spawn_connection(Lb_pid, Url,
Expand Down Expand Up @@ -107,24 +107,21 @@ stop(Lb_pid) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------

handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
gen_server:reply(_From, ok),
{stop, normal, State};

handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
stop_all_conn_procs(Tid),
handle_call(stop, _From, #state{ets_tids = Tids} = State) ->
stop_all_conn_procs(Tids),
gen_server:reply(_From, ok),
{stop, normal, State};

handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
{reply, {error, shutting_down}, State};

handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
State) ->
State_1 = maybe_create_ets(State),
Tid = State_1#state.ets_tid,
Tid_size = ets:info(Tid, size),
case Tid_size >= Max_sess of
#state{ets_tids = Tids} = State) ->
{ok, State_1, Tid} = maybe_create_ets(Url#url.host, Url#url.port, State),
Sess_count = lists:foldl(fun({_X, X_tid}, Acc) ->
Acc + ets:info(X_tid, size)
end, 0, Tids),
case Sess_count >= Max_sess of
true ->
Reply = find_best_connection(Tid, Max_pipe),
{reply, Reply, State_1#state{max_sessions = Max_sess,
Expand Down Expand Up @@ -159,20 +156,6 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------

handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
put(my_trace_flag, Bool),
{noreply, State};

handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
ets:foldl(fun({{_, Pid}, _}, Acc) when is_pid(Pid) ->
catch Pid ! {trace, Bool},
Acc;
(_, Acc) ->
Acc
end, undefined, Tid),
put(my_trace_flag, Bool),
{noreply, State};

handle_info(timeout, State) ->
%% We can't shutdown the process immediately because a request
%% might be in flight. So we first remove the entry from the
Expand All @@ -193,16 +176,18 @@ handle_info(_Info, State) ->
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, #state{host = Host, port = Port, ets_tid = Tid} = _State) ->
terminate(_Reason, #state{host = Host, port = Port, ets_tids = Tids} = _State) ->
catch ets:delete(ibrowse_lb, {Host, Port}),
stop_all_conn_procs(Tid),
stop_all_conn_procs(Tids),
ok.

stop_all_conn_procs(Tid) ->
ets:foldl(fun({{_, _, Pid}, _}, Acc) ->
ibrowse_http_client:stop(Pid),
Acc
end, [], Tid).
stop_all_conn_procs(Tids) ->
lists:foreach(
fun({_, Tid}) ->
ets:foldl(fun({{_, _, Pid}, _}, _) ->
ibrowse_http_client:stop(Pid)
end, [], Tid)
end, Tids).

%%--------------------------------------------------------------------
%% Func: code_change/3
Expand All @@ -225,9 +210,16 @@ find_best_connection(Tid, Max_pipe) ->
{error, retry_later}
end.

maybe_create_ets(#state{ets_tid = undefined, host = Host, port = Port} = State) ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}),
State#state{ets_tid = Tid};
maybe_create_ets(State) ->
State.
maybe_create_ets(#state{host = Host, port = Port} = State) ->
maybe_create_ets(Host, Port, State).

maybe_create_ets(Host, Port, #state{ets_tids = Tids} = State) ->
case lists:keysearch({Host, Port}, 1, Tids) of
false ->
Tid = ets:new(ibrowse_lb, [public, ordered_set]),
ets:insert(ibrowse_lb, #lb_pid{host_port = {Host, Port}, pid = self(), ets_tid = Tid}),
Tids_1 = [{{Host, Port}, Tid} | Tids],
{ok, State#state{ets_tids = Tids_1}, Tid};
{value, {_, Tid}} ->
{ok, State, Tid}
end.

0 comments on commit 3fc7e78

Please sign in to comment.