[Refactor] Moves worker logic back to SimpleScheduler
Worker logic should not be visible to StateManager just yet. In the
future this will likely change, but for this phase of the refactor
SimpleScheduler should own all information about workers.

towards: #359
allada committed Jun 29, 2024
1 parent 7a16e2e commit b9d9702
Showing 4 changed files with 292 additions and 254 deletions.
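
The sketch below illustrates the ownership split this commit works towards. The SimpleScheduler side is not part of this diff, so the struct is hypothetical: it only assumes that the fields removed from StateManagerImpl below (the Workers pool and max_job_retries) end up owned by the scheduler.

use std::sync::Arc;

use async_lock::Mutex;
use tokio::sync::Notify;

use crate::scheduler_state::state_manager::StateManager;
use crate::scheduler_state::workers::Workers;

// Illustrative only: the SimpleScheduler definition is not shown in this commit.
// After this change, StateManagerImpl keeps only action/queue state, while the
// scheduler owns the worker pool and the retry policy used when a worker fails.
pub struct SimpleScheduler {
    /// Action and queue state; knows nothing about workers.
    state_manager: StateManager,
    /// Worker pool, no longer visible to StateManager (hypothetical placement).
    workers: Mutex<Workers>,
    /// Default number of times a job may retry before failing (assumed to move
    /// alongside the worker pool, since retries are triggered by worker failures).
    max_job_retries: usize,
    /// Notifies the task<->worker matching engine that work needs to be done.
    tasks_change_notify: Arc<Notify>,
}
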
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/operation_state_manager.rs
@@ -98,7 +98,7 @@ pub struct OrderBy {
pub type ActionStateResultStream = Pin<Box<dyn Stream<Item = Arc<dyn ActionStateResult>> + Send>>;

#[async_trait]
pub trait ClientStateManager {
pub trait ClientStateManager: Sync + Send + 'static {
/// Add a new action to the queue or join an existing action.
async fn add_action(
&self,
@@ -113,7 +113,7 @@ pub trait ClientStateManager {
}

#[async_trait]
pub trait WorkerStateManager {
pub trait WorkerStateManager: Sync + Send + 'static {
/// Update the state of an operation.
/// The worker must also send periodic updates even if the state
/// did not change with a modified timestamp in order to prevent
@@ -127,7 +127,7 @@ pub trait WorkerStateManager {
}

#[async_trait]
pub trait MatchingEngineStateManager {
pub trait MatchingEngineStateManager: Sync + Send + 'static {
/// Returns a stream of operations that match the filter.
async fn filter_operations(
&self,
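
The only change in this file is the new Sync + Send + 'static supertraits. A minimal sketch of why they matter, assuming the scheduler holds the state managers as trait objects and drives them from spawned tasks; the struct and function names here are illustrative, not from the repository.

use std::sync::Arc;

// Illustrative: with `Sync + Send + 'static` as supertraits, trait objects like
// these can be stored in long-lived structs and moved into tokio tasks.
struct SchedulerHandles {
    client: Arc<dyn ClientStateManager>,
    worker: Arc<dyn WorkerStateManager>,
    matching_engine: Arc<dyn MatchingEngineStateManager>,
}

fn spawn_matching_loop(handles: Arc<SchedulerHandles>) {
    // `Send + 'static` is what lets the Arc cross the spawn boundary; `Sync`
    // is what allows the same instance to be shared by several tasks at once.
    tokio::spawn(async move {
        let _matching_engine = &handles.matching_engine;
        // ... run the task<->worker matching loop here ...
    });
}
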
182 changes: 13 additions & 169 deletions nativelink-scheduler/src/scheduler_state/state_manager.rs
@@ -21,10 +21,9 @@
use async_lock::Mutex;
use async_trait::async_trait;
use futures::stream;
use hashbrown::{HashMap, HashSet};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState, ExecutionMetadata,
OperationId, WorkerId,
ActionInfo, ActionInfoHashKey, ActionStage, ActionState, OperationId, WorkerId,
};
use tokio::sync::watch::error::SendError;
use tokio::sync::{watch, Notify};
@@ -39,8 +38,6 @@
use crate::scheduler_state::client_action_state_result::ClientActionStateResult;
use crate::scheduler_state::completed_action::CompletedAction;
use crate::scheduler_state::matching_engine_action_state_result::MatchingEngineActionStateResult;
use crate::scheduler_state::metrics::Metrics;
use crate::scheduler_state::workers::Workers;
use crate::worker::WorkerUpdate;

#[repr(transparent)]
pub(crate) struct StateManager {
Expand All @@ -52,22 +49,18 @@ impl StateManager {
pub(crate) fn new(
queued_actions_set: HashSet<Arc<ActionInfo>>,
queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,
workers: Workers,
active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
recently_completed_actions: HashSet<CompletedAction>,
metrics: Arc<Metrics>,
max_job_retries: usize,
tasks_change_notify: Arc<Notify>,
) -> Self {
Self {
inner: Mutex::new(StateManagerImpl {
queued_actions_set,
queued_actions,
workers,
active_actions,
recently_completed_actions,
metrics,
max_job_retries,
tasks_change_notify,
}),
}
@@ -102,10 +95,6 @@ pub(crate) struct StateManagerImpl {
/// Important: `queued_actions_set` and `queued_actions` must be kept in sync.
pub(crate) queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,

/// A `Workers` pool that contains all workers that are available to execute actions in a priority
/// order based on the allocation strategy.
pub(crate) workers: Workers,

/// A map of all actions that are active. A hashmap is used to find actions that are active in
/// O(1) time. The key is the `ActionInfo` struct. The value is the `AwaitedAction` struct.
pub(crate) active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
@@ -118,9 +107,6 @@ pub(crate) struct StateManagerImpl {

pub(crate) metrics: Arc<Metrics>,

/// Default times a job can retry before failing.
pub(crate) max_job_retries: usize,

/// Notify task<->worker matching engine that work needs to be done.
pub(crate) tasks_change_notify: Arc<Notify>,
}
@@ -199,123 +185,6 @@ fn mutate_priority(action_info: &mut Arc<ActionInfo>, priority: i32) {
}

impl StateManagerImpl {
fn immediate_evict_worker(&mut self, worker_id: &WorkerId, err: Error) {
if let Some(mut worker) = self.workers.remove_worker(worker_id) {
self.metrics.workers_evicted.inc();
// We don't care if we fail to send message to worker, this is only a best attempt.
let _ = worker.notify_update(WorkerUpdate::Disconnect);
// We create a temporary Vec to avoid doubt about a possible code
// path touching the worker.running_action_infos elsewhere.
for action_info in worker.running_action_infos.drain() {
self.metrics.workers_evicted_with_running_action.inc();
self.retry_action(&action_info, worker_id, err.clone());
}
// Note: Calling this multiple times is very cheap, it'll only trigger `do_try_match` once.
self.tasks_change_notify.notify_one();
}
}

fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, err: Error) {
match self.active_actions.remove(action_info) {
Some(running_action) => {
let mut awaited_action = running_action;
let send_result = if awaited_action.attempts >= self.max_job_retries {
self.metrics.retry_action_max_attempts_reached.inc();
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: format!("{worker_id}"),
..ExecutionMetadata::default()
},
error: Some(err.merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed"
))),
..ActionResult::default()
});
awaited_action
.notify_channel
.send(awaited_action.current_state.clone())
// Do not put the action back in the queue here, as this action attempted to run too many
// times.
} else {
self.metrics.retry_action.inc();
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Queued;
let send_result = awaited_action
.notify_channel
.send(awaited_action.current_state.clone());
self.queued_actions_set.insert(action_info.clone());
self.queued_actions
.insert(action_info.clone(), awaited_action);
send_result
};

if send_result.is_err() {
self.metrics.retry_action_no_more_listeners.inc();
// Don't remove this task, instead we keep them around for a bit just in case
// the client disconnected and will reconnect and ask for same job to be executed
// again.
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during evict_worker()"
);
}
}
None => {
self.metrics.retry_action_but_action_missing.inc();
event!(
Level::ERROR,
?action_info,
?worker_id,
"Worker stated it was running an action, but it was not in the active_actions"
);
}
}
}

/// Notifies the specified worker to run the given action and handles errors by evicting
/// the worker if the notification fails.
///
/// # Note
///
/// Intended utility function for matching engine.
///
/// # Errors
///
/// This function will return an error if the notification to the worker fails, and in that case,
/// the worker will be immediately evicted from the system.
///
async fn worker_notify_run_action(
&mut self,
worker_id: WorkerId,
action_info: Arc<ActionInfo>,
) -> Result<(), Error> {
if let Some(worker) = self.workers.workers.get_mut(&worker_id) {
let notify_worker_result =
worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));

if notify_worker_result.is_err() {
event!(
Level::WARN,
?worker_id,
?action_info,
?notify_worker_result,
"Worker command failed, removing worker",
);

let err = make_err!(
Code::Internal,
"Worker command failed, removing worker {worker_id} -- {notify_worker_result:?}",
);

self.immediate_evict_worker(&worker_id, err.clone());
return Err(err);
}
}
Ok(())
}

/// Marks the specified action as active, assigns it to the given worker, and updates the
/// action stage. This function removes the action from the queue, updates the action's state
/// or error, and inserts it into the set of active actions.
@@ -404,7 +273,7 @@ impl StateManagerImpl {
?action_info_hash_key,
?worker_id,
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker",
);
);
return;
};
if running_action_worker_id == *worker_id {
@@ -428,20 +297,6 @@ impl StateManagerImpl {
self.active_actions
.insert(action_info.clone(), running_action);

// Clear this action from the current worker.
if let Some(worker) = self.workers.workers.get_mut(worker_id) {
let was_paused = !worker.can_accept_work();
// This unpauses, but since we're completing with an error, don't
// unpause unless all actions have completed.
worker.complete_action(&action_info);
// Only pause if there's an action still waiting that will unpause.
if (was_paused || due_to_backpressure) && worker.has_actions() {
worker.is_paused = true;
}
}

// Re-queue the action or fail on max attempts.
self.retry_action(&action_info, worker_id, err);
self.tasks_change_notify.notify_one();
}
}
Expand Down Expand Up @@ -567,12 +422,10 @@ impl WorkerStateManager for StateManager {
?action_stage,
"Worker sent error while updating action. Removing worker"
);
let err = make_err!(
Code::Internal,
"Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
);
inner.immediate_evict_worker(&worker_id, err.clone());
return Err(err);
return Err(make_err!(
Code::Internal,
"Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
));
}

let (action_info, mut running_action) = inner
Expand All @@ -585,15 +438,14 @@ impl WorkerStateManager for StateManager {
if running_action.worker_id != Some(worker_id) {
inner.metrics.update_action_from_wrong_worker.inc();
let err = match running_action.worker_id {

Some(running_action_worker_id) => make_err!(
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
),
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
),
None => make_err!(
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
),
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
),
};
event!(
Level::ERROR,
Expand All @@ -605,7 +457,6 @@ impl WorkerStateManager for StateManager {
);
// First put it back in our active_actions or we will drop the task.
inner.active_actions.insert(action_info, running_action);
inner.immediate_evict_worker(&worker_id, err.clone());
return Err(err);
}

@@ -635,10 +486,6 @@ impl WorkerStateManager for StateManager {
state: running_action.current_state,
});

let worker = inner.workers.workers.get_mut(&worker_id).ok_or_else(|| {
make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
})?;
worker.complete_action(&action_info);
inner.tasks_change_notify.notify_one();
Ok(())
}
@@ -689,9 +536,6 @@ impl MatchingEngineStateManager for StateManager {
if let Some(action_info) = inner.queued_actions_set.get(&operation_id.unique_qualifier) {
if let Some(worker_id) = worker_id {
let action_info = action_info.clone();
inner
.worker_notify_run_action(worker_id, action_info.clone())
.await?;
inner
.worker_set_as_active(action_info, worker_id, action_stage)
.await?;
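
One consequence of the removals above (hedged, because the SimpleScheduler side of the change is not in this excerpt): the WorkerStateManager implementation now only returns an error instead of evicting the failing worker itself, so eviction and re-queueing become the caller's responsibility. A hypothetical caller-side sketch follows; the method name and argument order are inferred from the impl above and should be treated as approximate, and worker_state_manager() and immediate_evict_worker() are assumed helpers on the scheduler, not code from this commit.

// Hypothetical caller-side handling; not part of this diff. StateManager now
// only reports the error, and the component owning the Workers pool reacts.
async fn handle_worker_update(
    scheduler: &SimpleScheduler,
    worker_id: WorkerId,
    action_info_hash_key: ActionInfoHashKey,
    action_stage: Result<ActionStage, Error>,
) -> Result<(), Error> {
    let result = scheduler
        .worker_state_manager()
        .update_operation(worker_id, action_info_hash_key, action_stage)
        .await;
    if let Err(err) = &result {
        // Worker bookkeeping (eviction, re-queueing its running actions) now
        // lives with the scheduler, which is the sole owner of worker state.
        scheduler.immediate_evict_worker(&worker_id, err.clone()).await;
    }
    result
}
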
Diffs for the remaining 2 changed files are not shown here.
