Skip to content

Commit

Permalink
Implement get_action_info to all ActionStateResult impls (TraceMachin…
Browse files Browse the repository at this point in the history
…a#1118)

Pipes get_action_info() to be implemented everywhere.
  • Loading branch information
allada authored Jul 9, 2024
1 parent 2fa4fee commit ba52c7f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
3 changes: 1 addition & 2 deletions nativelink-scheduler/src/operation_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub trait ActionStateResult: Send + Sync + 'static {
// Subscribes to the state of the action, receiving updates as they are published.
async fn as_receiver(&self) -> Result<Cow<'_, watch::Receiver<Arc<ActionState>>>, Error>;
// Provide result as action info. This behavior will not be supported by all implementations.
// TODO(adams): Expectation is this to experimental and removed in the future.
async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error>;
}

Expand Down Expand Up @@ -106,7 +105,7 @@ pub trait ClientStateManager: Sync + Send + 'static {
async fn add_action(
&self,
client_operation_id: ClientOperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<Arc<dyn ActionStateResult>, Error>;

/// Returns a stream of operations that match the filter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use tokio::sync::watch::Receiver;
use crate::operation_state_manager::ActionStateResult;

pub(crate) struct ClientActionStateResult {
/// The action info for the action.
action_info: Arc<ActionInfo>,

/// The receiver for the action state updates.
rx: Receiver<Arc<ActionState>>,

Expand All @@ -38,6 +41,7 @@ pub(crate) struct ClientActionStateResult {

impl ClientActionStateResult {
pub(crate) fn new(
action_info: Arc<ActionInfo>,
mut rx: Receiver<Arc<ActionState>>,
maybe_keepalive_spawn: Option<JoinHandleDropGuard<()>>,
) -> Self {
Expand All @@ -46,6 +50,7 @@ impl ClientActionStateResult {
// without having to use an explicit notification.
rx.mark_changed();
Self {
action_info,
rx,
_maybe_keepalive_spawn: maybe_keepalive_spawn,
}
Expand All @@ -63,6 +68,6 @@ impl ActionStateResult for ClientActionStateResult {
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
todo!()
Ok(self.action_info.clone())
}
}
17 changes: 10 additions & 7 deletions nativelink-scheduler/src/scheduler_state/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl AwaitedActionDb {
&mut self,
state_manager_impl: &Weak<Mutex<StateManagerImpl>>,
client_operation_id: ClientOperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> watch::Receiver<Arc<ActionState>> {
// Check to see if the action is already known and subscribe if it is.
let subscription_result = self
Expand All @@ -406,11 +406,12 @@ impl AwaitedActionDb {
.await;
let action_info = match subscription_result {
Ok(subscription) => return subscription,
Err(_) => Arc::new(action_info),
Err(_) => action_info,
};

let unique_qualifier = action_info.unique_qualifier.clone();
let (awaited_action, sort_key, subscription) =
AwaitedAction::new_with_subscription(action_info.clone());
AwaitedAction::new_with_subscription(action_info);
let awaited_action = Arc::new(awaited_action);
self.client_operation_to_awaited_action
.insert(
Expand All @@ -423,7 +424,7 @@ impl AwaitedActionDb {
)
.await;
self.action_info_hash_key_to_awaited_action
.insert(action_info.unique_qualifier.clone(), awaited_action.clone());
.insert(unique_qualifier.clone(), awaited_action.clone());
self.operation_id_to_awaited_action
.insert(awaited_action.get_operation_id(), awaited_action.clone());

Expand Down Expand Up @@ -862,7 +863,7 @@ impl StateManagerImpl {
async fn inner_add_operation(
&mut self,
new_client_operation_id: ClientOperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
let rx = self
.action_db
Expand Down Expand Up @@ -927,15 +928,16 @@ impl ClientStateManager for StateManager {
async fn add_action(
&self,
client_operation_id: ClientOperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<Arc<dyn ActionStateResult>, Error> {
let mut inner = self.inner.lock().await;
let rx = inner
.inner_add_operation(client_operation_id.clone(), action_info)
.inner_add_operation(client_operation_id.clone(), action_info.clone())
.await?;

let inner_weak = Arc::downgrade(&self.inner);
Ok(Arc::new(ClientActionStateResult::new(
action_info,
rx,
Some(make_client_keepalive_spawn(client_operation_id, inner_weak)),
)))
Expand All @@ -949,6 +951,7 @@ impl ClientStateManager for StateManager {
let inner_weak = Arc::downgrade(&self.inner);
self.inner_filter_operations(filter, move |awaited_action| {
Arc::new(ClientActionStateResult::new(
awaited_action.get_action_info().clone(),
awaited_action.subscribe(),
maybe_client_operation_id
.as_ref()
Expand Down
5 changes: 3 additions & 2 deletions nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl SimpleScheduler {
async fn add_action(
&self,
client_operation_id: ClientOperationId,
action_info: ActionInfo,
action_info: Arc<ActionInfo>,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let add_action_result = self
.client_state_manager
Expand Down Expand Up @@ -375,7 +375,8 @@ impl ActionScheduler for SimpleScheduler {
client_operation_id: ClientOperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
self.add_action(client_operation_id, action_info).await
self.add_action(client_operation_id, Arc::new(action_info))
.await
}

async fn find_by_client_operation_id(
Expand Down

0 comments on commit ba52c7f

Please sign in to comment.