Skip to content

Commit

Permalink
WIP2
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Apr 4, 2024
1 parent d9063ac commit 4b69fce
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 15 deletions.
17 changes: 12 additions & 5 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,20 @@ rdbms_backend_db_queries(_Config) ->
TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
TS2 = TS + 100,

%% Make sure "test1" and "test2" are not in the table
?assertEqual({updated, 1}, delete_node_from_db(<<"test1">>)),
?assertEqual({updated, 1}, delete_node_from_db(<<"test2">>)),

%% insertion fails if node name or node num is already added for the cluster
?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test1">>, 2, <<>>, TS)),
?assertMatch({error, _}, insert_new(CN, <<"test2">>, 1, <<>>, TS)),
?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, 2, <<>>, TS)),

%% insertion fails if node is a member of another cluster
?assertMatch({error, _}, insert_new(<<"my-cluster">>, <<"test1">>, 1, <<>>, TS)),

%% update of the timestamp works correctly
{selected, SelectedNodes1} = select(CN),
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS}, {<<"test2">>, 2, <<>>, TS}]),
Expand All @@ -183,8 +190,8 @@ rdbms_backend_db_queries(_Config) ->
?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS2}, {<<"test2">>, 2, <<>>, TS}]),
lists:sort(SelectedNodes2)),

%% node removal work correctly
?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
%% node removal works correctly
?assertEqual({updated, 1}, delete_node_from_db(<<"test1">>)),
?assertEqual({selected, [{<<"test2">>, 2, <<>>, TS}]}, select(CN)).

rdbms_backend_publishes_node_ip(_Config) ->
Expand Down Expand Up @@ -316,9 +323,9 @@ update_existing(CN, BinNode, Address, TS) ->
ct:log("select(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
Ret.

delete_node_from_db(CN, BinNode) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
delete_node_from_db(BinNode) ->
Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [BinNode]),
ct:log("delete_node_from_db(~p) = ~p", [BinNode, Ret]),
Ret.

start_cets_discovery(Config) ->
Expand Down
5 changes: 5 additions & 0 deletions priv/migrations/pgsql_6.2.0_x.x.x.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ CREATE TABLE caps (
features text NOT NULL,
PRIMARY KEY (node, sub_node)
);

ALTER TABLE discovery_nodes DROP CONSTRAINT discovery_nodes_pkey;
-- In case of duplicates, remove stale rows manually or wait for cleanup
ALTER TABLE discovery_nodes ADD PRIMARY KEY (node_name);
CREATE UNIQUE INDEX i_discovery_nodes_node_name ON discovery_nodes USING BTREE(cluster_name, node_name);
3 changes: 2 additions & 1 deletion priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,10 @@ CREATE TABLE discovery_nodes (
node_num INT NOT NULL,
address varchar(250) NOT NULL DEFAULT '', -- empty means we should ask DNS
updated_timestamp BIGINT NOT NULL, -- in seconds
PRIMARY KEY (cluster_name, node_name)
PRIMARY KEY (node_name)
);
CREATE UNIQUE INDEX i_discovery_nodes_node_num ON discovery_nodes USING BTREE(cluster_name, node_num);
CREATE UNIQUE INDEX i_discovery_nodes_node_name ON discovery_nodes USING BTREE(cluster_name, node_name);

CREATE TABLE caps (
node varchar(250) NOT NULL,
Expand Down
20 changes: 11 additions & 9 deletions src/mongoose_cets_discovery_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
-export([init/1, get_nodes/1]).

%% these functions are exported for testing purposes only.
-export([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]).
-ignore_xref([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]).
-export([select/1, insert_new/5, update_existing/4, delete_node_from_db/1]).
-ignore_xref([select/1, insert_new/5, update_existing/4, delete_node_from_db/1]).

-include("mongoose_logger.hrl").

Expand Down Expand Up @@ -78,14 +78,15 @@ try_register(ClusterName, Node, State = #{node_ip_binary := Address})
Num;
false ->
Num = first_free_num(lists:usort(Nums)),
delete_node_from_db(Node), % Delete node if it was a member of another cluster
%% Could fail with duplicate node_num reason.
%% In this case just wait for the next get_nodes call.
case insert_new(ClusterName, Node, Num, Address, Timestamp) of
{error, _} -> 0; %% return default node num
{updated, 1} -> Num
end
end,
RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
RunCleaningResult = run_cleaning(Timestamp, Rows, State),
%% This could be used for debugging
Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
address => Address,
Expand All @@ -95,12 +96,12 @@ try_register(ClusterName, Node, State = #{node_ip_binary := Address})
skip_expired_nodes(Nodes, {removed, ExpiredNodes}) ->
(Nodes -- ExpiredNodes).

run_cleaning(ClusterName, Timestamp, Rows, State) ->
run_cleaning(Timestamp, Rows, State) ->
#{expire_time := ExpireTime, node_name_to_insert := CurrentNode} = State,
ExpiredNodes = [DbNode || {DbNode, _Num, _Addr, DbTS} <- Rows,
is_expired(DbTS, Timestamp, ExpireTime),
DbNode =/= CurrentNode],
[delete_node_from_db(ClusterName, DbNode) || DbNode <- ExpiredNodes],
[delete_node_from_db(DbNode) || DbNode <- ExpiredNodes],
case ExpiredNodes of
[] -> ok;
[_ | _] ->
Expand All @@ -120,7 +121,8 @@ prepare() ->
mongoose_rdbms_timestamp:prepare(),
mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()),
mongoose_rdbms:prepare(cets_disco_insert_new, T,
[cluster_name, node_name, node_num, address, updated_timestamp], insert_new()),
[cluster_name, node_name, node_num, address, updated_timestamp],
insert_new()),
mongoose_rdbms:prepare(cets_disco_update_existing, T,
[updated_timestamp, address, cluster_name, node_name], update_existing()),
mongoose_rdbms:prepare(cets_delete_node_from_db, T,
Expand All @@ -147,10 +149,10 @@ update_existing(ClusterName, NodeName, Address, UpdatedTimestamp) ->
mongoose_rdbms:execute(global, cets_disco_update_existing, [UpdatedTimestamp, Address, ClusterName, NodeName]).

delete_node_from_db() ->
<<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>.
<<"DELETE FROM discovery_nodes WHERE node_name = ?">>.

delete_node_from_db(ClusterName, Node) ->
mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [ClusterName, Node]).
delete_node_from_db(Node) ->
mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [Node]).

%% in seconds
timestamp() ->
Expand Down

0 comments on commit 4b69fce

Please sign in to comment.