Skip to content

Commit

Permalink
Add node resource availability (#378)
Browse files Browse the repository at this point in the history
* initial commit

* Add Resource Measurement

* More useless things

* Delete agent/src/resource_measurement/resource_measurement_sender

* Sending of AgentResource

* Add the actual measurement

* Add data load to resource availability message

* Add resource availability in ank get agents

* Measure only free memory

* Finish impl and start adding unit tests

* Add unit tests

* Fix unit tests

* Update documentation

* Rename node resource availability message

* Update AgentMap

* Fix failing unit tests

* Update docs

Co-authored-by: Oliver <42932060+inf17101@users.noreply.github.com>

* Update ank/doc/swdesign/README.md

Co-authored-by: Oliver <42932060+inf17101@users.noreply.github.com>

* Update resource availability swdd

* Update agent resource availability swdd

* Update agent manager unit tests

* Refactor the node resource availability

* Update doc and test cases

* Update server/doc/swdesign/README.md

Co-authored-by: Oliver <42932060+inf17101@users.noreply.github.com>

* Update filtering

* Update filtering

* Update stests and documentation

* Apply suggestions from code review - Documentation

Co-authored-by: Kaloyan <36224699+krucod3@users.noreply.github.com>

* Update docs and agent_manager

* Update server/doc/swdesign/README.md

Co-authored-by: Kaloyan <36224699+krucod3@users.noreply.github.com>

* Update server/doc/swdesign/README.md

Co-authored-by: Kaloyan <36224699+krucod3@users.noreply.github.com>

* Fix match assertion and PERCENTAGE_BASE

* Improve get agents readability

* Update doc

* Rename CpuLoad to CpuUsage

* Update requirements versions

* Update messages

* Fix CLI format and trace messages

---------

Co-authored-by: Oliver <42932060+inf17101@users.noreply.github.com>
Co-authored-by: Kaloyan <36224699+krucod3@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent d90ce71 commit 76d5773
Show file tree
Hide file tree
Showing 25 changed files with 860 additions and 62 deletions.
124 changes: 123 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ serde_json = "1.0"
uuid = { version = "1.3", features = ["v4", "fast-rng"] }
sha256 = "1.5"
umask = "2.1.0"
sysinfo = "0.31"
regex = "1.10"

[dev-dependencies]
Expand Down
17 changes: 17 additions & 0 deletions agent/doc/swdesign/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2759,6 +2759,23 @@ Needs:
- impl
- utest

#### AgentManager sends the node resource availability to the server
`swdd~agent-sends-node-resource-availability-to-server~1`

Status: approved

At an interval of 2 seconds, the AgentManager measures the global CPU usage and the available free memory and sends them to the Ankaios server via an `AgentLoadStatus` message.

Rationale:
Available resources must be available in the cluster in order to enable dynamic scheduling, e.g., done by a workload.

Tags:
- AgentManager

Needs:
- impl
- utest

### Forwarding the Control Interface

The Ankaios Agent is responsible to forward Control Interface requests from a Workload to the Ankaios Server and to forward Control Interface responses from the Ankaios Server to the Workload.
Expand Down
94 changes: 91 additions & 3 deletions agent/src/agent_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
// under the License.
//
// SPDX-License-Identifier: Apache-2.0
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};

use common::{
commands::AgentLoadStatus,
from_server_interface::{FromServer, FromServerReceiver},
objects::WorkloadState,
objects::{CpuUsage, FreeMemory, WorkloadState},
std_extensions::{GracefulExitResult, IllegalStateResult},
to_server_interface::{ToServerInterface, ToServerSender},
};
Expand All @@ -25,6 +27,9 @@ use crate::workload_state::workload_state_store::WorkloadStateStore;
#[cfg_attr(test, mockall_double::double)]
use crate::runtime_manager::RuntimeManager;
use crate::workload_state::WorkloadStateReceiver;

const RESOURCE_MEASUREMENT_INTERVAL_TICK: std::time::Duration = tokio::time::Duration::from_secs(2);

// [impl->swdd~agent-shall-use-interfaces-to-server~1]
pub struct AgentManager {
agent_name: String,
Expand Down Expand Up @@ -56,6 +61,9 @@ impl AgentManager {

pub async fn start(&mut self) {
log::info!("Awaiting commands from the server ...");

let mut interval = tokio::time::interval(RESOURCE_MEASUREMENT_INTERVAL_TICK);

loop {
tokio::select! {
// [impl->swdd~agent-manager-listens-requests-from-server~1]
Expand All @@ -67,15 +75,18 @@ impl AgentManager {
if self.execute_from_server_command(from_server).await.is_none() {
break;
}
}
},
// [impl->swdd~agent-manager-receives-workload-states-of-its-workloads~1]
workload_state = self.workload_state_receiver.recv() => {
let workload_state = workload_state
.ok_or("Channel to listen to own workload states closed.".to_string())
.unwrap_or_exit("Abort");

self.store_and_forward_own_workload_states(workload_state).await;
}
// [impl->swdd~agent-sends-node-resource-availability-to-server~1]
_ = interval.tick() => {
self.measure_and_forward_resource_availability().await;
}
}
}
}
Expand Down Expand Up @@ -192,6 +203,36 @@ impl AgentManager {
.await
.unwrap_or_illegal_state();
}

// [impl->swdd~agent-sends-node-resource-availability-to-server~1]
async fn measure_and_forward_resource_availability(&mut self) {
let mut sys = System::new_with_specifics(
RefreshKind::new()
.with_cpu(CpuRefreshKind::everything())
.with_memory(MemoryRefreshKind::everything()),
);

sys.refresh_all();

let cpu_usage = sys.global_cpu_usage();
let free_memory = sys.free_memory();

log::trace!(
"Agent '{}' reports resource usage: CPU Usage: {}%, Free Memory: {}B",
self.agent_name,
cpu_usage,
free_memory,
);

self.to_server
.agent_load_status(AgentLoadStatus {
agent_name: self.agent_name.clone(),
cpu_usage: CpuUsage::new(cpu_usage),
free_memory: FreeMemory { free_memory },
})
.await
.unwrap_or_illegal_state();
}
}

//////////////////////////////////////////////////////////////////////////////
Expand All @@ -204,6 +245,8 @@ impl AgentManager {

#[cfg(test)]
mod tests {
use core::panic;

use super::RuntimeManager;
use crate::agent_manager::AgentManager;
use crate::workload_state::{
Expand Down Expand Up @@ -501,4 +544,49 @@ mod tests {
to_manager.stop().await.unwrap();
assert!(join!(handle).0.is_ok());
}

// [utest->swdd~agent-sends-node-resource-availability-to-server~1]
#[tokio::test]
async fn utest_agent_manager_sends_available_resources() {
let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC
.get_lock_async()
.await;

let mock_wl_state_store = MockWorkloadStateStore::default();
mock_parameter_storage_new_returns(mock_wl_state_store);

let (to_manager, manager_receiver) = channel(BUFFER_SIZE);
let (to_server, mut server_receiver) = channel(BUFFER_SIZE);
let (_workload_state_sender, workload_state_receiver) = channel(BUFFER_SIZE);
let mut mock_runtime_manager = RuntimeManager::default();
mock_runtime_manager.expect_handle_update_workload().never();
mock_runtime_manager.expect_forward_response().never();
mock_runtime_manager.expect_execute_workloads().never();
mock_runtime_manager.expect_handle_server_hello().never();
mock_runtime_manager
.expect_update_workloads_on_fulfilled_dependencies()
.never();

let mut agent_manager = AgentManager::new(
AGENT_NAME.to_string(),
manager_receiver,
mock_runtime_manager,
to_server,
workload_state_receiver,
);

let handle = tokio::spawn(async move { agent_manager.start().await });

let result = server_receiver.recv().await.unwrap();
if let ToServer::AgentLoadStatus(load_status) = result {
assert_eq!(load_status.agent_name, AGENT_NAME.to_string());
assert_ne!(load_status.cpu_usage.cpu_usage, 0);
assert_ne!(load_status.cpu_usage.cpu_usage, 0);
} else {
panic!("Expected AgentLoadStatus, got something else");
}

to_manager.stop().await.unwrap();
assert!(join!(handle).0.is_ok());
}
}
12 changes: 9 additions & 3 deletions agent/src/runtime_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ mod tests {
use crate::workload_state::WorkloadStateReceiver;
use ank_base::response::ResponseContent;
use common::objects::{
generate_test_control_interface_access,
self, generate_test_control_interface_access,
generate_test_workload_spec_with_control_interface_access,
generate_test_workload_spec_with_dependencies, generate_test_workload_spec_with_param,
AddCondition, WorkloadInstanceNameBuilder, WorkloadState,
Expand Down Expand Up @@ -1947,9 +1947,15 @@ mod tests {
});

complete_state.agents = Some(ank_base::AgentMap {
agents: HashMap::from([(AGENT_NAME.to_owned(), Default::default())]),
agents: HashMap::from([(
AGENT_NAME.to_owned(),
objects::AgentAttributes {
cpu_usage: Some(objects::CpuUsage { cpu_usage: 42 }),
free_memory: Some(objects::FreeMemory { free_memory: 42 }),
}
.into(),
)]),
});

let expected_response = ank_base::Response {
request_id,
response_content: Some(ResponseContent::CompleteState(complete_state)),
Expand Down
Loading

0 comments on commit 76d5773

Please sign in to comment.