Skip to content

Commit

Permalink
feat: make greptimedb worker more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Dec 4, 2023
1 parent 6ea8539 commit 92a9646
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,20 @@ do_shoot(State0, Requests0, Pending0, N, Channel) ->
Requests = Requests0#{pending := Pending, pending_count := N - 1},
State1 = State0#state{requests = Requests},
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of
{ok, Stream} ->
shoot(Stream, Req, State1, [ReplyTo]);
_Err ->
try
case greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}) of
{ok, Stream} ->
shoot(Stream, Req, State1, [ReplyTo]);
_Err ->
State0
end
catch
E:R:S ->
logger:error("[GreptimeDB] failed to shoot(pending=~0p,channel=~0p): ~0p ~0p ~p", [N, Channel, E, R, S]),
State0
end.


shoot(Stream, ?REQ(Req, _), #state{requests = #{pending_count := 0}} = State, ReplyToList) ->
%% Write the last request and finish stream
case greptimedb_stream:write_request(Stream, Req) of
Expand Down Expand Up @@ -276,11 +283,17 @@ health_check(Pid) ->
gen_server:call(Pid, health_check, ?HEALTH_CHECK_TIMEOUT).

stream(Pid) ->
case gen_server:call(Pid, channel, ?CALL_TIMEOUT) of
{ok, Channel} ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel});
Err -> Err
try
case gen_server:call(Pid, channel, ?CALL_TIMEOUT) of
{ok, Channel} ->
Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond),
greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel});
Err -> Err
end
catch
E:R:S ->
logger:error("[GreptimeDB] failed to create stream for ~0p: ~0p ~0p ~p", [Pid, E, R, S]),
{error, R}
end.

ddl() ->
Expand Down

0 comments on commit 92a9646

Please sign in to comment.