
Mas i1804 peerdiscovery #1812

Merged: 16 commits merged into develop-3.0 from mas-i1804-peerdiscovery, May 11, 2022

Conversation

martinsumner
Contributor

See #1804

The heart of the problem is how to avoid needing configuration changes on sink clusters when source clusters are being changed. This allows new nodes to be discovered automatically, starting from the configured nodes.

Default behaviour is always to fall back to the configured behaviour.

Worker Counts and Per Peer Limits need to be set with an understanding of whether this will be enabled. If the per-peer limit is left at its default, the consequence is that the worker count will be distributed evenly (independently by each node). Note that if Worker Count mod (Src Node Count) =/= 0, there will be no balancing of the excess workers across the sink nodes.
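For illustration, a minimal sketch of that arithmetic (module and function names are hypothetical, not part of this PR):

-module(worker_split_example).
-export([per_peer_limit/2]).

%% Split this node's sink worker count evenly across the discovered
%% source nodes; the remainder is the portion that is not rebalanced.
-spec per_peer_limit(pos_integer(), pos_integer()) ->
        {non_neg_integer(), non_neg_integer()}.
per_peer_limit(WorkerCount, SrcNodeCount) ->
    {WorkerCount div SrcNodeCount, WorkerCount rem SrcNodeCount}.

For example, per_peer_limit(8, 3) returns {2, 2}: each discovered peer is given 2 workers, and the 2 excess workers are the ones that are not balanced across the sink nodes.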

- Refactoring of riak_kv_replrtq to allow sharing of code and interaction between snk and peer.
- Adds some further logging. Also corrects the comparison between current and discovered peers to avoid unnecessary resets.
- Adds operator riak_client functions.
%% Prompt for the discovery of peers
-spec update_discovery(riak_kv_replrtq_snk:queue_name()) -> boolean().
update_discovery(QueueName) ->
    gen_server:call(?MODULE, {update_discovery, QueueName}, 60 * 1000).
Contributor

Do you really want 60 hardcoded here, or would you rather use the macro ?DISCOVERY_TIMEOUT_SECONDS?
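A minimal sketch of the suggested alternative, assuming the module defines (or would define) the macro:

-define(DISCOVERY_TIMEOUT_SECONDS, 60).

%% Prompt for the discovery of peers
-spec update_discovery(riak_kv_replrtq_snk:queue_name()) -> boolean().
update_discovery(QueueName) ->
    gen_server:call(?MODULE,
                    {update_discovery, QueueName},
                    ?DISCOVERY_TIMEOUT_SECONDS * 1000).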

fun({QueueName, _PeerInfo}) ->
    erlang:send_after(MinDelay * 1000,
                      self(),
                      {prompt_discovery, QueueName})
Contributor
@ThomasArts Mar 23, 2022

It takes a few moments to realize that there are two interfaces for prompt discovery: this one goes via handle_info and takes only one argument, looking up the PeerInfo at the time it executes, whereas the cast version receives the PeerInfo through the interface.

This difference probably deserves a clearer comment, in particular why the cast version does not need to do a state lookup.

handle_info({prompt_discovery, QueueName}, State) ->
    {QueueName, PeerInfo} =
        lists:keyfind(QueueName, 1, State#state.discovery_peers),
    ok = prompt_discovery({QueueName, PeerInfo}),
Contributor

I kind of dislike this... you leave the context of the gen_server in the cast only to return to it later. I wonder if it would not be better to actually call prompt_discovery(QueueName, PeerInfo, regular) at this point.
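A sketch of that suggestion, assuming a prompt_discovery/3 variant that does the work directly instead of re-entering the server via a cast:

handle_info({prompt_discovery, QueueName}, State) ->
    {QueueName, PeerInfo} =
        lists:keyfind(QueueName, 1, State#state.discovery_peers),
    %% Stay in the gen_server context rather than casting back to self.
    ok = prompt_discovery(QueueName, PeerInfo, regular),
    {noreply, State};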

riak_kv_replrtq_snk:current_peers(QueueName)))
end,
case discover_peers(PeerInfo, StartDelayMS) of
    CurrentPeers ->
Contributor

I guess this is by design, but if you run with Type count_change then CurrentPeers is the empty list. So if discover_peers returns an empty list, you don't know which of the two cases you are in.

Both cases return false, but the side effects are different.

Contributor Author

Tried to avoid this confusion by not relying on the empty-list mismatch, but rather on a specific mismatch between a list and an atom. Commented as well. It still isn't super-clean, but it is hopefully improved.
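An illustrative sketch (hypothetical names, not the PR's actual code) of the list-versus-atom mismatch being described:

%% For a count_change prompt the reference value is an atom, so a
%% discovered peer list - even an empty one - can never match it by accident.
reference_peers(count_change, _QueueName) ->
    force_reset;
reference_peers(scheduled, QueueName) ->
    riak_kv_replrtq_snk:current_peers(QueueName).

compare_peers(Reference, DiscoveredPeers) when is_list(DiscoveredPeers) ->
    case DiscoveredPeers of
        Reference -> unchanged;   %% only possible for a scheduled check
        _ -> reset
    end.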

non_neg_integer()) -> list(node()).
replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
    UpNodes = riak_core_node_watcher:nodes(riak_kv),
    lists:foldl(replrtq_resetcount_fun(WorkerC, PerPeerL), [], UpNodes).
Contributor

Personally I find the code clearer if you inline this function.
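A sketch of what that inlining might look like; reset_node_count/3 here is a stand-in for whatever the fun returned by replrtq_resetcount_fun/2 actually does per node:

replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
    UpNodes = riak_core_node_watcher:nodes(riak_kv),
    lists:foldl(
        fun(Node, Acc) ->
            %% Accumulate the nodes on which the counts were reset.
            case reset_node_count(WorkerC, PerPeerL, Node) of
                ok -> [Node | Acc];
                _ -> Acc
            end
        end,
        [],
        UpNodes).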

@@ -409,13 +428,45 @@ handle_info({prompt_requeue, WorkItem}, State) ->
    requeue_work(WorkItem),
    {noreply, State}.

-terminate(_Reason, _State) ->
+terminate(_Reason, State) ->
Contributor

This terminate function may now take considerably more time.

If the supervisor terminates this server, how likely is it that this takes too long compared to the time you want it restarted?
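For reference, the time a supervisor allows terminate/2 before a brutal kill is the shutdown value in the child spec. A sketch only; the module name and timeout value here are illustrative, not taken from this PR:

ChildSpec =
    #{id => riak_kv_replrtq_snk,
      start => {riak_kv_replrtq_snk, start_link, []},
      restart => permanent,
      %% Milliseconds the supervisor waits for terminate/2 to finish
      %% before resorting to exit(Pid, kill).
      shutdown => 30000,
      type => worker,
      modules => [riak_kv_replrtq_snk]}.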

{SnkWorkerCount, PerPeerLimit}.

set_worker_counts(SnkWorkerCount, PerPeerLimit) ->
    application:set_env(riak_kv, replrtq_sinkworkers, SnkWorkerCount),
Contributor

Why use app_helper for getting but application for setting? Is this an artefact of OTP 18 or so?

Contributor Author

Never thought about this much, just following in line with its use elsewhere. Looking at the docs, application:get_env/3 didn't exist until R16B ... and Riak was initially written prior to that ... so I guess it is just a legacy of this.

It might be one thing to add to the list for the OTP 24+ version of Riak: refactor throughout and take this out.
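For comparison, a sketch of the two read paths and the write path being discussed (the default value shown is illustrative only):

%% Legacy riak_core helper, widely used in the existing code base.
Workers0 = app_helper:get_env(riak_kv, replrtq_sinkworkers, 24),
%% Direct OTP equivalent, available since R16B.
Workers1 = application:get_env(riak_kv, replrtq_sinkworkers, 24),
%% Writes already go straight to the application module.
ok = application:set_env(riak_kv, replrtq_sinkworkers, Workers0).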

Contributor
@ThomasArts left a comment

Some smaller comments, nothing structural, just making sure you did things on purpose.

@martinsumner martinsumner merged commit aeca1ca into develop-3.0 May 11, 2022
@martinsumner martinsumner deleted the mas-i1804-peerdiscovery branch May 12, 2022 14:28
martinsumner added a commit that referenced this pull request May 31, 2022
Merge in changes from Riak 3.0.10.

Includes PRs:

- #1809
- #1812
- #1814
- #1816
- #1829
- #1830
hmmr pushed a commit to TI-Tokyo/riak_kv that referenced this pull request Nov 23, 2022
See basho#1804

# Conflicts:
#	rebar.config
#	src/riak_kv_replrtq_peer.erl
#	src/riak_kv_replrtq_snk.erl
#	src/riak_kv_replrtq_src.erl