diff --git a/include/leo_object_storage.hrl b/include/leo_object_storage.hrl index 311a92f..6144ec3 100644 --- a/include/leo_object_storage.hrl +++ b/include/leo_object_storage.hrl @@ -582,11 +582,8 @@ }). %% @doc Retrieve compaction-proc's step parameters --define(step_compaction_proc_values(_State), +-define(step_compaction_proc_values(_RegBatchProcs,_RegInterval,_NumOfSteps), begin - #compaction_state{interval = _RegInterval, - num_of_batch_procs = _RegBatchProcs, - num_of_steps = _NumOfSteps} = _State, _StepBatchOfProcs = leo_math:ceiling(_RegBatchProcs / _NumOfSteps), _StepInterval = leo_math:ceiling(_RegInterval / _NumOfSteps), {ok, {_StepBatchOfProcs,_StepInterval}} diff --git a/src/leo_compact_fsm_worker.erl b/src/leo_compact_fsm_worker.erl index e450c50..9ad2dc7 100644 --- a/src/leo_compact_fsm_worker.erl +++ b/src/leo_compact_fsm_worker.erl @@ -237,8 +237,10 @@ idling(#compaction_event_info{event = ?EVENT_RUN, error_pos = 0, set_errors = sets:new(), acc_errors = [], + interval = ?env_compaction_interval_reg(), max_interval = ?env_compaction_interval_max(), - max_num_of_batch_procs = ?env_compaction_num_of_batch_procs_max(), + num_of_batch_procs = ?env_compaction_num_of_batch_procs_reg(), + max_num_of_batch_procs = ?env_compaction_num_of_batch_procs_max(), compaction_prms = CompactionPrms#compaction_prms{ num_of_active_objs = 0, @@ -275,23 +277,12 @@ idling(#compaction_event_info{event = ?EVENT_STATE, {next_state, NextStatus, State#compaction_state{status = NextStatus}}; idling(#compaction_event_info{event = ?EVENT_INCREASE}, State) -> - BatchProcs = ?env_compaction_num_of_batch_procs_reg(), - Interval = ?env_compaction_interval_reg(), NextStatus = ?ST_IDLING, - {next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs, - interval = Interval, - status = NextStatus}}; + {next_state, NextStatus, State#compaction_state{status = NextStatus}}; -idling(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_of_batch_procs = BatchProcs, - interval = Interval, - max_interval = MaxInterval} = State) -> - {ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State), - BatchProcs_1 = decr_batch_of_msgs_fun(BatchProcs, StepBatchProcs), - Interval_1 = incr_interval_fun(Interval, MaxInterval, StepInterval), +idling(#compaction_event_info{event = ?EVENT_DECREASE}, State) -> NextStatus = ?ST_IDLING, - {next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1, - interval = Interval_1, - status = NextStatus}}; + {next_state, NextStatus, State#compaction_state{status = NextStatus}}; idling(_, State) -> NextStatus = ?ST_IDLING, {next_state, NextStatus, State#compaction_state{status = NextStatus}}. @@ -321,6 +312,7 @@ running(#compaction_event_info{event = ?EVENT_RUN, compaction_prms = #compaction_prms{ start_lock_offset = StartLockOffset}} = State) -> + %% Temporally suspend the compaction %% in order to decrease i/o load CountProcs_1 = case (CountProcs < 1) of @@ -396,8 +388,12 @@ running(#compaction_event_info{event = ?EVENT_STATE, running(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_of_batch_procs = BatchProcs, max_num_of_batch_procs = MaxBatchProcs, - interval = Interval} = State) -> - {ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State), + interval = Interval, + num_of_steps = NumOfSteps} = State) -> + {ok, {StepBatchProcs, StepInterval}} = + ?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(), + ?env_compaction_interval_reg(), + NumOfSteps), BatchProcs_1 = incr_batch_of_msgs_fun(BatchProcs, MaxBatchProcs, StepBatchProcs), Interval_1 = decr_interval_fun(Interval, StepInterval), @@ -408,15 +404,13 @@ running(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_o running(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_of_batch_procs = BatchProcs, interval = Interval, - max_interval = MaxInterval} = State) -> - {ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State), - Interval_1 = Interval + StepInterval, - Interval_2 = case (Interval_1 >= MaxInterval) of - true -> - MaxInterval; - false -> - Interval_1 - end, + max_interval = MaxInterval, + num_of_steps = NumOfSteps} = State) -> + {ok, {StepBatchProcs, StepInterval}} = + ?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(), + ?env_compaction_interval_reg(), + NumOfSteps), + Interval_1 = incr_interval_fun(Interval, MaxInterval, StepInterval), {NextStatus, BatchProcs_1} = case (BatchProcs =< 0) of @@ -427,7 +421,7 @@ running(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_o decr_batch_of_msgs_fun(BatchProcs, StepBatchProcs)} end, {next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1, - interval = Interval_2, + interval = Interval_1, status = NextStatus}}; running(_, State) -> @@ -458,14 +452,22 @@ suspending(#compaction_event_info{event = ?EVENT_STATE, erlang:send(Client, NextStatus), {next_state, NextStatus, State#compaction_state{status = NextStatus}}; -suspending(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_of_batch_procs = BatchProcs, - max_num_of_batch_procs = MaxBatchProcs, - interval = Interval} = State) -> - {ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State), +suspending(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{id = Id, + is_diagnosing = IsDiagnosing, + is_recovering = IsRecovering, + num_of_batch_procs = BatchProcs, + max_num_of_batch_procs = MaxBatchProcs, + interval = Interval, + num_of_steps = NumOfSteps} = State) -> + {ok, {StepBatchProcs, StepInterval}} = + ?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(), + ?env_compaction_interval_reg(), + NumOfSteps), BatchProcs_1 = incr_batch_of_msgs_fun(BatchProcs, MaxBatchProcs, StepBatchProcs), Interval_1 = decr_interval_fun(Interval, StepInterval), NextStatus = ?ST_RUNNING, + timer:apply_after(timer:seconds(1), ?MODULE, run, [Id, IsDiagnosing, IsRecovering]), {next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1, interval = Interval_1, status = NextStatus}}; @@ -1108,7 +1110,7 @@ gen_compaction_report(State) -> NewInterval::non_neg_integer()). incr_interval_fun(Interval, MaxInterval, StepInterval) -> Interval_1 = Interval + StepInterval, - case (Interval_1 > MaxInterval) of + case (Interval_1 >= MaxInterval) of true -> MaxInterval; false -> @@ -1124,9 +1126,9 @@ incr_interval_fun(Interval, MaxInterval, StepInterval) -> NewInterval::non_neg_integer()). decr_interval_fun(Interval, StepInterval) -> Interval_1 = Interval - StepInterval, - case (Interval_1 < 0) of + case (Interval_1 =< ?DEF_MIN_COMPACTION_WT) of true -> - 0; + ?DEF_MIN_COMPACTION_WT; false -> Interval_1 end. diff --git a/test/leo_object_storage_api_tests.erl b/test/leo_object_storage_api_tests.erl index 568cb43..9b2451a 100644 --- a/test/leo_object_storage_api_tests.erl +++ b/test/leo_object_storage_api_tests.erl @@ -291,13 +291,7 @@ compact() -> ok = put_regular_bin(240, 10), %% Change waiting-time of the procs - ok = leo_compact_fsm_controller:decrease(), - timer:sleep(10), - ok = leo_compact_fsm_controller:decrease(), - timer:sleep(10), - ok = leo_compact_fsm_controller:decrease(), - timer:sleep(10), - ok = leo_compact_fsm_controller:decrease(), + [leo_compact_fsm_controller:decrease() || _Num <- lists:seq(1, 30)], timer:sleep(3000), ok = leo_compact_fsm_controller:increase(),