Commit

yosukehara committed Dec 17, 2015
1 parent 5b99829 commit 36dc468
Showing 2 changed files with 50 additions and 32 deletions.
1 change: 1 addition & 0 deletions include/leo_gateway.hrl
@@ -83,6 +83,7 @@
 -define(ERROR_NOT_MATCH_LENGTH, "Not match object length").
 -define(ERROR_FAIL_PUT_OBJ, "Fail put an object").
 -define(ERROR_FAIL_RETRIEVE_OBJ, "Fail retrieve an object").
+-define(ERROR_COULD_NOT_START_TRAN, "Could not start a transaction").


 %%----------------------------------------------------------------------
81 changes: 49 additions & 32 deletions src/leo_large_object_get_handler.erl
@@ -50,7 +50,8 @@
 -define(DEF_SEPARATOR, <<"\n">>).

 -undef(DEF_TIMEOUT).
--define(DEF_TIMEOUT, 30000).
+-define(DEF_TIMEOUT, timer:seconds(30)).
+-define(DEF_RETRY_TIMES, 5).

 -record(state, {
           key = <<>> :: binary(),
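Side note on the two defines: timer:seconds/1 only converts seconds to milliseconds, so timer:seconds(30) evaluates to the same 30000 ms as the old literal; the change makes the unit explicit rather than altering the timeout. ?DEF_RETRY_TIMES is new and is used by put_begin_tran_with_retry/2 further down. A minimal sketch of the equivalence (the module name is illustrative only, not part of this commit):

-module(timeout_check).
-export([check/0]).

%% Sketch: the new macro value equals the old hard-coded literal.
check() ->
    OldTimeout = 30000,              %% previous hard-coded value, in ms
    NewTimeout = timer:seconds(30),  %% 30 * 1000 = 30000 ms
    OldTimeout =:= NewTimeout.       %% evaluates to true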
@@ -111,27 +112,34 @@ handle_call(stop, _From, State) ->

 handle_call({get, TotalOfChunkedObjs, Req, Meta}, _From,
             #state{key = Key, transport_rec = TransportRec} = State) ->
-    Reply = case leo_tran_serializable_cntnr:run(Key, null, null, ?MODULE,
-                                                 {TotalOfChunkedObjs, Req, Meta, TransportRec},
-                                                 [{?PROP_IS_WAIT_FOR_TRAN, false}], infinity) of
-                {value, WriterRep} ->
-                    WriterRep;
-                {error, ?ERROR_ALREADY_HAS_TRAN} ->
-                    {ok, Ref} = put_begin_tran_with_retry(Key),
-                    TotalSize = Meta#?METADATA.dsize,
-                    ReaderRep = case catch handle_read_loop(
-                                             TotalSize, #req_info{key = Key,
-                                                                  request = Req,
-                                                                  reference = Ref,
-                                                                  transport_rec = TransportRec}) of
-                                    {'EXIT', Reason} ->
-                                        {error, Reason};
-                                    Ret ->
-                                        Ret
-                                end,
-                    leo_cache_api:put_end_tran(Ref, read, Key, undef, true),
-                    ReaderRep
-            end,
+    Reply = case leo_tran_serializable_cntnr:run(
+                   Key, null, null, ?MODULE,
+                   {TotalOfChunkedObjs, Req, Meta, TransportRec},
+                   [{?PROP_IS_WAIT_FOR_TRAN, false}], infinity) of
+                {value, WriterRep} ->
+                    WriterRep;
+                {error, ?ERROR_ALREADY_HAS_TRAN} ->
+                    case put_begin_tran_with_retry(Key) of
+                        {ok, Ref} ->
+                            TotalSize = Meta#?METADATA.dsize,
+                            ReaderRep =
+                                case catch handle_read_loop(
+                                             TotalSize,
+                                             #req_info{key = Key,
+                                                       request = Req,
+                                                       reference = Ref,
+                                                       transport_rec = TransportRec}) of
+                                    {'EXIT', Reason} ->
+                                        {error, Reason};
+                                    Ret ->
+                                        Ret
+                                end,
+                            _ = leo_cache_api:put_end_tran(Ref, read, Key, undef, true),
+                            ReaderRep;
+                        {error, Cause} ->
+                            {error, Cause}
+                    end
+            end,
     {reply, Reply, State}.

 handle_cast(_Msg, State) ->
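What changes in this hunk: the reader path used to assert {ok, Ref} = put_begin_tran_with_retry(Key), so a failure to open the read transaction crashed the gen_server with a badmatch; it now matches the result explicitly and returns {error, Cause} to the caller, and the put_end_tran result is discarded with an explicit _ = match. A minimal sketch of the same pattern outside LeoFS, where begin_tran/1, read_object/2 and end_tran/1 are hypothetical stand-ins stubbed only so the module compiles on its own:

-module(read_tran_sketch).
-export([read_with_tran/1]).

%% Check the result of starting the read transaction instead of asserting
%% it, so a failed start becomes a normal {error, _} reply rather than a
%% badmatch crash; always close the transaction before replying.
read_with_tran(Key) ->
    case begin_tran(Key) of
        {ok, Ref} ->
            Reply = case catch read_object(Ref, Key) of
                        {'EXIT', Reason} -> {error, Reason};
                        Ret -> Ret
                    end,
            _ = end_tran(Ref),
            Reply;
        {error, Cause} ->
            {error, Cause}
    end.

%% Trivial stubs, only to keep the sketch self-contained.
begin_tran(_Key) -> {ok, make_ref()}.
read_object(_Ref, Key) -> {ok, Key}.
end_tran(_Ref) -> ok.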
@@ -149,9 +157,11 @@ code_change(_OldVsn, State, _Extra) ->
 %% Callbacks for leo_tran_behaviour
 run(Key, _, _, {TotalOfChunkedObjs, Req, Meta, TransportRec}, _) ->
     Ref = case catch leo_cache_api:put_begin_tran(write, Key) of
-              {ok, R} -> R;
-              _ -> undefined
+              {ok, R} ->
+                  R;
+              _ ->
+                  undefined
           end,
     Reply = handle_loop(TotalOfChunkedObjs,
                         #req_info{key = Key,
                                   chunk_key = Key,
@@ -253,8 +263,8 @@ handle_loop(Index, TotalChunkObjs, #req_info{key = AcctualKey,
 %% @doc Read Mode
 %% @private
 -spec(handle_read_loop(TotalSize, ReqInfo) ->
-        {ok, any()} | {error, any()} when TotalSize::non_neg_integer(),
-                                          ReqInfo::#req_info{}).
+             {ok, any()} | {error, any()} when TotalSize::non_neg_integer(),
+                                               ReqInfo::#req_info{}).
 handle_read_loop(TotalSize, ReqInfo) ->
     handle_read_loop(0, TotalSize, ReqInfo).

@@ -285,13 +295,20 @@ handle_read_loop(Offset, TotalSize, #req_info{key = Key,
             handle_read_loop(Offset + ReadSize, TotalSize, ReqInfo)
     end.


 %% @doc Execute begin-tran
 %% @private
 put_begin_tran_with_retry(Key) ->
+    put_begin_tran_with_retry(Key, ?DEF_RETRY_TIMES).
+
+%% @private
+put_begin_tran_with_retry(_,0) ->
+    {error, ?ERROR_COULD_NOT_START_TRAN};
+put_begin_tran_with_retry(Key, RetryTimes) ->
     case leo_cache_api:put_begin_tran(read, Key) of
         {ok, Ref} ->
             {ok, Ref};
-        not_found ->
-            timer:sleep(100),
-            put_begin_tran_with_retry(Key);
-        Other ->
-            Other
+        _ ->
+            timer:sleep(timer:seconds(1)),
+            put_begin_tran_with_retry(Key, RetryTimes - 1)
     end.
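Effect of the retry change: the old clause retried every 100 ms indefinitely whenever the cache reported not_found and passed any other result straight through, whereas the new code retries any non-{ok, Ref} result at one-second intervals and gives up after ?DEF_RETRY_TIMES (5) attempts with ?ERROR_COULD_NOT_START_TRAN, so a reader now blocks for at most roughly five seconds before the error is reported. The same shape can be written as a generic helper; this is a sketch only, not part of this commit:

-module(retry_sketch).
-export([retry/3]).

%% Run Fun up to N times, sleeping SleepMs between failed attempts, and
%% return the last error once the retry budget is spent -- the same
%% bounded-retry pattern as put_begin_tran_with_retry/2 above.
-spec retry(fun(() -> {ok, term()} | {error, term()}),
            pos_integer(), non_neg_integer()) ->
          {ok, term()} | {error, term()}.
retry(Fun, 1, _SleepMs) ->
    Fun();
retry(Fun, N, SleepMs) when N > 1 ->
    case Fun() of
        {ok, _} = Ok ->
            Ok;
        {error, _} ->
            timer:sleep(SleepMs),
            retry(Fun, N - 1, SleepMs)
    end.

For instance, retry(fun() -> leo_cache_api:put_begin_tran(read, Key) end, 5, 1000) would approximate the behaviour above, assuming failures are reported as {error, _} tuples.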
