Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reconnected agents not getting deleted workload #367

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions agent/src/agent_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ impl AgentManager {
log::debug!("Process command received from server.");

match from_server_msg {
FromServer::ServerHello(method_obj) => {
log::debug!(
"Agent '{}' received ServerHello:\n\tAdded workloads: {:?}",
self.agent_name,
method_obj.added_workloads
);

self.runtime_manager
.handle_server_hello(method_obj.added_workloads, &self.workload_state_store)
.await;
Some(())
}
FromServer::UpdateWorkload(method_obj) => {
log::debug!("Agent '{}' received UpdateWorkload:\n\tAdded workloads: {:?}\n\tDeleted workloads: {:?}",
self.agent_name,
Expand Down
100 changes: 45 additions & 55 deletions agent/src/runtime_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct RuntimeManager {
agent_name: AgentName,
run_folder: PathBuf,
control_interface_tx: ToServerSender,
initial_workload_list_received: bool,
workloads: HashMap<String, Workload>,
// [impl->swdd~agent-supports-multiple-runtime-connectors~1]
runtime_map: HashMap<String, Box<dyn RuntimeFacade>>,
Expand All @@ -85,7 +84,6 @@ impl RuntimeManager {
agent_name,
run_folder,
control_interface_tx,
initial_workload_list_received: false,
workloads: HashMap::new(),
runtime_map,
update_state_tx: update_state_tx.clone(),
Expand All @@ -108,34 +106,12 @@ impl RuntimeManager {
}
}

// [impl->swdd~agent-handles-update-workload-requests~1]
pub async fn handle_update_workload(
pub async fn execute_workloads(
&mut self,
mut added_workloads: Vec<WorkloadSpec>,
added_workloads: Vec<WorkloadSpec>,
deleted_workloads: Vec<DeletedWorkload>,
workload_state_db: &WorkloadStateStore,
) {
log::info!(
"Received a new desired state with '{}' added and '{}' deleted workloads.",
added_workloads.len(),
deleted_workloads.len()
);

if !self.initial_workload_list_received {
self.initial_workload_list_received = true;
if !deleted_workloads.is_empty() {
log::error!(
"Received an initial workload list with delete workload commands: '{:?}'",
deleted_workloads
);
}

// [impl->swdd~agent-initial-list-existing-workloads~1]
added_workloads = self
.resume_and_remove_from_added_workloads(added_workloads)
.await;
}

let workload_operations: Vec<WorkloadOperation> =
self.transform_into_workload_operations(added_workloads, deleted_workloads);

Expand All @@ -150,6 +126,42 @@ impl RuntimeManager {
.await;
}

// [impl->swdd~agent-initial-list-existing-workloads~1]
/// Processes the workload list carried by a `ServerHello` message.
///
/// The number of announced workloads is logged, the list is handed to
/// `resume_and_remove_from_added_workloads`, and whatever that call returns
/// is forwarded to `execute_workloads` together with an empty list of
/// deleted workloads.
///
/// NOTE(review): presumably the resume step re-attaches workloads that are
/// already running and filters them out of the list — confirm against
/// `resume_and_remove_from_added_workloads`.
pub async fn handle_server_hello(
    &mut self,
    added_workloads: Vec<WorkloadSpec>,
    workload_state_db: &WorkloadStateStore,
) {
    log::info!(
        "Received the server hello with '{}' added workloads.",
        added_workloads.len()
    );

    // Only the workloads handed back by the resume step are started below.
    let remaining_workloads = self
        .resume_and_remove_from_added_workloads(added_workloads)
        .await;

    // A server hello never carries delete requests, hence the empty list.
    let no_deleted_workloads = Vec::new();
    self.execute_workloads(remaining_workloads, no_deleted_workloads, workload_state_db)
        .await;
}

// [impl->swdd~agent-handles-update-workload-requests~1]
/// Processes an `UpdateWorkload` request from the server.
///
/// Logs how many workloads are to be added and deleted, then passes both
/// lists unchanged to `execute_workloads`.
pub async fn handle_update_workload(
    &mut self,
    added_workloads: Vec<WorkloadSpec>,
    deleted_workloads: Vec<DeletedWorkload>,
    workload_state_db: &WorkloadStateStore,
) {
    let added_count = added_workloads.len();
    let deleted_count = deleted_workloads.len();
    log::info!(
        "Received a new desired state with '{}' added and '{}' deleted workloads.",
        added_count,
        deleted_count
    );

    let execution = self.execute_workloads(added_workloads, deleted_workloads, workload_state_db);
    execution.await;
}

// [impl->swdd~agent-forward-responses-to-control-interface-pipe~1]
pub async fn forward_response(&mut self, mut response: ank_base::Response) {
// [impl->swdd~agent-uses-id-prefix-forward-control-interface-response-correct-workload~1]
Expand Down Expand Up @@ -650,10 +662,9 @@ mod tests {
.build();

runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.contains_key(WORKLOAD_1_NAME));
assert!(runtime_manager.workloads.contains_key(WORKLOAD_2_NAME));
}
Expand Down Expand Up @@ -709,10 +720,9 @@ mod tests {
.build();

runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.is_empty());
}

Expand Down Expand Up @@ -781,11 +791,10 @@ mod tests {
.build();

runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;
server_receiver.close();

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.contains_key(WORKLOAD_1_NAME));
}

Expand Down Expand Up @@ -852,10 +861,9 @@ mod tests {

let added_workloads = vec![existing_workload];
runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.contains_key(WORKLOAD_1_NAME));
}

Expand Down Expand Up @@ -1027,10 +1035,9 @@ mod tests {
.build();

runtime_manager
.handle_update_workload(vec![], vec![], &MockWorkloadStateStore::default())
.handle_server_hello(vec![], &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.is_empty());
}

Expand Down Expand Up @@ -1077,10 +1084,9 @@ mod tests {
.build();

runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(runtime_manager.workloads.is_empty());
}

Expand Down Expand Up @@ -1151,10 +1157,9 @@ mod tests {

let added_workloads = vec![existing_workload];
runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.handle_server_hello(added_workloads, &MockWorkloadStateStore::default())
.await;

assert!(runtime_manager.initial_workload_list_received);
assert!(!runtime_manager.workloads.contains_key(WORKLOAD_1_NAME));
}

Expand Down Expand Up @@ -1208,8 +1213,6 @@ mod tests {
)
.build();

runtime_manager.initial_workload_list_received = true;

let mut workload_mock = MockWorkload::default();
workload_mock
.expect_update()
Expand Down Expand Up @@ -1317,8 +1320,6 @@ mod tests {
)
.build();

runtime_manager.initial_workload_list_received = true;

runtime_manager
.workloads
.insert(WORKLOAD_1_NAME.to_string(), workload_mock);
Expand Down Expand Up @@ -1388,7 +1389,6 @@ mod tests {
Box::new(runtime_facade_mock) as Box<dyn RuntimeFacade>,
)
.build();
runtime_manager.initial_workload_list_received = true;

let added_workloads = vec![new_workload];
let deleted_workloads = vec![deleted_workload];
Expand Down Expand Up @@ -1453,8 +1453,6 @@ mod tests {
)
.build();

runtime_manager.initial_workload_list_received = true;

let mut workload_mock = MockWorkload::default();
workload_mock
.expect_update()
Expand Down Expand Up @@ -1537,8 +1535,6 @@ mod tests {
)
.build();

runtime_manager.initial_workload_list_received = true;

let added_workloads = vec![new_workload];
runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
Expand Down Expand Up @@ -1586,8 +1582,6 @@ mod tests {
RUNTIME_NAME.to_string(),
)];

runtime_manager.initial_workload_list_received = true;

runtime_manager
.handle_update_workload(added_workloads, vec![], &MockWorkloadStateStore::default())
.await;
Expand Down Expand Up @@ -1635,8 +1629,6 @@ mod tests {
)
.build();

runtime_manager.initial_workload_list_received = true;

let mut workload_mock = MockWorkload::default();
workload_mock
.expect_update()
Expand Down Expand Up @@ -1690,8 +1682,6 @@ mod tests {
let (mut server_receiver, mut runtime_manager, _wl_state_receiver) =
RuntimeManagerBuilder::default().build();

runtime_manager.initial_workload_list_received = true;

let mut workload_mock = MockWorkload::default();
workload_mock.expect_delete().never();

Expand Down
6 changes: 6 additions & 0 deletions common/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ impl TryFrom<ank_base::UpdateStateRequest> for UpdateStateRequest {
}
}

/// Payload of the `FromServer::ServerHello` message: the workloads an agent
/// shall add after it has connected to the server.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ServerHello {
    /// Optional agent name.
    ///
    /// NOTE(review): presumably identifies the agent this hello is addressed
    /// to so the communication middleware can route it; the gRPC
    /// `ServerHello` message has no such field — confirm against the callers.
    pub agent_name: Option<String>,
    /// Workloads the receiving agent is asked to add.
    pub added_workloads: Vec<WorkloadSpec>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct UpdateWorkload {
pub added_workloads: Vec<WorkloadSpec>,
Expand Down
20 changes: 20 additions & 0 deletions common/src/from_server_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl From<SendError<FromServer>> for FromServerInterfaceError {

#[derive(Debug, Clone, PartialEq)]
pub enum FromServer {
ServerHello(commands::ServerHello),
UpdateWorkload(commands::UpdateWorkload),
UpdateWorkloadState(commands::UpdateWorkloadState),
Response(ank_base::Response),
Expand All @@ -44,6 +45,11 @@ pub enum FromServer {
// [impl->swdd~from-server-channel~1]
#[async_trait]
pub trait FromServerInterface {
async fn server_hello(
&self,
agent_name: Option<String>,
added_workloads: Vec<WorkloadSpec>,
) -> Result<(), FromServerInterfaceError>;
async fn update_workload(
&self,
added_workloads: Vec<WorkloadSpec>,
Expand Down Expand Up @@ -78,6 +84,20 @@ pub type FromServerReceiver = tokio::sync::mpsc::Receiver<FromServer>;

#[async_trait]
impl FromServerInterface for FromServerSender {
async fn server_hello(
&self,
// This is a workaround for not having a request-response model dedicated for the communication middleware
agent_name: Option<String>,
added_workloads: Vec<WorkloadSpec>,
) -> Result<(), FromServerInterfaceError> {
Ok(self
.send(FromServer::ServerHello(commands::ServerHello {
agent_name,
added_workloads,
}))
.await?)
}

async fn update_workload(
&self,
added_workloads: Vec<WorkloadSpec>,
Expand Down
8 changes: 8 additions & 0 deletions grpc/proto/grpc_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ message FromServer {
UpdateWorkload updateWorkload = 1; /// A message containing lists of workloads to be added or deleted.
UpdateWorkloadState updateWorkloadState = 2; /// A message containing list of workload execution states.
ank_base.Response response = 3; /// A message containing a response to a previous request.
ServerHello serverHello = 4; /// A message containing information about the workloads to be added after the agent connects.
}
}

Expand All @@ -77,6 +78,13 @@ message AgentHello {
message Goodbye {
}

/**
* A message sent by the server in response to the AgentHello message of an agent. It lists the workloads the agent shall add after it has connected.
*/
message ServerHello {
repeated AddedWorkload addedWorkloads = 1; /// A list of messages, each describing a workload to be added by the Ankaios agent.
}

/**
* A message providing information about the workloads to be added and/or deleted.
*/
Expand Down
Loading