Skip to content

Commit

Permalink
DP-6 working prototype of API entry points coverage, http and pb API
Browse files Browse the repository at this point in the history
I used this branch (see PR #1077):
feature/zl/add-wm-code-for-getting-active-preflist,
which provides useful functions riak_core_apl:get_apl_ann() and friends.

There is a matching commit in a newly created branch
feature/az/api-entrypoints-coverage, for the pb part to become
functional.

The following HTTP requests are accepted:

  /ring/coverage/bucket/B/key/K/?proto=P

returning a JSON of the form:

 {Host:{"ports":[Port]}}

indicating Host:Port is where an entry point is for optimal data access
to the key K in bucket B, via the given protocol API.

Responses are cached for (a default of) 15 secs, configurable via
parameter 'ring_vnodes_cache_expiry_time'.

For pb, two new messages are provided:

 RpbApiEpReq (code 90) and
 RpbApiEpResp(code 91),

with the relevant snippet in riak_pb/src/riak_kv.proto:

 message RpbApiEpReq {
     required bytes bucket = 1;
     required bytes key = 2;
     enum RpbApiProto {
         pbc = 0;
         http = 1;
     }
     optional RpbApiProto proto = 3 [default = pbc];
 }

 message RpbApiEpResp {
     required bytes host = 1;
     required int32 port = 2;
 }

This coverage information is obtained in two steps:

1. using riak_core_apl:get_apl_ann(), get active preflist for given
   Bucket and Key, and extract riak nodes from it;

2. make a rpc call on those nodes to determine which of the relevant
   listener apps (i.e., riak_api_pb_listener, riak_api_web) are running
   on those nodes.

The corresponding new riak python client method is
client.get_api_entry_point(bucket, key), included in
basho/riak-python-client@5921a8cc1
  • Loading branch information
hmmr committed Feb 13, 2015
1 parent 3e2036e commit b92faa7
Show file tree
Hide file tree
Showing 5 changed files with 408 additions and 2 deletions.
115 changes: 115 additions & 0 deletions src/riak_kv_apiep.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_wm_ring_lib: Common functions for ring/coverage protobuff
%% callback and Webmachine resource
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Supporting functions shared between riak_kv_{wm,pb}_ring.

-module(riak_kv_apiep).

-export([get_endpoints/2, get_endpoints_json/2]).

-type hplist() :: [{string(), non_neg_integer()}].

-define(RPC_TIMEOUT, 10000).

-spec get_endpoints_json(http|pb, {binary(), binary()}) -> iolist().
%% @doc Produce requested api endpoints in a JSON form.
get_endpoints_json(Proto, BKey) ->
Endpoints = get_endpoints(Proto, BKey),
mochijson2:encode(
{struct, [{H, {struct, [{ports, P}]}}
|| {H, P} <- Endpoints]}).


-spec get_endpoints(http|pb, {binary(), binary()}) -> hplist().
%% @doc For a given protocol, determine host:port endpoints of riak
%% nodes containing requested bucket and key.
get_endpoints(Proto, {Bucket, Key}) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
UpNodes = riak_core_ring:all_members(Ring),
Preflist = riak_core_apl:get_apl_ann({Bucket, Key}, UpNodes),
Nodes =
lists:usort(
[N || {{_Index, N}, _Type} <- Preflist]), %% filter on type?
%% why bother with nodes? [{Host, [Port]}] is all our clients need, so:
case Proto of
http ->
get_http_endpoints(Nodes);
pbc ->
get_pb_endpoints(Nodes)
end.


%% ===================================================================
%% Local functions
%% ===================================================================

-spec get_http_endpoints([node()]) -> hplist().
%% @private
get_http_endpoints(Nodes) ->
{ResL, FailedNodes} =
rpc:multicall(
Nodes, riak_api_web, get_listeners, [], ?RPC_TIMEOUT),
case FailedNodes of
[] ->
fine;
FailedNodes ->
lagger:warning(
self(), "Failed to get http riak api listeners at node(s) ~9999p", [FailedNodes])
end,
%% there can be (really?) multiple api entry points (on multiple vnodes on same host),
%% so group by host:
lists:foldl(
fun({H, P}, Acc) ->
PP0 = proplists:get_value(H, Acc, []),
lists:keystore(H, 1, Acc, {H, [P|PP0]})
end, [],
[HP || {_Proto, HP} <- lists:flatten(ResL)]).


-spec get_pb_endpoints([node()]) -> [{Host::string(), Port::non_neg_integer()}].
%% @private
get_pb_endpoints(Nodes) ->
{ResL, FailedNodes} =
rpc:multicall(
Nodes, riak_api_pb_listener, get_listeners, [], ?RPC_TIMEOUT),
case FailedNodes of
[] ->
fine;
FailedNodes ->
lagger:warning(
self(), "Failed to get pb riak api listeners at node(s) ~9999p", [FailedNodes])
end,
lists:foldl(
fun({H, P}, Acc) ->
PP0 = proplists:get_value(H, Acc, []),
lists:keystore(H, 1, Acc, {H, [P|PP0]})
end, [],
lists:flatten(ResL)).


%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
%% TODO
-endif.
3 changes: 2 additions & 1 deletion src/riak_kv_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
{riak_kv_pb_index, 25, 26}, %% Secondary index requests
{riak_kv_pb_csbucket, 40, 41}, %% CS bucket folding support
{riak_kv_pb_counter, 50, 53}, %% counter requests
{riak_kv_pb_crdt, 80, 83} %% CRDT requests
{riak_kv_pb_crdt, 80, 83}, %% CRDT requests
{riak_kv_pb_apiep, 90, 91} %% API entry points coverage
]).
-define(MAX_FLUSH_PUT_FSM_RETRIES, 10).

Expand Down
77 changes: 77 additions & 0 deletions src/riak_kv_pb_apiep.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
%% -------------------------------------------------------------------
%%
%% riak_api_pb_ring: Protobuff callbacks providing a `location service'
%% to external clients for optimal access to hosts
%% with partitions containing known buckets/key
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Protobuff callbacks providing a `location service'
%% to external clients for optimal access to hosts
%% with partitions containing certain buckets/key
%%
%% This module serves request RpbApiEntryPointsReq (code 90)
%% returning response RpbApiEntryPointsResp (code 91)

-module(riak_kv_pb_apiep).

-behaviour(riak_api_pb_service).

-export([init/0,
decode/2,
encode/1,
process/2,
process_stream/3]).

-include_lib("riak_pb/include/riak_kv_pb.hrl").

-spec init() -> undefined.
init() ->
undefined.

decode(Code, Bin) when Code == 90; Code == 91 ->
Msg = riak_pb_codec:decode(Code, Bin),
case Msg of
#rpbapiepreq{bucket = B, key = K, proto = P} ->
{ok, Msg, {"riak_kv.apiep", {B, K, P}}}
end.


encode(Message) ->
{ok, riak_pb_codec:encode(Message)}.


process(#rpbapiepreq{bucket = Bucket, key = Key, proto = Proto}, State) ->
{Host, Port} =
case riak_kv_apiep:get_endpoints(Proto, {Bucket, Key}) of
[] ->
{"", 0};
[{H,[P|_]}|_] ->
%% there's a call to underlying function get_listeners(),
%% suggesting multiple entry points are possible, but
%% effectively there is just one
{H, P}
end,
{reply, #rpbapiepresp{host = list_to_binary(Host),
port = Port},
State}.


process_stream(_, _, State) ->
{ignore, State}.
5 changes: 4 additions & 1 deletion src/riak_kv_web.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ raw_dispatch(Name) ->
riak_kv_wm_link_walker, Props},

{Prefix ++ ["buckets", bucket, "index", field, '*'],
riak_kv_wm_index, Props}
riak_kv_wm_index, Props},

{Prefix ++ ["ring", "coverage", "bucket", bucket, "key", key],
riak_kv_wm_apiep, Props}

] || {Prefix, Props} <- Props2 ]).

Expand Down
Loading

0 comments on commit b92faa7

Please sign in to comment.