Skip to content

Commit

Permalink
add wm module for GETing (and encoding) active preflist
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Feb 25, 2015
1 parent 382e295 commit 9c289ad
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/riak_kv_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
{riak_kv_pb_bucket, 15, 18}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
{riak_kv_pb_index, 25, 26}, %% Secondary index requests
{riak_core_pb_bucket_key_apl, 33, 34}, %% (Active) Preflist requests
{riak_kv_pb_csbucket, 40, 41}, %% CS bucket folding support
{riak_kv_pb_counter, 50, 53}, %% counter requests
{riak_kv_pb_crdt, 80, 83} %% CRDT requests
Expand Down
5 changes: 4 additions & 1 deletion src/riak_kv_pb_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
decode/2,
encode/1,
process/2,
process_stream/3]).
process_stream/3,
maybe_create_bucket_type/2]).

-record(state, {client, % local client
req, % current request (for multi-message requests like list keys)
Expand Down Expand Up @@ -189,5 +190,7 @@ check_bucket_type(Type) ->
-spec maybe_create_bucket_type(binary(), binary()) -> binary() | {binary(), binary()}.
maybe_create_bucket_type(<<"default">>, Bucket) ->
Bucket;
maybe_create_bucket_type(undefined, Bucket) ->
Bucket;
maybe_create_bucket_type(Type, Bucket) when is_binary(Type) ->
{Type, Bucket}.
88 changes: 88 additions & 0 deletions src/riak_kv_pb_bucket_key_apl.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
%% --------------------------------------------------------------------------
%%
%% riak_core_pb_bucket_key_apl: Expose Core active preflist functionality to
%% Protocol Buffers
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% --------------------------------------------------------------------------

%% @doc <p>The Bucket-Key Preflist (Primaries & Fallbacks) PB service
%% for Riak Core. This service covers the following request messages in the
%% original protocol:</p>
%%
%% <pre>
%% 33 - RpbGetBucketKeyPreflistReq
%% </pre>
%%
%% <p>This service produces the following responses:</p>
%%
%% <pre>
%% 34 - RpbGetBucketKeyPreflistResp
%% </pre>
%%
%% @end

-module(riak_kv_pb_bucket_key_apl).

-export([init/0,
decode/2,
encode/1,
process/2,
process_stream/3]).

-include_lib("riak_pb/include/riak_kv_pb.hrl").
-include_lib("riak_pb/include/riak_pb_kv_codec.hrl").

init() ->
undefined.

%% @doc decode/2 callback. Decodes an incoming message.
decode(Code, Bin) when Code == 33 ->
Msg = riak_pb_codec:decode(Code, Bin),
case Msg of
#rpbgetbucketkeypreflistreq{type =T, bucket =B, key =Key} ->
Bucket = riak_core_pb_bucket:bucket_type(T, B),
{ok, Msg, {"riak_core.get_preflist", {Bucket, Key}}}
end.

%% @doc encode/1 callback. Encodes an outgoing response message.
encode(Message) ->
{ok, riak_pb_codec:encode(Message)}.

%% Get bucket-key preflist primaries
process(#rpbgetbucketkeypreflistreq{bucket = <<>>}, State) ->
{error, "Bucket cannot be zero-length", State};
process(#rpbgetbucketkeypreflistreq{key = <<>>}, State) ->
{error, "Key cannot be zero-length", State};
process(#rpbgetbucketkeypreflistreq{type = <<>>}, State) ->
{error, "Type cannot be zero-length", State};
process(#rpbgetbucketkeypreflistreq{type=T, bucket=B0, key =K}, State) ->
B = riak_kv_pb_bucket:maybe_create_bucket_type(T, B0),
Preflist = riak_core_apl:get_apl_ann_with_pnum({B, K}),
case Preflist of
[] ->
{error, {format,
"No preflist for bucket '~s' and key '~s'",
[B, K]}, State};
P ->
PbPreflist = riak_pb_kv_codec:encode_apl_ann(P),
{reply, #rpbgetbucketkeypreflistresp{preflist=PbPreflist}, State}
end.

process_stream(_, _, State) ->
{ignore, State}.
3 changes: 3 additions & 0 deletions src/riak_kv_web.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ raw_dispatch(Name) ->
{Prefix ++ ["buckets", bucket, "keys", key],
riak_kv_wm_object, Props},

{Prefix ++ ["buckets", bucket, "keys", key, "preflist"],
riak_kv_wm_preflist, Props},

{Prefix ++ ["buckets", bucket, "keys", key, '*'],
riak_kv_wm_link_walker, Props},

Expand Down
165 changes: 165 additions & 0 deletions src/riak_kv_wm_preflist.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
%% --------------------------------------------------------------------------
%%
%% riak_kv_wm_preflist - Webmachine resource for getting bucket/key active
%% preflist.
%%
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% --------------------------------------------------------------------------

%% @doc Resource for getting Riak bucket/key active preflist over HTTP.
%%
%% Available operations:
%%
%% GET /types/Type/buckets/Bucket/keys/Key/preflist
%% GET /buckets/Bucket/keys/Key/preflist
%% Get information about the active preflist for a particular buckey/key
%% combo.

-module(riak_kv_wm_preflist).

%% webmachine resource exports
-export([
init/1,
is_authorized/2,
forbidden/2,
allowed_methods/2,
resource_exists/2,
content_types_provided/2,
encodings_provided/2,
produce_preflist_body/2,
malformed_request/2
]).

-record(ctx, {
bucket_type :: binary(), %% bucket type (from uri)
bucket :: binary(), %% Bucket name (from uri)
key :: binary(), %% Key (from uri)
security :: riak_core_security:context() %% security context
}).

-type context() :: #ctx{}.

-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").

-spec init(proplists:proplist()) -> {ok, context()}.
%% @doc Initialize this resource. This function extracts the
%% 'prefix' and 'riak' properties from the dispatch args.
init(Props) ->
{ok, #ctx{
bucket_type=proplists:get_value(bucket_type, Props)}}.

is_authorized(ReqData, Ctx) ->
case riak_api_web_security:is_authorized(ReqData) of
false ->
{"Basic realm=\"Riak\"", ReqData, Ctx};
{true, SecContext} ->
{true, ReqData, Ctx#ctx{security=SecContext}};
insecure ->
%% XXX 301 may be more appropriate here, but since the http and
%% https port are different and configurable, it is hard to figure
%% out the redirect URL to serve.
{{halt, 426}, wrq:append_to_resp_body(<<"Security is enabled and "
"Riak does not accept credentials over HTTP. Try HTTPS "
"instead.">>, ReqData), Ctx}
end.

forbidden(RD, Ctx = #ctx{security=undefined}) ->
{riak_kv_wm_utils:is_forbidden(RD), RD, Ctx};
forbidden(RD, Ctx) ->
case riak_kv_wm_utils:is_forbidden(RD) of
true ->
{true, RD, Ctx};
false ->
Res = riak_core_security:check_permission({"riak_kv.get_preflist",
Ctx#ctx.bucket_type},
Ctx#ctx.security),
case Res of
{false, Error, _} ->
RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
{true, wrq:append_to_resp_body(
unicode:characters_to_binary(
Error, utf8, utf8), RD1), Ctx};
{true, _} ->
{false, RD, Ctx}
end
end.

-spec allowed_methods(#wm_reqdata{}, context()) ->
{[atom()], #wm_reqdata{}, context()}.
%% @doc Get the list of methods this resource supports.
%% Properties allows, GET.
allowed_methods(RD, Ctx) ->
{['HEAD', 'GET'], RD, Ctx}.

-spec content_types_provided(#wm_reqdata{}, context()) ->
{[{ContentType::string(), Producer::atom()}], #wm_reqdata{}, context()}.
%% @doc List the content types available for representing this resource.
%% "application/json" is the content-type for listing keys.
content_types_provided(RD, Ctx) ->
%% bucket-level: JSON description only
{[{"application/json", produce_preflist_body}], RD, Ctx}.

-spec encodings_provided(#wm_reqdata{}, context()) ->
{[{Encoding::string(), Producer::function()}], #wm_reqdata{}, context()}.
%% @doc List the encodings available for representing this resource.
%% "identity" and "gzip" are available for a preflist request.
encodings_provided(RD, Ctx) ->
%% identity and gzip for top-level and bucket-level requests
{riak_kv_wm_utils:default_encodings(), RD, Ctx}.

resource_exists(RD, #ctx{bucket_type=BType}=Ctx) ->
{riak_kv_wm_utils:bucket_type_exists(BType), RD, Ctx}.

-spec malformed_request(#wm_reqdata{}, context()) ->
{boolean(), #wm_reqdata{}, context()}.
malformed_request(RD, Ctx) ->
{false, RD, Ctx}.

-spec produce_preflist_body(#wm_reqdata{}, context()) ->
{mochijson2:json_object(),
#wm_reqdata{},
context()}.
%% @doc Produce bucket/key annotated preflist as JSON.
produce_preflist_body(RD, #ctx{bucket=Bucket0,
bucket_type=BType,
key=Key}=Ctx) ->
Bucket = riak_kv_wm_utils:maybe_bucket_type(BType, Bucket0),
Preflist = riak_core_apl:get_apl_ann_with_pnum({Bucket, Key}),
%% Encode
Json = mochijson2:encode({struct,
[{<<"preflist">>,
lists:flatten(jsonify_preflist(Preflist))}]}),
{Json, RD, Ctx}.

%% Private

-spec jsonify_preflist(riak_core_apl:preflist_with_pnum_ann()) -> list().
%% @doc Jsonify active preflist to json.
jsonify_preflist(Preflist) ->
[jsonify_preflist_results(PartitionNumber, Node, T) ||
{{PartitionNumber, Node}, T} <- Preflist].

-spec jsonify_preflist_results(non_neg_integer(), node(), primary|fallback) ->
[{struct, list()}].
jsonify_preflist_results(PartitionNumber, Node, Ann) ->
[{struct,
[{<<"partition">>, PartitionNumber},
{<<"node">>, atom_to_binary(Node, utf8)},
{<<"primary">>, Ann =:= primary}]}].

0 comments on commit 9c289ad

Please sign in to comment.