Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle get_object with range #21

Merged
merged 10 commits into from
Jun 16, 2015
Merged
100 changes: 67 additions & 33 deletions src/leo_gateway_http_commons.erl
Original file line number Diff line number Diff line change
Expand Up @@ -738,30 +738,63 @@ get_range_object(Req, Bucket, Key, {error, badarg}) ->
?reply_bad_range([?SERVER_HEADER], Key, <<>>, Req);
get_range_object(Req, Bucket, Key, {_Unit, Range}) when is_list(Range) ->
Mime = leo_mime:guess_mime(Key),
Header = [?SERVER_HEADER,
{?HTTP_HEAD_RESP_CONTENT_TYPE, Mime}],
{ok, Req2} = cowboy_req:chunked_reply(?HTTP_ST_PARTIAL_CONTENT, Header, Req),
get_range_object_1(Req2, Bucket, Key, Range, undefined).
case get_body_length_1(Key, Range) of
error ->
?reply_service_unavailable_error([?SERVER_HEADER], Key, <<>>, Req);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To respond the http status code properly,
We need to check why this error occured, and respond http status corresponding with the error like

  • {error, not_found} reply_not_found_without_body
  • {error, unavailable} reply_service_unavailable_error
  • {error, ?ERR_TYPE_INTERNAL_ERROR} reply_internal_error_without_body
  • {error, timeout} reply_timeout_without_body

Length ->
Header = [?SERVER_HEADER,
{?HTTP_HEAD_RESP_CONTENT_TYPE, Mime},
{?HTTP_HEAD_RESP_CONTENT_LENGTH,integer_to_list(Length)}],
Req2 = cowboy_req:set_resp_body_fun(
fun(Socket, Transport) ->
get_range_object_1(Req, Bucket, Key, Range, undefined, Socket, Transport)
end,
Req),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@windkit I found out why CI failed.
You need to use set_resp_body_fun's another version.

https://github.com/ninenines/cowboy/blob/1.0.0/src/cowboy_req.erl#L776

Try to pass the content length you calculated as the first argument.

Since R16B03-1 coudn't understand transfer-encoding: identify, this error have occured on CI.
https://github.com/erlang/otp/blob/OTP_R16B03-1/lib/inets/src/http_client/httpc_handler.erl#L1122

The latest one can understand this header.
https://github.com/erlang/otp/blob/OTP-17.5.6/lib/inets/src/http_client/httpc_handler.erl#L1130

?reply_ok(Header, Req2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@windkit @yosukehara
Sorry for the late reply to this pull request.

To comply with HTTP spec,
We need to respond HTTP_ST_PARTIAL_CONTENT(206) as a status code so reply_ok can't use.

end.

get_body_length_1(Key, Range) ->
case leo_gateway_rpc_handler:head(Key) of
{ok, #?METADATA{dsize = ObjectSize}} ->
get_body_length(Range, ObjectSize, 0);
{_, _} ->
error
end.

get_range_object_1(Req,_Bucket,_Key, [], _) ->
get_body_length([], _ObjectSize, Acc) ->
Acc;
get_body_length([{Start, infinity}|Rest], ObjectSize, Acc) ->
get_body_length(Rest, ObjectSize, Acc + ObjectSize - Start);
get_body_length([{Start, End}|Rest], ObjectSize, Acc) when End < 0 ->
get_body_length(Rest, ObjectSize, Acc + ObjectSize - Start);
get_body_length([{Start, End}|Rest], ObjectSize, Acc) when End < ObjectSize ->
get_body_length(Rest, ObjectSize, Acc + End - Start + 1);
get_body_length([End|Rest], ObjectSize, Acc) when End < 0 ->
get_body_length(Rest, ObjectSize, Acc + ObjectSize);
get_body_length([End|Rest], ObjectSize, Acc) when End < ObjectSize ->
get_body_length(Rest, ObjectSize, Acc + End + 1);
get_body_length(_, _, _) ->
error.

get_range_object_1(Req,_Bucket,_Key, [], _, _Socket, _Transport) ->
{ok, Req};
get_range_object_1(Req,_Bucket,_Key, _, {error, _}) ->
get_range_object_1(Req,_Bucket,_Key, _, {error, _}, _Socket, _Transport) ->
{ok, Req};
get_range_object_1(Req, Bucket, Key, [{Start, infinity}|Rest], _) ->
Ret = get_range_object_2(Req, Bucket, Key, Start, 0),
get_range_object_1(Req, Bucket, Key, Rest, Ret);
get_range_object_1(Req, Bucket, Key, [{Start, End}|Rest], _) ->
Ret = get_range_object_2(Req, Bucket, Key, Start, End),
get_range_object_1(Req, Bucket, Key, Rest, Ret);
get_range_object_1(Req, Bucket, Key, [End|Rest], _) ->
Ret = get_range_object_2(Req, Bucket, Key, 0, End),
get_range_object_1(Req, Bucket, Key, Rest, Ret).

get_range_object_2(Req, Bucket, Key, Start, End) ->
get_range_object_1(Req, Bucket, Key, [{Start, infinity}|Rest], _, Socket, Transport) ->
Ret = get_range_object_2(Req, Bucket, Key, Start, 0, Socket, Transport),
get_range_object_1(Req, Bucket, Key, Rest, Ret, Socket, Transport);
get_range_object_1(Req, Bucket, Key, [{Start, End}|Rest], _, Socket, Transport) ->
Ret = get_range_object_2(Req, Bucket, Key, Start, End, Socket, Transport),
get_range_object_1(Req, Bucket, Key, Rest, Ret, Socket, Transport);
get_range_object_1(Req, Bucket, Key, [End|Rest], _, Socket, Transport) ->
Ret = get_range_object_2(Req, Bucket, Key, 0, End, Socket, Transport),
get_range_object_1(Req, Bucket, Key, Rest, Ret, Socket, Transport).

get_range_object_2(Req, Bucket, Key, Start, End, Socket, Transport) ->
case leo_gateway_rpc_handler:head(Key) of
{ok, #?METADATA{del = 0,
cnumber = 0}} ->
get_range_object_small(Req, Bucket, Key, Start, End);
get_range_object_small(Req, Bucket, Key, Start, End, Socket, Transport);
{ok, #?METADATA{del = 0,
cnumber = N,
dsize = ObjectSize,
Expand All @@ -784,7 +817,8 @@ get_range_object_2(Req, Bucket, Key, Start, End) ->
{CurPos, Index}
end,
get_range_object_large(Req, Bucket, Key,
NewStartPos, NewEndPos, N, Index_1, CurPos_1);
NewStartPos, NewEndPos, N, Index_1, CurPos_1,
Socket, Transport);
{error, unavailable} ->
?reply_service_unavailable_error([?SERVER_HEADER], Key, <<>>, Req);
_ ->
Expand All @@ -794,14 +828,14 @@ get_range_object_2(Req, Bucket, Key, Start, End) ->

%% @doc Retrieve the small object
%% @private
get_range_object_small(Req, Bucket, Key, Start, End) ->
get_range_object_small(Req, Bucket, Key, Start, End, Socket, Transport) ->
case leo_gateway_rpc_handler:get(Key, Start, End) of
{ok, _Meta, <<>>} ->
?access_log_get(Bucket, Key, 0, ?HTTP_ST_OK),
ok;
{ok, _Meta, Bin} ->
?access_log_get(Bucket, Key, byte_size(Bin), ?HTTP_ST_OK),
cowboy_req:chunk(Bin, Req);
Transport:send(Socket, Bin);
{error, unavailable} ->
?reply_service_unavailable_error([?SERVER_HEADER], Key, <<>>, Req);
{error, Cause} ->
Expand Down Expand Up @@ -830,25 +864,25 @@ calc_pos(StartPos, EndPos, _ObjectSize) ->

%% @doc Retrieve the large object
%% @private
get_range_object_large(_Req,_Bucket,_Key,_Start,_End, Total, Total, CurPos) ->
get_range_object_large(_Req,_Bucket,_Key,_Start,_End, Total, Total, CurPos, _Socket, _Transport) ->
{ok, CurPos};
get_range_object_large(_Req,_Bucket,_Key,_Start, End,_Total,_Index, CurPos) when CurPos > End ->
get_range_object_large(_Req,_Bucket,_Key,_Start, End,_Total,_Index, CurPos, _Socket, _Transport) when CurPos > End ->
{ok, CurPos};
get_range_object_large( Req, Bucket, Key, Start, End, Total, Index, CurPos) ->
get_range_object_large( Req, Bucket, Key, Start, End, Total, Index, CurPos, Socket, Transport) ->
IndexBin = list_to_binary(integer_to_list(Index + 1)),
Key2 = << Key/binary, ?DEF_SEPARATOR/binary, IndexBin/binary >>,

case leo_gateway_rpc_handler:head(Key2) of
{ok, #?METADATA{cnumber = 0,
dsize = CS}} ->
%% get and chunk an object
NewPos = send_chunk(Req, Bucket, Key2, Start, End, CurPos, CS),
get_range_object_large(Req, Bucket, Key, Start, End, Total, Index + 1, NewPos);
NewPos = send_chunk(Req, Bucket, Key2, Start, End, CurPos, CS, Socket, Transport),
get_range_object_large(Req, Bucket, Key, Start, End, Total, Index + 1, NewPos, Socket, Transport);

{ok, #?METADATA{cnumber = GrandChildNum}} ->
case get_range_object_large(Req, Bucket, Key2, Start, End, GrandChildNum, 0, CurPos) of
case get_range_object_large(Req, Bucket, Key2, Start, End, GrandChildNum, 0, CurPos, Socket, Transport) of
{ok, NewPos} ->
get_range_object_large(Req, Bucket, Key, Start, End, Total, Index + 1, NewPos);
get_range_object_large(Req, Bucket, Key, Start, End, Total, Index + 1, NewPos, Socket, Transport);
{error, Cause} ->
{error, Cause}
end;
Expand All @@ -860,22 +894,22 @@ get_range_object_large( Req, Bucket, Key, Start, End, Total, Index, CurPos) ->

%% @doc
%% @private
send_chunk(_Req,_,_Key, Start,_End, CurPos, ChunkSize) when (CurPos + ChunkSize - 1) < Start ->
send_chunk(_Req,_,_Key, Start,_End, CurPos, ChunkSize, _Socket, _Transport) when (CurPos + ChunkSize - 1) < Start ->
%% skip proc
CurPos + ChunkSize;
send_chunk(Req,_Bucket, Key, Start, End, CurPos, ChunkSize) when CurPos >= Start andalso
send_chunk(_Req,_Bucket, Key, Start, End, CurPos, ChunkSize, Socket, Transport) when CurPos >= Start andalso
(CurPos + ChunkSize - 1) =< End ->
%% whole get
case leo_gateway_rpc_handler:get(Key) of
{ok, _Meta, Bin} ->
%% @FIXME current impl can't handle a file which consist of grand children
%% ?access_log_get(Bucket, Key, ChunkSize, ?HTTP_ST_OK),
cowboy_req:chunk(Bin, Req),
Transport:send(Socket, Bin),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an existing bug not handling errors returned by Transport:send|cowboy_req:chunk.
In this case, We need to handle this request as a error but we already sent the http header and a part of body,
What should we do?
The answer is do Transport:close.

Since there are lots of errors to be handled as the same above,
Please fix those bugs as well.

CurPos + ChunkSize;
Error ->
Error
end;
send_chunk(Req, _Bucket, Key, Start, End, CurPos, ChunkSize) ->
send_chunk(_Req, _Bucket, Key, Start, End, CurPos, ChunkSize, Socket, Transport) ->
%% partial get
StartPos = case Start =< CurPos of
true -> 0;
Expand All @@ -891,7 +925,7 @@ send_chunk(Req, _Bucket, Key, Start, End, CurPos, ChunkSize) ->
{ok, _Meta, Bin} ->
%% @FIXME current impl can't handle a file which consist of grand childs
%% ?access_log_get(Bucket, Key, ChunkSize, ?HTTP_ST_OK),
cowboy_req:chunk(Bin, Req),
Transport:send(Socket, Bin),
CurPos + ChunkSize;
{error, Cause} ->
{error, Cause}
Expand Down
2 changes: 1 addition & 1 deletion test/leo_gateway_web_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ range_object_base([_TermFun, _Node0, Node1], RangeValue) ->
?TARGET_HOST,
":8080/a/b.png"]),
[{"connection", "close"},{"range", RangeValue}]}, [], []),
?assertEqual(206, SC),
?assertEqual(200, SC),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the above comment, this should be rollbacked.

?assertEqual("od", Body)
catch
throw:Reason ->
Expand Down