Skip to content

Commit

Permalink
Merge pull request #39 from gbour/feat-monitoring
Browse files Browse the repository at this point in the history
VM & Wave metrics
  • Loading branch information
Guillaume Bour authored Jun 16, 2016
2 parents 513bc1f + 37d1281 commit 8780246
Show file tree
Hide file tree
Showing 16 changed files with 490 additions and 19 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ env=prod
# '/my/file': debug written in '/my/file'
DEBUG=1

EXOMETER_PACKAGES="(minimal)"
export EXOMETER_PACKAGES

##
## -*- RULES -*-
##
Expand Down
20 changes: 12 additions & 8 deletions apps/wave/src/mqtt_ranch_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ start_link(Ref, Socket, Transport, Opts) ->
-spec init(Ref::ranch:ref(), Socket::ranch_socket(), Transport::ranch_transport(), Opts::any()) -> ok.
init(Ref, Socket, Transport, _Opts = []) ->
accept(Transport, Ref),
exometer:update([wave,connections,Transport:name()], 1),

{ok, {Ip,Port}} = peername(Transport, Socket),
Addr = #addr{transport=Transport:name(), ip=inet:ntoa(Ip), port=Port},
Expand All @@ -63,7 +64,7 @@ loop(Socket, Transport, Session, Buffer, Length) ->

{error, timeout} ->
lager:notice("socket timeout. Sending MQTT PINGREQ"),
Transport:send(Socket, mqtt_msg:encode(#mqtt_msg{type='PINGREQ'})),
send(Transport, Socket, #mqtt_msg{type='PINGREQ'}),
loop(Socket, Transport, Session, Buffer, Length);

% socket closed by peer
Expand Down Expand Up @@ -102,8 +103,7 @@ route(Socket, Transport, Session, Raw) ->
% special error case: in case of wrong protocol version, the broker MUST return
% a CONNACK packet with 0x01 error code
% we bypass mqtt_session in this case
Transport:send(Socket, mqtt_msg:encode(
#mqtt_msg{type='CONNACK', payload=[{retcode, 1}]})),
send(Transport, Socket, #mqtt_msg{type='CONNACK', payload=[{retcode, 1}]}),
?GENFSM_STOP(Session, normal, 50),
Transport:close(Socket),
stop;
Expand All @@ -116,12 +116,12 @@ route(Socket, Transport, Session, Raw) ->

{ok, Msg, Rest} ->
lager:debug("IN> ~p", [Msg]),
exometer:update([wave,packets,received], 1),

%case answer(Msg) of
case mqtt_session:handle(Session, Msg) of
{ok, Resp=#mqtt_msg{}} ->
Res = Transport:send(Socket, mqtt_msg:encode(Resp)),
lager:debug("OUT[~p], < ~p", [Res, Resp]),
send(Transport, Socket, Resp),
route(Socket, Transport, Session, Rest);

{ok, undefined} ->
Expand All @@ -135,7 +135,7 @@ route(Socket, Transport, Session, Raw) ->

% send message then close connection
{ok, {disconnect, M=#mqtt_msg{}}} ->
Res = Transport:send(Socket, mqtt_msg:encode(M)),
Res = send(Transport, Socket, M),
lager:debug("OUT[disconnect: ~p] ~p", [Res, M]),
stop
end;
Expand All @@ -160,7 +160,7 @@ route(Socket, Transport, Session, Raw) ->
-spec ping(ranch_transport(), ranch_socket()) -> ok | {error, term()}.
ping(Transport, Socket) ->
Msg = #mqtt_msg{type='PINGREQ'},
Transport:send(Socket, mqtt_msg:encode(Msg)).
send(Transport, Socket, Msg).

% send kindof TCP keepalive
%
Expand All @@ -172,7 +172,11 @@ crlfping(T, S) ->
%
-spec send(ranch_transport(), ranch_socket(), mqtt_msg()) -> ok | {error, term()}.
send(Transport, Socket, Msg) ->
Transport:send(Socket, mqtt_msg:encode(Msg)).
Res = Transport:send(Socket, mqtt_msg:encode(Msg)),
lager:debug("OUT[~p], < ~p", [Res, Msg]),
exometer:update([wave,packets,sent], 1),

Res.

% close underlying socket
%
Expand Down
22 changes: 17 additions & 5 deletions apps/wave/src/mqtt_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ connected(#mqtt_msg{type='PINGRESP'}, _, StateData=#session{pingid=_Ref,keepaliv
connected(Msg=#mqtt_msg{type='PUBLISH', qos=0}, _,
StateData=#session{deviceid=DeviceID,keepalive=Ka,opts=Opts}) ->
% only if retain=1
exometer:update([wave,messages,in,0], 1),
mqtt_retain:store(Msg),

%TODO: save message in DB
Expand All @@ -309,9 +310,10 @@ connected(Msg=#mqtt_msg{type='PUBLISH', qos=0}, _,
{reply, undefined, connected, StateData, Ka};

% qos > 0
connected(Msg=#mqtt_msg{type='PUBLISH', payload=P, dup=Dup}, _,
connected(Msg=#mqtt_msg{type='PUBLISH', payload=P, qos=Qos, dup=Dup}, _,
StateData=#session{deviceid=DeviceID,keepalive=Ka,inflight=Inflight,opts=Opts}) ->

exometer:update([wave,messages,in,Qos], 1),
%TODO: save message in DB
MsgID = proplists:get_value(msgid, P),
{Inflight2, StatusCode} = case proplists:get_value(MsgID, Inflight) of
Expand All @@ -321,7 +323,8 @@ connected(Msg=#mqtt_msg{type='PUBLISH', payload=P, dup=Dup}, _,
% pass MsgID to message_worker
{ok, MsgWorker} = supervisor:start_child(wave_msgworkers_sup, []),
mqtt_message_worker:publish(MsgWorker, self(), Msg#mqtt_msg{retain=0}), % async


exometer:update([wave,messages,inflight], 1),
{[{MsgID, MsgWorker} | Inflight], 200};

% message is already inflight
Expand Down Expand Up @@ -473,6 +476,7 @@ connected({timeout, _, timeout1}, _StateData) ->
connected({publish, _, _, {Topic, _}, Content, Qos=0, Retain},
StateData=#session{transport={Callback,Transport,Socket},keepalive=Ka}) ->
lager:debug("send PUBLISH(topic=~p, qos=~p)", [Topic, Qos]),
exometer:update([wave,messages,out,0], 1),

Msg = #mqtt_msg{type='PUBLISH', qos=Qos, retain=Retain, payload=[{topic,Topic}, {content, Content}]},
State = Callback:send(Transport, Socket, Msg),
Expand All @@ -487,14 +491,17 @@ connected({publish, _, _, {Topic, _}, Content, Qos=0, Retain},
connected({publish, MsgID, From, {Topic,_}, Content, Qos, Retain},
StateData=#session{transport={Callback,Transport,Socket},keepalive=Ka,inflight=Inflight}) ->
lager:debug("send PUBLISH(topic=~p, msgid=~p, qos=~p)", [Topic, MsgID, Qos]),
exometer:update([wave,messages,out,Qos], 1),

Msg = #mqtt_msg{type='PUBLISH', qos=Qos, retain=Retain,
payload=[{topic,Topic}, {msgid, MsgID}, {content, Content}]},
State = Callback:send(Transport, Socket, Msg),

case State of
{error, _Err} -> {stop, normal};
ok -> {next_state, connected, StateData#session{inflight=[{MsgID, From}|Inflight]}, Ka}
ok ->
exometer:update([wave,messages,inflight], 1),
{next_state, connected, StateData#session{inflight=[{MsgID, From}|Inflight]}, Ka}
end;

%
Expand Down Expand Up @@ -549,8 +556,10 @@ connected({ack, MsgID, _Qos=2, _}, StateData=#session{transport={Callback,Transp
%TODO: what if peer disconnected between ack received and message landed ?
% do a pre-check when message received (qos1 = PUBACK, qos2 = PUBCOMP) ?
connected({'msg-landed', MsgID}, StateData=#session{keepalive=Ka, inflight=Inflight,
deviceid=DeviceID}) ->
deviceid=_DeviceID}) ->
lager:debug("#~p message-id is no more in-flight", [MsgID]),
exometer:update([wave,messages,inflight], -1),

{next_state, connected, StateData#session{inflight=proplists:delete(MsgID, Inflight)}, Ka};

% KeepAlive was set and no PINREG (or any other ctrl packet) received in the interval
Expand Down Expand Up @@ -607,7 +616,9 @@ terminate(_Reason, StateName, StateData=#session{deviceid=DeviceID, topics=T, in
[DeviceID, _Reason, StateName, StateData]),

if
length(Inflight) > 0 -> lager:notice("~p: remaining inflight messages: ~p", [DeviceID, Inflight]);
length(Inflight) > 0 ->
exometer:update([wave,messages,inflight], -length(Inflight)),
lager:notice("~p: remaining inflight messages: ~p", [DeviceID, Inflight]);
true -> ok
end,

Expand All @@ -624,6 +635,7 @@ terminate(_Reason, StateName, StateData=#session{deviceid=DeviceID, topics=T, in
% saving topics if clean unset
offline_store(DeviceID, maps:get(clean, Opts, 1), T),
send_last_will(StateData),

terminate.

code_change(_OldVsn, StateName, StateData, _Extra) ->
Expand Down
15 changes: 14 additions & 1 deletion apps/wave/src/mqtt_topic_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


%
-export([dump/0, subscribe/3, unsubscribe/1, unsubscribe/2, match/1]).
-export([count/0, dump/0, subscribe/3, unsubscribe/1, unsubscribe/2, match/1]).
-ifdef(DEBUG).
-export([debug_cleanup/0]).
-endif.
Expand All @@ -49,6 +49,7 @@ start_link() ->
gen_server:start_link({local,?MODULE}, ?MODULE, [], []).

init(_) ->
exometer:update([wave,subscriptions], 0),
{ok, #state{}}.

%%
Expand Down Expand Up @@ -79,6 +80,10 @@ match(Name) ->
gen_server:call(?MODULE, {match, Name}).


-spec count() -> {ok, integer()}.
count() ->
{ok, gen_server:call(?MODULE, count)}.

-spec dump() -> ok.
dump() ->
gen_server:call(?MODULE, dump).
Expand All @@ -93,12 +98,17 @@ debug_cleanup() ->
%% PRIVATE API
%%

handle_call(count, _, State=#state{subscriptions=S}) ->
{reply, erlang:length(S), State};

handle_call(dump, _, State=#state{subscriptions=S}) ->
priv_dump(S),
{reply, ok, State};

handle_call(debug_cleanup, _, _State) ->
lager:warning("clearing registry"),
exometer:update([wave,subscriptions], 0),

{reply, ok, #state{}};

handle_call({subscribe, Topic, Qos, Subscriber}, _, State=#state{subscriptions=Subscriptions}) ->
Expand All @@ -113,6 +123,7 @@ handle_call({subscribe, Topic, Qos, Subscriber}, _, State=#state{subscriptions=S

{Reply, S2} = case lists:filter(fun({T,F,_,S}) -> {T,F,S} =:= {TopicName,Fields,Subscriber} end, Subscriptions) of
[] ->
exometer:update([wave,subscriptions], length(Subscriptions)+1),
{ok, Subscriptions ++ [{TopicName,Fields,Qos,Subscriber}]};

_ ->
Expand All @@ -124,6 +135,7 @@ handle_call({subscribe, Topic, Qos, Subscriber}, _, State=#state{subscriptions=S

handle_call({unsubscribe, Subscriber}, _, State=#state{subscriptions=S}) ->
S2 = priv_unsubscribe(Subscriber, S, []),
exometer:update([wave,subscriptions], length(S2)),
{reply, ok, State#state{subscriptions=S2}};

handle_call({unsubscribe, TopicName, Subscriber}, _, State=#state{subscriptions=S}) ->
Expand All @@ -134,6 +146,7 @@ handle_call({unsubscribe, TopicName, Subscriber}, _, State=#state{subscriptions=
end, S
),

exometer:update([wave,subscriptions], length(S2)),
%lager:debug("unsub2 ~p / ~p", [S, S2]),
{reply, ok, State#state{subscriptions=S2}};

Expand Down
3 changes: 2 additions & 1 deletion apps/wave/src/wave.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
sharded_eredis,
lager,
jiffy,
cowboy
cowboy,
exometer
]},
{env, []},
{modules, []},
Expand Down
18 changes: 18 additions & 0 deletions apps/wave/src/wave_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ start(_StartType, _StartArgs) ->
], [{env, [{dispatch, Dispatch}]}]),
%websocket_sup:start_link().

exometer_init(),
{ok, WaveSup}.

stop(_State) ->
Expand Down Expand Up @@ -149,6 +150,23 @@ loglevel(Level) ->
lager:set_loglevel(lager_console_backend, Level).


%
% automatically subscribe to metrics (sent to statsd)
%
exometer_init() ->
{ok, Interval} = application:get_env(exometer,interval),
exometer_init(exometer:get_values(['_']), Interval),
ok.

exometer_init([], _) ->
ok;
exometer_init([{Name, Metrics}|T], Interval) ->
DPs = lists:map(fun({DP,_}) -> DP end, Metrics),
exometer_report:subscribe(exometer_report_statsd, Name, DPs, Interval),

exometer_init(T, Interval).


-ifdef(DEBUG).
debug_cleanup() ->
mqtt_topic_registry:debug_cleanup(),
Expand Down
11 changes: 10 additions & 1 deletion apps/wave/src/wave_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

-export([get/1, set/2, set/3, del/1]).
-export([incr/1, decr/1]).
-export([append/2, push/2, pop/1, range/1, del/2, search/1, exists/1]).
-export([append/2, push/2, pop/1, range/1, del/2, search/1, exists/1, count/1]).

-type return() :: {ok, Value::eredis:return_value()} | {error, Reason::binary()}.

Expand Down Expand Up @@ -136,6 +136,15 @@ pop(List) ->
range(List) ->
sharded_eredis:q(["LRANGE", List, 0, -1]).

-spec count(binary()) -> integer() | return().
count(Key) ->
%NOTE: 'KEYS' is locking
case sharded_eredis:q(["EVAL", <<"return #redis.pcall('KEYS', '",Key/binary,"')">>, 0]) of
{ok, Res} ->
{ok, wave_utils:int(Res)};
Err -> Err
end.


%%
%%
Expand Down
Loading

0 comments on commit 8780246

Please sign in to comment.