Skip to content

Commit

Permalink
Fix leo-project/leofs/issues/393 more correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Jul 6, 2015
1 parent e16f007 commit 2fd5ca5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 45 deletions.
5 changes: 1 addition & 4 deletions include/leo_object_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,8 @@
}).

%% @doc Retrieve compaction-proc's step parameters
-define(step_compaction_proc_values(_State),
-define(step_compaction_proc_values(_RegBatchProcs,_RegInterval,_NumOfSteps),
begin
#compaction_state{interval = _RegInterval,
num_of_batch_procs = _RegBatchProcs,
num_of_steps = _NumOfSteps} = _State,
_StepBatchOfProcs = leo_math:ceiling(_RegBatchProcs / _NumOfSteps),
_StepInterval = leo_math:ceiling(_RegInterval / _NumOfSteps),
{ok, {_StepBatchOfProcs,_StepInterval}}
Expand Down
70 changes: 36 additions & 34 deletions src/leo_compact_fsm_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,10 @@ idling(#compaction_event_info{event = ?EVENT_RUN,
error_pos = 0,
set_errors = sets:new(),
acc_errors = [],
interval = ?env_compaction_interval_reg(),
max_interval = ?env_compaction_interval_max(),
max_num_of_batch_procs = ?env_compaction_num_of_batch_procs_max(),
num_of_batch_procs = ?env_compaction_num_of_batch_procs_reg(),
max_num_of_batch_procs = ?env_compaction_num_of_batch_procs_max(),
compaction_prms =
CompactionPrms#compaction_prms{
num_of_active_objs = 0,
Expand Down Expand Up @@ -275,23 +277,12 @@ idling(#compaction_event_info{event = ?EVENT_STATE,
{next_state, NextStatus, State#compaction_state{status = NextStatus}};

idling(#compaction_event_info{event = ?EVENT_INCREASE}, State) ->
BatchProcs = ?env_compaction_num_of_batch_procs_reg(),
Interval = ?env_compaction_interval_reg(),
NextStatus = ?ST_IDLING,
{next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs,
interval = Interval,
status = NextStatus}};
{next_state, NextStatus, State#compaction_state{status = NextStatus}};

idling(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_of_batch_procs = BatchProcs,
interval = Interval,
max_interval = MaxInterval} = State) ->
{ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State),
BatchProcs_1 = decr_batch_of_msgs_fun(BatchProcs, StepBatchProcs),
Interval_1 = incr_interval_fun(Interval, MaxInterval, StepInterval),
idling(#compaction_event_info{event = ?EVENT_DECREASE}, State) ->
NextStatus = ?ST_IDLING,
{next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1,
interval = Interval_1,
status = NextStatus}};
{next_state, NextStatus, State#compaction_state{status = NextStatus}};
idling(_, State) ->
NextStatus = ?ST_IDLING,
{next_state, NextStatus, State#compaction_state{status = NextStatus}}.
Expand Down Expand Up @@ -321,6 +312,7 @@ running(#compaction_event_info{event = ?EVENT_RUN,
compaction_prms =
#compaction_prms{
start_lock_offset = StartLockOffset}} = State) ->

%% Temporally suspend the compaction
%% in order to decrease i/o load
CountProcs_1 = case (CountProcs < 1) of
Expand Down Expand Up @@ -396,8 +388,12 @@ running(#compaction_event_info{event = ?EVENT_STATE,

running(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_of_batch_procs = BatchProcs,
max_num_of_batch_procs = MaxBatchProcs,
interval = Interval} = State) ->
{ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State),
interval = Interval,
num_of_steps = NumOfSteps} = State) ->
{ok, {StepBatchProcs, StepInterval}} =
?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(),
?env_compaction_interval_reg(),
NumOfSteps),
BatchProcs_1 = incr_batch_of_msgs_fun(BatchProcs, MaxBatchProcs, StepBatchProcs),
Interval_1 = decr_interval_fun(Interval, StepInterval),

Expand All @@ -408,15 +404,13 @@ running(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_o

running(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_of_batch_procs = BatchProcs,
interval = Interval,
max_interval = MaxInterval} = State) ->
{ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State),
Interval_1 = Interval + StepInterval,
Interval_2 = case (Interval_1 >= MaxInterval) of
true ->
MaxInterval;
false ->
Interval_1
end,
max_interval = MaxInterval,
num_of_steps = NumOfSteps} = State) ->
{ok, {StepBatchProcs, StepInterval}} =
?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(),
?env_compaction_interval_reg(),
NumOfSteps),
Interval_1 = incr_interval_fun(Interval, MaxInterval, StepInterval),

{NextStatus, BatchProcs_1} =
case (BatchProcs =< 0) of
Expand All @@ -427,7 +421,7 @@ running(#compaction_event_info{event = ?EVENT_DECREASE}, #compaction_state{num_o
decr_batch_of_msgs_fun(BatchProcs, StepBatchProcs)}
end,
{next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1,
interval = Interval_2,
interval = Interval_1,
status = NextStatus}};

running(_, State) ->
Expand Down Expand Up @@ -458,14 +452,22 @@ suspending(#compaction_event_info{event = ?EVENT_STATE,
erlang:send(Client, NextStatus),
{next_state, NextStatus, State#compaction_state{status = NextStatus}};

suspending(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{num_of_batch_procs = BatchProcs,
max_num_of_batch_procs = MaxBatchProcs,
interval = Interval} = State) ->
{ok, {StepBatchProcs, StepInterval}} = ?step_compaction_proc_values(State),
suspending(#compaction_event_info{event = ?EVENT_INCREASE}, #compaction_state{id = Id,
is_diagnosing = IsDiagnosing,
is_recovering = IsRecovering,
num_of_batch_procs = BatchProcs,
max_num_of_batch_procs = MaxBatchProcs,
interval = Interval,
num_of_steps = NumOfSteps} = State) ->
{ok, {StepBatchProcs, StepInterval}} =
?step_compaction_proc_values(?env_compaction_num_of_batch_procs_reg(),
?env_compaction_interval_reg(),
NumOfSteps),
BatchProcs_1 = incr_batch_of_msgs_fun(BatchProcs, MaxBatchProcs, StepBatchProcs),
Interval_1 = decr_interval_fun(Interval, StepInterval),

NextStatus = ?ST_RUNNING,
timer:apply_after(timer:seconds(1), ?MODULE, run, [Id, IsDiagnosing, IsRecovering]),
{next_state, NextStatus, State#compaction_state{num_of_batch_procs = BatchProcs_1,
interval = Interval_1,
status = NextStatus}};
Expand Down Expand Up @@ -1108,7 +1110,7 @@ gen_compaction_report(State) ->
NewInterval::non_neg_integer()).
incr_interval_fun(Interval, MaxInterval, StepInterval) ->
Interval_1 = Interval + StepInterval,
case (Interval_1 > MaxInterval) of
case (Interval_1 >= MaxInterval) of
true ->
MaxInterval;
false ->
Expand All @@ -1124,9 +1126,9 @@ incr_interval_fun(Interval, MaxInterval, StepInterval) ->
NewInterval::non_neg_integer()).
decr_interval_fun(Interval, StepInterval) ->
Interval_1 = Interval - StepInterval,
case (Interval_1 < 0) of
case (Interval_1 =< ?DEF_MIN_COMPACTION_WT) of
true ->
0;
?DEF_MIN_COMPACTION_WT;
false ->
Interval_1
end.
Expand Down
8 changes: 1 addition & 7 deletions test/leo_object_storage_api_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,7 @@ compact() ->
ok = put_regular_bin(240, 10),

%% Change waiting-time of the procs
ok = leo_compact_fsm_controller:decrease(),
timer:sleep(10),
ok = leo_compact_fsm_controller:decrease(),
timer:sleep(10),
ok = leo_compact_fsm_controller:decrease(),
timer:sleep(10),
ok = leo_compact_fsm_controller:decrease(),
[leo_compact_fsm_controller:decrease() || _Num <- lists:seq(1, 30)],
timer:sleep(3000),

ok = leo_compact_fsm_controller:increase(),
Expand Down

0 comments on commit 2fd5ca5

Please sign in to comment.