Skip to content

Commit

Permalink
feat: some minor changes (#25)
Browse files Browse the repository at this point in the history
* feat: add uint values

* feat: normalize unauth error

* fix: rename health_pb to greptimedb_health_pb

* refactor: rename all proto modules with prefix greptimedb

* feat: adds deadline for gRPC and timeout for gen_server

* feat: set health checking timeout to be 1 second
  • Loading branch information
killme2008 authored Jul 20, 2023
1 parent 3cded68 commit c1c6733
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 39 deletions.
9 changes: 4 additions & 5 deletions src/greptime_v_1_greptime_database_bhvr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
-module(greptime_v_1_greptime_database_bhvr).

%% Unary RPC
-callback handle(ctx:t(), database_pb:greptime_request()) ->
{ok, database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response().
-callback handle(ctx:t(), greptimedb_database_pb:greptime_request()) ->
{ok, greptimedb_database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response().

%%
%%
-callback handle_requests(reference(), grpcbox_stream:t()) ->
{ok, database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response().

{ok, greptimedb_database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response().
17 changes: 8 additions & 9 deletions src/greptime_v_1_greptime_database_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

-define(SERVICE, 'greptime.v1.GreptimeDatabase').
-define(PROTO_MODULE, 'database_pb').
-define(PROTO_MODULE, 'greptimedb_database_pb').
-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE,
Expand All @@ -24,24 +24,24 @@
unmarshal_fun=?UNMARSHAL_FUN(Output)}).

%% @doc Unary RPC
-spec handle(database_pb:greptime_request()) ->
{ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec handle(greptimedb_database_pb:greptime_request()) ->
{ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
handle(Input) ->
handle(ctx:new(), Input, #{}).

-spec handle(ctx:t() | database_pb:greptime_request(), database_pb:greptime_request() | grpcbox_client:options()) ->
{ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec handle(ctx:t() | greptimedb_database_pb:greptime_request(), greptimedb_database_pb:greptime_request() | grpcbox_client:options()) ->
{ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
handle(Ctx, Input) when ?is_ctx(Ctx) ->
handle(Ctx, Input, #{});
handle(Input, Options) ->
handle(ctx:new(), Input, Options).

-spec handle(ctx:t(), database_pb:greptime_request(), grpcbox_client:options()) ->
{ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec handle(ctx:t(), greptimedb_database_pb:greptime_request(), grpcbox_client:options()) ->
{ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
handle(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/greptime.v1.GreptimeDatabase/Handle">>, Input, ?DEF(greptime_request, greptime_response, <<"greptime.v1.GreptimeRequest">>), Options).

%% @doc
%% @doc
-spec handle_requests() ->
{ok, grpcbox_client:stream()} | grpcbox_stream:grpc_error_response() | {error, any()}.
handle_requests() ->
Expand All @@ -58,4 +58,3 @@ handle_requests(Options) ->
{ok, grpcbox_client:stream()} | grpcbox_stream:grpc_error_response() | {error, any()}.
handle_requests(Ctx, Options) ->
grpcbox_client:stream(Ctx, <<"/greptime.v1.GreptimeDatabase/HandleRequests">>, ?DEF(greptime_request, greptime_response, <<"greptime.v1.GreptimeRequest">>), Options).

5 changes: 2 additions & 3 deletions src/greptime_v_1_health_check_bhvr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@
-module(greptime_v_1_health_check_bhvr).

%% Unary RPC
-callback health_check(ctx:t(), health_pb:health_check_request()) ->
{ok, health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response().

-callback health_check(ctx:t(), greptimedb_health_pb:health_check_request()) ->
{ok, greptimedb_health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response().
15 changes: 7 additions & 8 deletions src/greptime_v_1_health_check_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

-define(SERVICE, 'greptime.v1.HealthCheck').
-define(PROTO_MODULE, 'health_pb').
-define(PROTO_MODULE, 'greptimedb_health_pb').
-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE,
Expand All @@ -24,20 +24,19 @@
unmarshal_fun=?UNMARSHAL_FUN(Output)}).

%% @doc Unary RPC
-spec health_check(health_pb:health_check_request()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec health_check(greptimedb_health_pb:health_check_request()) ->
{ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Input) ->
health_check(ctx:new(), Input, #{}).

-spec health_check(ctx:t() | health_pb:health_check_request(), health_pb:health_check_request() | grpcbox_client:options()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec health_check(ctx:t() | greptimedb_health_pb:health_check_request(), greptimedb_health_pb:health_check_request() | grpcbox_client:options()) ->
{ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Ctx, Input) when ?is_ctx(Ctx) ->
health_check(Ctx, Input, #{});
health_check(Input, Options) ->
health_check(ctx:new(), Input, Options).

-spec health_check(ctx:t(), health_pb:health_check_request(), grpcbox_client:options()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
-spec health_check(ctx:t(), greptimedb_health_pb:health_check_request(), grpcbox_client:options()) ->
{ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/greptime.v1.HealthCheck/HealthCheck">>, Input, ?DEF(health_check_request, health_check_response, <<"greptime.v1.HealthCheckRequest">>), Options).

2 changes: 1 addition & 1 deletion src/column_pb.erl → src/greptimedb_column_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.7
%% Version source: file
-module(column_pb).
-module(greptimedb_column_pb).

-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
Expand Down
2 changes: 1 addition & 1 deletion src/common_pb.erl → src/greptimedb_common_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.7
%% Version source: file
-module(common_pb).
-module(greptimedb_common_pb).

-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
Expand Down
2 changes: 1 addition & 1 deletion src/database_pb.erl → src/greptimedb_database_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.7
%% Version source: file
-module(database_pb).
-module(greptimedb_database_pb).

-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
Expand Down
2 changes: 1 addition & 1 deletion src/ddl_pb.erl → src/greptimedb_ddl_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.7
%% Version source: file
-module(ddl_pb).
-module(greptimedb_ddl_pb).

-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
Expand Down
2 changes: 1 addition & 1 deletion src/health_pb.erl → src/greptimedb_health_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.19.7
%% Version source: file
-module(health_pb).
-module(greptimedb_health_pb).

-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
Expand Down
9 changes: 8 additions & 1 deletion src/greptimedb_values.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
-module(greptimedb_values).

-export([int32_value/1, int64_value/1, float64_value/1, boolean_value/1, binary_value/1,
string_value/1, date_value/1, datetime_value/1, timestamp_second_value/1,
string_value/1, date_value/1, datetime_value/1, timestamp_second_value/1, uint32_value/1,
uint64_value/1,
timestamp_millisecond_value/1, timestamp_microsecond_value/1,
timestamp_nanosecond_value/1]).

Expand All @@ -25,6 +26,12 @@ int32_value(V) ->
int64_value(V) ->
#{values => #{i64_values => [V]}, datatype => 'INT64'}.

uint32_value(V) ->
#{values => #{u32_values => [V]}, datatype => 'UINT32'}.

uint64_value(V) ->
#{values => #{u64_values => [V]}, datatype => 'UINT64'}.

float64_value(V) ->
#{values => #{f64_values => [V]}, datatype => 'FLOAT64'}.

Expand Down
27 changes: 20 additions & 7 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,48 @@

-behavihour(ecpool_worker).

-include_lib("grpcbox/include/grpcbox.hrl").

-export([handle/2, stream/1, ddl/0, health_check/1]).
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([connect/1]).

-record(state, {channel}).

-define(CALL_TIMEOUT, 12_000).
-define(HEALTH_CHECK_TIMEOUT, 1_000).
-define(REQUEST_TIMEOUT, 10_000).
-define(CONNECT_TIMEOUT, 5_000).

%% ===================================================================
%% gen_server callbacks
%% ===================================================================
init(Args) ->
logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]),
Endpoints = proplists:get_value(endpoints, Args),
Options = proplists:get_value(gprc_options, Args, #{}),
Options = proplists:get_value(gprc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}),
Channels =
lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints),
Channel = list_to_atom(pid_to_list(self())),
{ok, _} = grpcbox_channel_sup:start_child(Channel, Channels, Options),
{ok, #state{channel = Channel}}.

handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Reply = greptime_v_1_greptime_database_client:handle(Request, #{channel => Channel}),
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
{error, {?GRPC_STATUS_UNAUTHENTICATED, Msg}, Other} ->
{reply, {error, {unauth, Msg, Other}}, State};
Err ->
{reply, Err, State}
end;
handle_call(health_check, _From, #state{channel = Channel} = State) ->
Request = #{},
Reply = greptime_v_1_health_check_client:health_check(Request, #{channel => Channel}),
Ctx = ctx:with_deadline_after(?HEALTH_CHECK_TIMEOUT, millisecond),
Reply =
greptime_v_1_health_check_client:health_check(Ctx, Request, #{channel => Channel}),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
Expand Down Expand Up @@ -75,14 +87,15 @@ terminate(Reason, #state{channel = Channel} = State) ->
%%% Public functions
%%%===================================================================
handle(Pid, Request) ->
gen_server:call(Pid, {handle, Request}).
gen_server:call(Pid, {handle, Request}, ?CALL_TIMEOUT).

health_check(Pid) ->
gen_server:call(Pid, health_check).
gen_server:call(Pid, health_check, ?HEALTH_CHECK_TIMEOUT).

stream(Pid) ->
{ok, Channel} = gen_server:call(Pid, channel),
greptime_v_1_greptime_database_client:handle_requests(#{channel => Channel}).
{ok, Channel} = gen_server:call(Pid, channel, ?CALL_TIMEOUT),
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}).

ddl() ->
todo.
Expand Down
29 changes: 28 additions & 1 deletion test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-include_lib("eunit/include/eunit.hrl").

all() ->
[t_write, t_write_stream, t_insert_requests, t_write_batch, t_bench_perf].
[t_write, t_write_stream, t_insert_requests, t_write_batch, t_bench_perf, t_auth_error].

%%[t_bench_perf].
%%[t_insert_requests, t_bench_perf].
Expand Down Expand Up @@ -131,6 +131,33 @@ t_write(_) ->
greptimedb:stop_client(Client),
ok.

t_auth_error(_) ->
Metric = <<"temperatures">>,
Points =
[#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => greptimedb_values:int64_value(0),
<<"region">> => <<"hangzhou">>},
timestamp => 1619775142098},
#{fields => #{<<"temperature">> => 2},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverB">>,
<<"qos">> => greptimedb_values:int64_value(1),
<<"region">> => <<"ningbo">>,
<<"to">> => <<"kafka">>},
timestamp => 1619775143098}],
Options =
[{endpoints, [{http, "localhost", 4001}]},
{pool, greptimedb_client_pool},
{pool_size, 5},
{pool_type, random},
{auth, {basic, #{username => <<"greptime_user">>, password => <<"wrong_pwd">>}}}],
{ok, Client} = greptimedb:start_client(Options),
{error, {unauth, _, _}} = greptimedb:write(Client, Metric, Points).

t_write_stream(_) ->
Options =
[{endpoints, [{http, "localhost", 4001}]},
Expand Down

0 comments on commit c1c6733

Please sign in to comment.