Skip to content

Commit

Permalink
Switch to grpcbox
Browse files Browse the repository at this point in the history
Use grpcbox as a gRPC client library, because
it is much better maintained.

Shows good performance and generation of protobuf
files is simple and automatic.

Signed-off-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
  • Loading branch information
drasko committed Oct 29, 2019
1 parent 8f3dff8 commit d11b53f
Show file tree
Hide file tree
Showing 19 changed files with 3,039 additions and 3,709 deletions.
32 changes: 3 additions & 29 deletions mqtt/verne/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,36 +70,10 @@ services:
```

### Native
#### Prepare
Install [gpb](https://github.com/tomas-abrahamsson/gpb)
#### Generapte protobuf files
```
git clone https://github.com/tomas-abrahamsson/gpb.git
cd gpb
git checkout 4.10.5
make -j 16
```
Then generate Erlang proto files:
```
mkdir -p ./src/proto
./gpb/bin/protoc-erl -pkgs -maps -I ./gpb/ ../../*.proto -o ./src/proto
cp ./gpb/include/gpb.hrl ./src/proto/
```
If gRPC us used for auth (not enabled yet, to be enabled in the future):
```
git clone https://github.com/Bluehouse-Technology/grpc_client.git
cd grpc_client && make -j 16
make shell
```
Then in Erlang shell:
```
1> grpc_client:compile("../../../internal.proto", [{use_packages, true}]).
```
Outside of shell:
```
mv ./internal_client.erl ../src/proto
mkdir -p src/protos
rebar3 grpc gen
```
#### Compile
Expand Down
8 changes: 7 additions & 1 deletion mqtt/verne/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
{vernemq_dev, {git, "git://github.com/erlio/vernemq_dev.git", {branch, "master"}}},
{teacup_nats, "0.4.1"},
{gpb, "4.10.5"},
{grpc_client, {git, "https://github.com/galaxie/grpc_client", {branch, "fix/package_name_duplicates"}}},
{grpcbox, "0.11.0"},
{poolboy, "1.5.2"},
{eredis, "1.2.0"}
]}.

{grpc, [{protos, "../.."},
{out_dir, "src/protos"},
{gpb_opts, [{module_name_suffix, "_pb"}]}]}.

{plugins, [grpcbox_plugin]}.
6 changes: 3 additions & 3 deletions mqtt/verne/src/mfx_auth.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
stdlib,
teacup,
gpb,
grpc_client,
poolboy,
eredis
grpcbox,
eredis,
poolboy
]},
{mod, { mfx_auth_app, []}},
{env, [
Expand Down
5 changes: 1 addition & 4 deletions mqtt/verne/src/mfx_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
on_client_gone/1
]).

-include("proto/message.hrl").

%% This file demonstrates the hooks you typically want to use
%% if your plugin deals with Authentication or Authorization.
%%
Expand All @@ -41,7 +39,6 @@ identify(Password) ->
poolboy:checkin(grpc_pool, Worker),
Result.


access(UserName, ChannelId) ->
error_logger:info_msg("access: ~p ~p", [UserName, ChannelId]),
AccessByIdReq = #{thingID => binary_to_list(UserName), chanID => binary_to_list(ChannelId)},
Expand Down Expand Up @@ -126,7 +123,7 @@ auth_on_publish(UserName, {_MountPoint, _ClientId} = SubscriberId, QoS, Topic, P
contentType => ContentType,
payload => Payload
},
mfx_nats:publish(NatsSubject, message:encode_msg(RawMessage, 'mainflux.RawMessage')),
mfx_nats:publish(NatsSubject, message_pb:encode_msg(RawMessage, raw_message)),
ok;
Other ->
error_logger:info_msg("Error auth: ~p", [Other]),
Expand Down
12 changes: 9 additions & 3 deletions mqtt/verne/src/mfx_auth_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ start(_StartType, _StartArgs) ->
false -> "";
InstanceEnv -> InstanceEnv
end,
PoolSize = case os:getenv("MF_MQTT_VERNEMQ_GRPC_POOL_SIZE") of
false ->
10;
PoolSizeEnv ->
{PoolSizeInt, _PoolSizeRest} = string:to_integer(PoolSizeEnv),
PoolSizeInt
end,


ets:insert(mfx_cfg, [
{grpc_url, GrpcUrl},
{nats_url, NatsUrl},
Expand All @@ -42,8 +48,8 @@ start(_StartType, _StartArgs) ->
% Also, init one ETS table for keeping the #{ClientId => Username} mapping
ets:new(mfx_client_map, [set, named_table, public]),

% Start the process
mfx_auth_sup:start_link().
% Start the MFX Auth process
mfx_auth_sup:start_link(PoolSize).

stop(_State) ->
ok.
Expand Down
16 changes: 4 additions & 12 deletions mqtt/verne/src/mfx_auth_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
-behaviour(supervisor).

%% API
-export([start_link/0]).
-export([start_link/1]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -15,22 +15,14 @@
%% API functions
%% ===================================================================

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_link(PoolSize) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolSize]).

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init([]) ->
PoolSize = case os:getenv("MF_MQTT_VERNEMQ_GRPC_POOL_SIZE") of
false ->
10;
PoolSizeEnv ->
{PoolSizeInt, _PoolSizeRest} = string:to_integer(PoolSizeEnv),
PoolSizeInt
end,

init([PoolSize]) ->
SizeArgs = [{size, PoolSize}, {max_overflow, PoolSize * 1.5}],
PoolArgs = [{name, {local, grpc_pool}}, {worker_module, mfx_grpc}],
WorkerArgs = [],
Expand Down
88 changes: 27 additions & 61 deletions mqtt/verne/src/mfx_grpc.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
-module(mfx_grpc).
-behaviour(gen_server).
-behaviour(poolboy_worker).

-export([
start_link/0,
Expand All @@ -12,83 +11,50 @@
terminate/2
]).

-record(state, {conn}).
-record(state, {channel}).

init(_Args) ->
error_logger:info_msg("mfx_grpc genserver has started (~w)~n", [self()]),
[{_, GrpcUrl}] = ets:lookup(mfx_cfg, grpc_url),
{ok, {_, _, GrpcHost, GrpcPort, _, _}} = http_uri:parse(GrpcUrl),
error_logger:info_msg("grpc host: ~p, port: ~p", [GrpcHost, GrpcPort]),
{ok, GrpcConn} = grpc_client:connect(tcp, GrpcHost, GrpcPort),
{ok, #state{conn = GrpcConn}}.
error_logger:info_msg("gRPC host: ~p, port: ~p", [GrpcHost, GrpcPort]),
Channel = list_to_atom(pid_to_list(self())),
grpcbox_channel_sup:start_child(Channel, [{http, GrpcHost, GrpcPort, []}], #{}),
{ok, #state{channel = Channel}}.

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

handle_call({identify, Message}, _From, #state{conn = GrpcConn} = State) ->
error_logger:info_msg("mfx_grpc message: ~p", [Message]),
{Status, Result} = internal_client:'IdentifyThing'(GrpcConn, Message, []),
case Status of
ok ->
#{
grpc_status := 0,
headers := #{<<":status">> := <<"200">>},
http_status := HttpStatus,
result :=
#{value := ThingId},
status_message := <<>>,
trailers := #{<<"grpc-status">> := <<"0">>}
} = Result,

case HttpStatus of
200 ->
{reply, {ok, list_to_binary(ThingId)}, State};
_ ->
{reply, {error, HttpStatus}, error}
end;
_ ->
{reply, {error, Status}, State}
end;
handle_call({can_access_by_id, Message}, _From, #state{conn = GrpcConn} = State) ->
error_logger:info_msg("mfx_grpc message: ~p", [Message]),
{Status, Result} = internal_client:'CanAccessByID'(GrpcConn, Message, []),
case Status of
ok ->
#{
grpc_status := 0,
headers := #{
<<":status">> := <<"200">>,
<<"content-type">> := <<"application/grpc+proto">>
},
http_status := HttpStatus,
result := #{},
status_message := <<>>,
trailers := #{
<<"grpc-message">> := <<>>,
<<"grpc-status">> := <<"0">>}
} = Result,

case HttpStatus of
200 ->
{reply, ok, State};
_ ->
{reply, {error, HttpStatus}, State}
end;

_ ->
{reply, {error, Status}, State}
end.
handle_call({identify, Message}, _From, #state{channel = Channel} = State) ->
error_logger:info_msg("mfx_grpc message: ~p, channel: ~p", [Message, Channel]),
{ok, Resp, HeadersAndTrailers} = mainflux_things_service_client:identify(Message, #{channel => Channel}),
case maps:get(<<":status">>, maps:get(headers, HeadersAndTrailers)) of
<<"200">> ->
{reply, {ok, maps:get(value, Resp)}, State};
ErrorStatus ->
{reply, {error, ErrorStatus}, State}
end;

handle_call({can_access_by_id, Message}, _From, #state{channel = Channel} = State) ->
error_logger:info_msg("mfx_grpc message: ~p, channel: ~p", [Message, Channel]),
{ok, _, HeadersAndTrailers} = mainflux_things_service_client:can_access_by_id(Message, #{channel => Channel}),
error_logger:info_msg("mfx_grpc can_access_by_id() HeadersAndTrailers: ~p", [HeadersAndTrailers]),
case maps:get(<<":status">>, maps:get(headers, HeadersAndTrailers)) of
<<"200">> ->
{reply, ok, State};
ErrorStatus ->
{reply, {error, ErrorStatus}, State}
end.

handle_cast(_Request, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(Reason, #state{conn = GrpcConn} = State) ->
grpc_client:stop(GrpcConn),
terminate(Reason, #state{channel = Channel} =State) ->
grpcbox_channel:stop(Channel),
{stop, Reason, State}.

34 changes: 11 additions & 23 deletions mqtt/verne/src/mfx_nats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
loop/1
]).

-include("proto/message.hrl").

-record(state, {conn}).

start_link() ->
Expand Down Expand Up @@ -66,36 +64,26 @@ loop(Conn) ->
{Conn, {msg, <<"teacup.control">>, _, <<"exit">>}} ->
error_logger:info_msg("NATS received exit msg", []);
{Conn, {msg, Subject, _ReplyTo, NatsMsg}} ->
#{protocol := Protocol, contentType := ContentType, payload := Payload} = message:decode_msg(NatsMsg, 'mainflux.RawMessage'),
#{protocol := Protocol, channel := ChannelId, contentType := ContentType,
payload := Payload, subtopic := Subtopic} = message_pb:decode_msg(NatsMsg, raw_message),
error_logger:info_msg("Received NATS protobuf msg with payload: ~p and ContentType: ~p~n", [Payload, ContentType]),
case Protocol of
"mqtt" ->
<<"mqtt">> ->
error_logger:info_msg("Ignoring MQTT message loopback", []),
loop(Conn);
_ ->
error_logger:info_msg("mfx_nats Protocol ~p", [Protocol]),
error_logger:info_msg("Re-publishing on MQTT broker", []),
Subtopic2 = re:split(Subtopic,"/"),
ContentType2 = re:replace(ContentType, "/","_",[global,{return,list}]),
ContentType3 = re:replace(ContentType2, "\\+","-",[global,{return,binary}]),
ContentType3 = re:replace(ContentType2, "\\+","-",[global,{return,binary}]),
{_, PublishFun, {_, _}} = vmq_reg:direct_plugin_exports(?MODULE),
% Topic needs to be in the form of the list, like [<<"channel">>,<<"6def78cd-b441-4fd8-8680-af7e3bbea187">>]
Topic = case re:split(Subject, <<"\\.">>) of
[<<"channel">>, ChannelId] ->
case ContentType of
<<"">> ->
[<<"channels">>, ChannelId, <<"messages">>];
_ ->
[<<"channels">>, ChannelId, <<"messages">>, <<"ct">>, ContentType3]
end;
[<<"channel">>, ChannelId, Subtopic] ->
case ContentType of
<<"">> ->
[<<"channels">>, ChannelId, <<"messages">>, Subtopic];
_ ->
[<<"channels">>, ChannelId, <<"messages">>, Subtopic, <<"ct">>, ContentType3]
end;
Other ->
error_logger:info_msg("Could not match topic: ~p~n", [Other]),
error
Topic = case ContentType of
<<"">> ->
[<<"channels">>, ChannelId, <<"messages">>] ++ Subtopic2;
_ ->
[<<"channels">>, ChannelId, <<"messages">>] ++ Subtopic2 ++ [<<"ct">>, ContentType3]
end,
error_logger:info_msg("Subject: ~p, Topic: ~p, PublishFunction: ~p~n", [Subject, Topic, PublishFun]),
PublishFun(Topic, Payload, #{qos => 0, retain => false}),
Expand Down
Loading

0 comments on commit d11b53f

Please sign in to comment.