From c427a56f1100c4e5231a531be2466c584f22cd62 Mon Sep 17 00:00:00 2001 From: Andreas Schultz Date: Mon, 7 Jun 2021 11:43:12 +0200 Subject: [PATCH 1/2] replace map in IP pool registry with ets The map required a synchronous call to the server. The ets table allows for concurent reads without synchnoizing the reader processes. The benchmark suite shows no speed up from. However that is not surprising as the results from the suite do vary heavily between runs. The registry showever up in a observer output with a non-zero message-q during benchmark runs. This change makes that go away. --- apps/ergw_core/src/ergw_local_pool_reg.erl | 38 ++++++++++------------ 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/apps/ergw_core/src/ergw_local_pool_reg.erl b/apps/ergw_core/src/ergw_local_pool_reg.erl index 21e6c435..6aa8e8bd 100644 --- a/apps/ergw_core/src/ergw_local_pool_reg.erl +++ b/apps/ergw_core/src/ergw_local_pool_reg.erl @@ -18,7 +18,7 @@ %% regine_server callbacks -export([init/1, handle_register/4, handle_unregister/3, handle_pid_remove/3, - handle_death/3, handle_call/3, terminate/2]). + handle_death/3, terminate/2]). %% -------------------------------------------------------------------- %% Include files @@ -26,6 +26,7 @@ -include("include/ergw.hrl"). -define(SERVER, ?MODULE). +-define(TAB, ?MODULE). %%%=================================================================== %%% API @@ -38,42 +39,39 @@ register(Pool) -> regine_server:register(?SERVER, self(), Pool, undefined). lookup(Pool) when is_binary(Pool) -> - regine_server:call(?SERVER, {lookup, Pool}). + case ets:lookup(?TAB, Pool) of + [{_, Pid}] -> Pid; + _ -> undefined + end. all() -> - regine_server:call(?SERVER, all). + ets:tab2list(?TAB). %%%=================================================================== %%% regine callbacks %%%=================================================================== init([]) -> - {ok, #{}}. + ets:new(?TAB, [named_table, set, protected, {keypos, 1}, {read_concurrency, true}]), + {ok, state}. -handle_register(_Pid, Pool, _Value, State) - when is_map_key(Pool, State) -> - {error, duplicate}; handle_register(Pid, Pool, _Value, State) -> - {ok, [Pool], maps:put(Pool, Pid, State)}. + case ets:insert_new(?TAB, {Pool, Pid}) of + false -> {error, duplicate}; + true -> {ok, [Pool], State} + end. -handle_unregister(Pool, _Value, State) - when is_map_key(Pool, State) -> - [[maps:get(Pool, State)], maps:remove(Pool, State)]; -handle_unregister(_Pool, _Value, State) -> - {[], State}. +handle_unregister(Pool, _Value, State) -> + Objs = ets:take(?TAB, Pool), + {[Pid || {_, Pid} <- Objs], State}. handle_pid_remove(_Pid, Pools, State) -> - maps:without(Pools, State). + [ets:delete(?TAB, Pool) || Pool <- Pools], + State. handle_death(_Pid, _Reason, State) -> State. -handle_call({lookup, Pool}, _From, State) -> - maps:get(Pool, State, undefined); - -handle_call(all, _From, State) -> - maps:to_list(State). - terminate(_Reason, _State) -> ok. From 1175727774554b521ea8117b66ed84db497d6c46 Mon Sep 17 00:00:00 2001 From: Andreas Schultz Date: Mon, 7 Jun 2021 15:06:02 +0200 Subject: [PATCH 2/2] new IP assignment, random init, fifo reuse of IP Initialized the free IP pool with a random order and time stamp to build a queue in the ets table. Released IPs are appended to the end of the free queue (through the timestamp). --- apps/ergw_core/c_src/Makefile | 74 +++++++++ apps/ergw_core/c_src/uint32_fy_shuffle.c | 202 +++++++++++++++++++++++ apps/ergw_core/rebar.config | 9 + apps/ergw_core/src/ergw_local_pool.erl | 48 +++--- apps/ergw_core/src/uint32_fy_shuffle.erl | 28 ++++ 5 files changed, 338 insertions(+), 23 deletions(-) create mode 100644 apps/ergw_core/c_src/Makefile create mode 100644 apps/ergw_core/c_src/uint32_fy_shuffle.c create mode 100644 apps/ergw_core/rebar.config create mode 100644 apps/ergw_core/src/uint32_fy_shuffle.erl diff --git a/apps/ergw_core/c_src/Makefile b/apps/ergw_core/c_src/Makefile new file mode 100644 index 00000000..1e9b6cbb --- /dev/null +++ b/apps/ergw_core/c_src/Makefile @@ -0,0 +1,74 @@ +# Based on c_src.mk from erlang.mk by Loic Hoguin + +CURDIR := $(shell pwd) +BASEDIR := $(abspath $(CURDIR)/..) + +PROJECT ?= $(notdir $(BASEDIR)) +PROJECT := $(strip $(PROJECT)) + +ERTS_INCLUDE_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts/erts-~ts/include/\", [code:root_dir(), erlang:system_info(version)])." -s init stop) +ERL_INTERFACE_INCLUDE_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts\", [code:lib_dir(erl_interface, include)])." -s init stop) +ERL_INTERFACE_LIB_DIR ?= $(shell erl -noshell -eval "io:format(\"~ts\", [code:lib_dir(erl_interface, lib)])." -s init stop) + +C_SRC_DIR = $(CURDIR) +C_SRC_OUTPUT ?= $(CURDIR)/../priv/$(PROJECT).so + +# System type and C compiler/flags. + +UNAME_SYS := $(shell uname -s) +ifeq ($(UNAME_SYS), Darwin) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -arch x86_64 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -arch x86_64 -finline-functions -Wall + LDFLAGS ?= -arch x86_64 -flat_namespace -undefined suppress +else ifeq ($(UNAME_SYS), FreeBSD) + CC ?= cc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +else ifeq ($(UNAME_SYS), Linux) + CC ?= gcc + CFLAGS ?= -O3 -std=c99 -finline-functions -Wall -Wmissing-prototypes + CXXFLAGS ?= -O3 -finline-functions -Wall +endif + +CFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) +CXXFLAGS += -fPIC -I $(ERTS_INCLUDE_DIR) -I $(ERL_INTERFACE_INCLUDE_DIR) + +LDLIBS += -L $(ERL_INTERFACE_LIB_DIR) -lei +LDFLAGS += -shared + +# Verbosity. + +c_verbose_0 = @echo " C " $(?F); +c_verbose = $(c_verbose_$(V)) + +cpp_verbose_0 = @echo " CPP " $(?F); +cpp_verbose = $(cpp_verbose_$(V)) + +link_verbose_0 = @echo " LD " $(@F); +link_verbose = $(link_verbose_$(V)) + +SOURCES := $(shell find $(C_SRC_DIR) -type f \( -name "*.c" -o -name "*.C" -o -name "*.cc" -o -name "*.cpp" \)) +OBJECTS = $(addsuffix .o, $(basename $(SOURCES))) + +COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c +COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c + +$(C_SRC_OUTPUT): $(OBJECTS) + @mkdir -p $(BASEDIR)/priv/ + $(link_verbose) $(CC) $(OBJECTS) $(LDFLAGS) $(LDLIBS) -o $(C_SRC_OUTPUT) + +%.o: %.c + $(COMPILE_C) $(OUTPUT_OPTION) $< + +%.o: %.cc + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.C + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +%.o: %.cpp + $(COMPILE_CPP) $(OUTPUT_OPTION) $< + +clean: + @rm -f $(C_SRC_OUTPUT) $(OBJECTS) diff --git a/apps/ergw_core/c_src/uint32_fy_shuffle.c b/apps/ergw_core/c_src/uint32_fy_shuffle.c new file mode 100644 index 00000000..90049fe8 --- /dev/null +++ b/apps/ergw_core/c_src/uint32_fy_shuffle.c @@ -0,0 +1,202 @@ +/* Copyright 2021, Travelping GmbH + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + */ + +/* Generate a sequence of at most 2^32 unsigned 32bit integers using the + * Fisher-Yates shuffle */ + +#include +#include +#include + +#define STEPCNT (4*1024) + +#define min(x,y) ( \ + { __auto_type __x = (x); __auto_type __y = (y); \ + __x < __y ? __x : __y; }) + +static ERL_NIF_TERM +mk_atom(ErlNifEnv* env, const char* atom) +{ + ERL_NIF_TERM ret; + + if(!enif_make_existing_atom(env, atom, &ret, ERL_NIF_LATIN1)) + { + return enif_make_atom(env, atom); + } + + return ret; +} + +static void swap (unsigned int *a, unsigned int *b) +{ + unsigned int temp = *a; + *a = *b; + *b = temp; +} + +static ERL_NIF_TERM shuffle_ret(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM new_argv[4]; + ERL_NIF_TERM head, tail; + unsigned int i, n, end; + unsigned int *arr; + ErlNifBinary bin; + + if (argc != 4 || + !enif_get_uint(env, argv[0], &n) || n <= 0 || + !enif_get_uint(env, argv[1], &i) || i > n || + !enif_inspect_binary(env, argv[2], &bin) || + !enif_is_list(env, argv[3])) + return enif_make_badarg(env); + + /* printf("ret i: %u\n\r", i); */ + /* printf("ret n: %u\n\r", n); */ + + tail = argv[3]; + end = min(n, i + STEPCNT); + arr = (unsigned int *)bin.data; + + for (; i < end; i++) { + head = enif_make_tuple1(env, + enif_make_tuple2(env, + enif_make_int64(env, INT64_MIN + i), + enif_make_uint(env, arr[i]))); + tail = enif_make_list_cell(env, head, tail); + } + + if (i >= n) { + enif_release_binary(&bin); + return tail; + } else { + new_argv[0] = argv[0]; + new_argv[1] = enif_make_uint(env, i); + new_argv[2] = argv[2]; + new_argv[3] = tail; + return enif_schedule_nif(env, "shuffle_ret", 0, shuffle_ret, 4, new_argv); + } +} + +static ERL_NIF_TERM shuffle_randomize(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM new_argv[4]; + unsigned int i, n, end; + unsigned int *arr; + ErlNifBinary bin; + + if (argc != 3 || + !enif_get_uint(env, argv[0], &n) || n <= 0 || + !enif_get_uint(env, argv[1], &i) || i > n || + !enif_inspect_binary(env, argv[2], &bin)) + return enif_make_badarg(env); + + if (i == n - 1) + srand(time(NULL)); + + end = (i > STEPCNT) ? i - STEPCNT : 0; + arr = (unsigned int *)bin.data; + + for (; i > end; i--) { + unsigned int j = rand() % (i+1); + swap(&arr[i], &arr[j]); + } + + new_argv[0] = argv[0]; + new_argv[2] = argv[2]; + + if (i == 0) { + new_argv[1] = enif_make_uint(env, 0); + new_argv[3] = enif_make_list(env, 0); + return enif_schedule_nif(env, "shuffle_ret", 0, shuffle_ret, 4, new_argv); + } else { + new_argv[1] = enif_make_uint(env, i); + return enif_schedule_nif(env, "shuffle_randomize", 0, shuffle_randomize, 3, new_argv); + } +} + +static ERL_NIF_TERM shuffle_fill(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM new_argv[3]; + unsigned int i, n, end; + unsigned int *arr; + ErlNifBinary bin; + + if (argc != 3 || + !enif_get_uint(env, argv[0], &n) || n <= 0 || + !enif_get_uint(env, argv[1], &i) || i > n || + !enif_inspect_binary(env, argv[2], &bin)) + return enif_make_badarg(env); + + /* printf("fill i: %u\r\n", i); */ + /* printf("fill n: %u\r\n", n); */ + + end = min(n, i + STEPCNT); + arr = (unsigned int *)bin.data; + + for (; i < end; i++) + arr[i] = i; + + new_argv[0] = argv[0]; + new_argv[2] = argv[2]; + + if (i == n) { + new_argv[1] = enif_make_uint(env, n - 1); + return enif_schedule_nif(env, "shuffle_randomize", 0, shuffle_randomize, 3, new_argv); + } else { + new_argv[1] = enif_make_uint(env, i); + return enif_schedule_nif(env, "shuffle_fill", 0, shuffle_fill, 3, new_argv); + } +} + +static ERL_NIF_TERM shuffle(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM new_argv[3]; + unsigned int n; + + if (!enif_get_uint(env, argv[0], &n) || n <= 0) + return enif_make_badarg(env); + + if (!(enif_make_new_binary(env, n * sizeof(unsigned int), &new_argv[2]))) + return enif_raise_exception(env, mk_atom(env, "out_of_memory")); + + new_argv[0] = argv[0]; + new_argv[1] = enif_make_uint(env, 0); + return enif_schedule_nif(env, "shuffle_fill", 0, shuffle_fill, 3, new_argv); +} + +#if 0 +static ERL_NIF_TERM shuffle(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + ERL_NIF_TERM head, tail; + unsigned int *arr; + unsigned int i, n; + + if (!enif_get_uint(env, argv[0], &n) || n <= 0) + return enif_make_badarg(env); + + if (!(arr = enif_alloc(n * sizeof(arr[0])))) + return enif_raise_exception(env, mk_atom(env, "out_of_memory")); + + for (i = 0; i < n; i++) + arr[i] = i; + //randomize(arr, n); + + tail = enif_make_list(env, 0); + for (i = 0; i < n; i++) { + head = enif_make_uint(env, arr[i]); + tail = enif_make_list_cell(env, head, tail); + } + return tail; +} +#endif + +static ErlNifFunc nif_funcs[] = +{ + {"shuffle", 1, shuffle} +}; + +ERL_NIF_INIT(uint32_fy_shuffle, nif_funcs, NULL, NULL, NULL, NULL) diff --git a/apps/ergw_core/rebar.config b/apps/ergw_core/rebar.config new file mode 100644 index 00000000..b949bef6 --- /dev/null +++ b/apps/ergw_core/rebar.config @@ -0,0 +1,9 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{pre_hooks, + [{"(linux|darwin|solaris)", compile, "make -C c_src"}, + {"(freebsd)", compile, "gmake -C c_src"}]}. +{post_hooks, + [{"(linux|darwin|solaris)", clean, "make -C c_src clean"}, + {"(freebsd)", clean, "gmake -C c_src clean"}]}. diff --git a/apps/ergw_core/src/ergw_local_pool.erl b/apps/ergw_core/src/ergw_local_pool.erl index dc7a4888..d57c7434 100644 --- a/apps/ergw_core/src/ergw_local_pool.erl +++ b/apps/ergw_core/src/ergw_local_pool.erl @@ -22,7 +22,7 @@ terminate/2, code_change/3]). -record(state, {name, pools, ipv4_opts, ipv6_opts}). --record(pool, {name, type, id, first, last, shift, used, free, used_pool, free_pool}). +-record(pool, {name, type, id, base, size, first, last, shift, used, free, used_pool, free_pool}). -record(lease, {ip, client_id}). -include_lib("kernel/include/logger.hrl"). @@ -193,12 +193,10 @@ init_pools(Name, Ranges) -> Pools end, #{}, Ranges). -init_table(_tetTid, Start, End) - when Start > End -> - ok; init_table(Tid, Start, End) -> - ets:insert(Tid, #lease{ip = Start}), - init_table(Tid, Start + 1, End). + Length = End - Start + 1, + IPsList = uint32_fy_shuffle:shuffle(Length), + true = ets:insert(Tid, IPsList). handle_call({get, ClientId, Type, PrefixLen, ReqOpts}, _From, #state{pools = Pools} = State) when is_atom(Type), is_map_key({Type, PrefixLen}, Pools) -> @@ -278,7 +276,7 @@ alloc_reply({error, _} = Error, _, _) -> init_pool(Name, Type, First, Last, Shift) -> UsedTid = ets:new(used_pool, [set, {keypos, #lease.ip}]), - FreeTid = ets:new(free_pool, [set, {keypos, #lease.ip}]), + FreeTid = ets:new(free_pool, [ordered_set]), Id = inet:ntoa(int2ip(Type, First)), Start = First bsr Shift, @@ -296,6 +294,7 @@ init_pool(Name, Type, First, Last, Shift) -> Pool = #pool{ name = Name, type = Type, id = Id, + base = Start, size = Size, first = First, last = Last, shift = Shift, used = 0, free = Size, used_pool = UsedTid, free_pool = FreeTid}, @@ -304,31 +303,33 @@ init_pool(Name, Type, First, Last, Shift) -> Pool. allocate_ip(ClientId, ReqOpts, - #pool{used = Used, free = Free, + #pool{base = Base, + used = Used, free = Free, used_pool = UsedTid, free_pool = FreeTid} = Pool0) when Free =/= 0 -> ?LOG(debug, "~w: Allocate Pool: ~p", [self(), Pool0]), - Id = ets:first(FreeTid), - ets:delete(FreeTid, Id), + {_, Id} = Key = ets:first(FreeTid), + ets:delete(FreeTid, Key), ets:insert(UsedTid, #lease{ip = Id, client_id = ClientId}), - IP = id2ip(Id, ReqOpts, Pool0), + IP = id2ip(Base + Id, ReqOpts, Pool0), Pool = Pool0#pool{used = Used + 1, free = Free - 1}, metrics_sync_gauges(Pool), {{ok, IP}, Pool}; allocate_ip(_ClientId, _ReqOpts, Pool) -> {{error, empty}, Pool}. -release_ip(IP, #pool{first = First, last = Last, - shift = Shift, - used = Used, free = Free, - used_pool = UsedTid, free_pool = FreeTid} = Pool0) +release_ip(IP, #pool{base = Base, first = First, last = Last, + shift = Shift, + used = Used, free = Free, + used_pool = UsedTid, free_pool = FreeTid} = Pool0) when IP >= First andalso IP =< Last -> - Id = IP bsr Shift, + Id = (IP bsr Shift) - Base, case ets:take(UsedTid, Id) of [_] -> - ets:insert(FreeTid, #lease{ip = Id}), + Now = erlang:monotonic_time(), + ets:insert(FreeTid, {{Now, Id}}), Pool = Pool0#pool{used = Used - 1, free = Free + 1}, metrics_sync_gauges(Pool), Pool; @@ -344,19 +345,20 @@ release_ip(_IP, Pool) -> Pool. take_ip(ClientId, IP, ReqOpts, - #pool{first = First, last = Last, + #pool{base = Base, first = First, last = Last, shift = Shift, used = Used, free = Free, used_pool = UsedTid, free_pool = FreeTid} = Pool0) when IP >= First andalso IP =< Last -> - Id = IP bsr Shift, + Id = (IP bsr Shift) - Base, - case ets:take(FreeTid, Id) of - [_] -> + case ets:match_object(FreeTid, {{'_', Id}}) of + [{Key}] -> + ets:delete(FreeTid, Key), ets:insert(UsedTid, #lease{ip = Id, client_id = ClientId}), Pool = Pool0#pool{used = Used + 1, free = Free - 1}, metrics_sync_gauges(Pool), - {{ok, id2ip(Id, ReqOpts, Pool)}, Pool}; + {{ok, id2ip(Base + Id, ReqOpts, Pool)}, Pool}; _ -> ?LOG(warning, "attempt to take already allocated IP: ~p", [id2ip(Id, ReqOpts, Pool0)]), @@ -374,7 +376,7 @@ take_ip(_ClientId, _IP, _ReqOpts, Pool) -> %%%=================================================================== metrics_sync_gauges(#pool{name = Name, type = Type, id = Id, - used = Used, free = Free}) -> + used = Used, free = Free}) -> prometheus_gauge:set(ergw_local_pool_free, [Name, Type, Id], Free), prometheus_gauge:set(ergw_local_pool_used, [Name, Type, Id], Used), ok. diff --git a/apps/ergw_core/src/uint32_fy_shuffle.erl b/apps/ergw_core/src/uint32_fy_shuffle.erl new file mode 100644 index 00000000..e904f3ef --- /dev/null +++ b/apps/ergw_core/src/uint32_fy_shuffle.erl @@ -0,0 +1,28 @@ +-module(uint32_fy_shuffle). + +-export([shuffle/1]). + +-on_load(init/0). + +-define(APPNAME, ergw_core). +-define(LIBNAME, ergw_core). + +shuffle(_) -> + not_loaded(?LINE). + +init() -> + SoName = case code:priv_dir(?APPNAME) of + {error, bad_name} -> + case filelib:is_dir(filename:join(["..", priv])) of + true -> + filename:join(["..", priv, ?LIBNAME]); + _ -> + filename:join([priv, ?LIBNAME]) + end; + Dir -> + filename:join(Dir, ?LIBNAME) + end, + erlang:load_nif(SoName, 0). + +not_loaded(Line) -> + erlang:nif_error({not_loaded, [{module, ?MODULE}, {line, Line}]}).