Skip to content

Commit

Permalink
SimpleScheduler now uses config for action pruning (#1137)
Browse files Browse the repository at this point in the history
  • Loading branch information
allada authored Jul 10, 2024
1 parent eaaa872 commit 1fdd505
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
2 changes: 1 addition & 1 deletion nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct SimpleScheduler {
/// a WaitExecution is called after the action has completed.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u64,
pub retain_completed_for_s: u32,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
Expand Down
21 changes: 7 additions & 14 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,12 @@ const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

/// Default timeout for recently completed actions in seconds.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60;

/// Default times a job can retry before failing.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Default time in seconds before a client is evicted.
// TODO!(make this a config and documented)
const CLIENT_EVICTION_SECONDS: u32 = 300; // 5 mins

struct SimpleSchedulerActionListener {
client_operation_id: ClientOperationId,
action_state_result: Arc<dyn ActionStateResult>,
Expand Down Expand Up @@ -119,12 +115,10 @@ pub struct SimpleScheduler {
client_state_manager: Arc<dyn ClientStateManager>,

platform_property_manager: Arc<PlatformPropertyManager>,
// metrics: Arc<Metrics>,
// Triggers `drop()`` call if scheduler is dropped.
_task_worker_matching_future: JoinHandleDropGuard<()>,

/// The duration that actions are kept in recently_completed_actions for.
_retain_completed_for: Duration,
/// Background task that tries to match actions to workers. If this struct
/// is dropped the spawn will be cancelled as well.
_task_worker_matching_spawn: JoinHandleDropGuard<()>,

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
Expand Down Expand Up @@ -310,7 +304,7 @@ impl SimpleScheduler {
let tasks_or_worker_change_notify = Arc::new(Notify::new());
let state_manager = Arc::new(MemorySchedulerStateManager::new(
&EvictionPolicy {
max_seconds: CLIENT_EVICTION_SECONDS,
max_seconds: retain_completed_for_s,
..Default::default()
},
tasks_or_worker_change_notify.clone(),
Expand All @@ -329,7 +323,7 @@ impl SimpleScheduler {

let action_scheduler = Arc::new_cyclic(move |weak_self| -> Self {
let weak_inner = weak_self.clone();
let task_worker_matching_future =
let task_worker_matching_spawn =
spawn!("simple_scheduler_task_worker_matching", async move {
// Break out of the loop only when the inner is dropped.
loop {
Expand All @@ -351,10 +345,9 @@ impl SimpleScheduler {
SimpleScheduler {
matching_engine_state_manager: state_manager.clone(),
client_state_manager: state_manager.clone(),
_retain_completed_for: Duration::new(retain_completed_for_s, 0),
workers,
platform_property_manager,
_task_worker_matching_future: task_worker_matching_future,
_task_worker_matching_spawn: task_worker_matching_spawn,
}
});
(action_scheduler, workers_copy)
Expand Down

0 comments on commit 1fdd505

Please sign in to comment.