diff --git a/Cargo.lock b/Cargo.lock index 2ffb1e939..d6ba9d590 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,6 +473,7 @@ dependencies = [ "common", "log", "regex", + "semver", "serde", "serde_yaml", "sha256", @@ -1717,6 +1718,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.208" diff --git a/agent/doc/swdesign/README.md b/agent/doc/swdesign/README.md index dafb63b79..6a30777a5 100644 --- a/agent/doc/swdesign/README.md +++ b/agent/doc/swdesign/README.md @@ -2816,6 +2816,26 @@ Needs: - impl - utest +#### Agent closes Control Interface channel on missing initial `Hello` +`swdd~agent-closes-control-interface-on-missing-initial-hello~1` + +Status: approved + +When an Ankaios agent receives an initial message on the Control Interface that is different to the initial `Hello` message containing the supported Ankaios version by the workload or the provided version in the message is not compatible with the one of the agent, the agent shall: +* close the Control Interface connection by sending a `ConnectionClosed` message +* discontinuing reading new messages from the workload. + +Comment: +The check for the supported by the agent version is done by a central function provided by the common library. + +Tags: +- ControlInterface + +Needs: +- impl +- utest +- stest + #### Agent converts from Control Interface proto request to internal object `swdd~agent-converts-control-interface-message-to-ankaios-object~1` diff --git a/agent/src/control_interface/control_interface_task.rs b/agent/src/control_interface/control_interface_task.rs index d9f03dd28..2d784e968 100644 --- a/agent/src/control_interface/control_interface_task.rs +++ b/agent/src/control_interface/control_interface_task.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use crate::control_interface::ToAnkaios; +use crate::control_interface::{to_ankaios, ToAnkaios}; #[cfg_attr(test, mockall_double::double)] use super::authorizer::Authorizer; @@ -22,6 +22,7 @@ use super::authorizer::Authorizer; use super::reopen_file::ReopenFile; use api::{ank_base, control_api}; use common::{ + check_version_compatibility, from_server_interface::{FromServer, FromServerReceiver}, to_server_interface::{ToServer, ToServerSender}, }; @@ -29,7 +30,10 @@ use common::{ use prost::Message; use tokio::{io, select, task::JoinHandle}; -fn decode_to_server(protobuf_data: io::Result>) -> io::Result { +const INITIAL_HELLO_MISSING_MSG: &str = "Initial Hello missing!"; +const PROTOBUF_DECODE_ERROR_MSG: &str = "Could not decode protobuf data"; + +fn decode_to_server(protobuf_data: io::Result>) -> io::Result { Ok(control_api::ToAnkaios::decode(&mut Box::new( protobuf_data?.as_ref(), ))?) @@ -63,7 +67,34 @@ impl ControlInterfaceTask { authorizer, } } + + // [impl->swdd~agent-closes-control-interface-on-missing-initial-hello~1] + async fn check_initial_hello(&mut self) -> Result<(), String> { + let to_ankaios = + decode_to_server(self.input_stream.read_protobuf_data().await).map_err(|err| { + log::debug!("{}: '{:?}'", PROTOBUF_DECODE_ERROR_MSG, err); + PROTOBUF_DECODE_ERROR_MSG.to_string() + })?; + match to_ankaios.try_into() { + Ok(ToAnkaios::Hello(to_ankaios::Hello { protocol_version })) => { + check_version_compatibility(protocol_version)? + } + unexpected => { + log::debug!("Expected initial Hello, received: '{unexpected:?}'."); + return Err(INITIAL_HELLO_MISSING_MSG.into()); + } + } + Ok(()) + } + pub async fn run(mut self) { + // [impl->swdd~agent-closes-control-interface-on-missing-initial-hello~1] + if let Err(message) = self.check_initial_hello().await { + log::warn!("{message}"); + let _ = self.send_connection_closed(message).await; + return; + } + loop { select! { // [impl->swdd~agent-ensures-control-interface-output-pipe-read~1] @@ -95,10 +126,17 @@ impl ControlInterfaceTask { }; let _ = self.forward_from_server(error).await; }; + }, + Ok(ToAnkaios::Hello(to_ankaios::Hello{protocol_version})) => { + log::warn!("Received yet another Hello with protocol version '{protocol_version}'"); + if let Err(message) = check_version_compatibility(protocol_version) { + log::warn!("{message}"); + let _ = self.send_connection_closed(message).await; + return; + } } Err(error) => { log::warn!("Could not convert protobuf in internal data structure: '{}'", error); - } } } else { @@ -118,10 +156,25 @@ impl ControlInterfaceTask { tokio::spawn(self.run()) } + async fn send_connection_closed(&mut self, reason: String) -> io::Result<()> { + use control_api::from_ankaios::FromAnkaiosEnum; + let message = control_api::FromAnkaios { + from_ankaios_enum: Some(FromAnkaiosEnum::ConnectionClosed( + control_api::ConnectionClosed { reason }, + )), + }; + + // [impl->swdd~agent-uses-length-delimited-protobuf-for-pipes~1] + let binary = message.encode_length_delimited_to_vec(); + self.output_stream.write_all(&binary).await?; + + Ok(()) + } + async fn forward_from_server(&mut self, response: ank_base::Response) -> io::Result<()> { use control_api::from_ankaios::FromAnkaiosEnum; let message = control_api::FromAnkaios { - from_ankaios_enum: Some(FromAnkaiosEnum::Response(response)), + from_ankaios_enum: Some(FromAnkaiosEnum::Response(Box::new(response))), }; // [impl->swdd~agent-uses-length-delimited-protobuf-for-pipes~1] @@ -160,10 +213,7 @@ pub fn generate_test_control_interface_task_mock() -> __mock_MockControlInterfac mod tests { use std::{io::Error, sync::Arc}; - use common::{ - commands::{self, CompleteStateRequest}, - to_server_interface::ToServer, - }; + use common::{commands, to_server_interface::ToServer}; use mockall::{predicate, Sequence}; use tokio::sync::mpsc; @@ -172,7 +222,40 @@ mod tests { use super::ControlInterfaceTask; - use crate::control_interface::{authorizer::MockAuthorizer, reopen_file::MockReopenFile}; + use crate::control_interface::{ + authorizer::MockAuthorizer, control_interface_task::INITIAL_HELLO_MISSING_MSG, + reopen_file::MockReopenFile, + }; + + const REQUEST_ID: &str = "req_id"; + + fn prepare_workload_hello_binary_message(version: impl Into) -> Vec { + let workload_hello = control_api::ToAnkaios { + to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Hello( + control_api::Hello { + protocol_version: version.into(), + }, + )), + }; + + workload_hello.encode_to_vec() + } + + fn prepare_request_complete_state_binary_message(field_mask: impl Into) -> Vec { + let ank_request = ank_base::Request { + request_id: REQUEST_ID.into(), + request_content: Some(ank_base::request::RequestContent::CompleteStateRequest( + ank_base::CompleteStateRequest { + field_mask: vec![field_mask.into()], + }, + )), + }; + let test_output_request = control_api::ToAnkaios { + to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(ank_request)), + }; + + test_output_request.encode_to_vec() + } #[tokio::test] async fn utest_control_interface_task_forward_from_server() { @@ -181,7 +264,7 @@ mod tests { .await; let response = ank_base::Response { - request_id: "req_id".to_owned(), + request_id: REQUEST_ID.into(), response_content: Some(ank_base::response::ResponseContent::CompleteState( Default::default(), )), @@ -189,7 +272,7 @@ mod tests { let test_command_binary = control_api::FromAnkaios { from_ankaios_enum: Some(control_api::from_ankaios::FromAnkaiosEnum::Response( - response.clone(), + Box::new(response.clone()), )), } .encode_length_delimited_to_vec(); @@ -232,7 +315,7 @@ mod tests { let test_output_request = control_api::ToAnkaios { to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request( ank_base::Request { - request_id: "req_id".to_owned(), + request_id: REQUEST_ID.into(), request_content: Some(ank_base::request::RequestContent::CompleteStateRequest( ank_base::CompleteStateRequest { field_mask: vec![] }, )), @@ -245,13 +328,19 @@ mod tests { let mut mockall_seq = Sequence::new(); let mut input_stream_mock = MockReopenFile::default(); - let mut x = [0; 12]; - x.clone_from_slice(&test_output_request_binary[..]); + + let workload_hello_binary = prepare_workload_hello_binary_message(common::ANKAIOS_VERSION); input_stream_mock .expect_read_protobuf_data() .once() .in_sequence(&mut mockall_seq) - .returning(move || Ok(Box::new(x))); + .return_once(move || Ok(workload_hello_binary)); + + input_stream_mock + .expect_read_protobuf_data() + .once() + .in_sequence(&mut mockall_seq) + .return_once(move || Ok(test_output_request_binary)); input_stream_mock .expect_read_protobuf_data() @@ -260,7 +349,7 @@ mod tests { .returning(move || Err(Error::new(std::io::ErrorKind::Other, "error"))); let error = ank_base::Response { - request_id: "req_id".to_owned(), + request_id: REQUEST_ID.into(), response_content: Some(ank_base::response::ResponseContent::Error( ank_base::Error { message: "Access denied".into(), @@ -270,7 +359,7 @@ mod tests { let test_input_command_binary = control_api::FromAnkaios { from_ankaios_enum: Some(control_api::from_ankaios::FromAnkaiosEnum::Response( - error.clone(), + Box::new(error.clone()), )), } .encode_length_delimited_to_vec(); @@ -305,15 +394,15 @@ mod tests { // [utest->swdd~agent-listens-for-requests-from-pipe~1] // [utest->swdd~agent-ensures-control-interface-output-pipe-read~1] // [utest->swdd~agent-forward-request-from-control-interface-pipe-to-server~1] + // [utest->swdd~agent-closes-control-interface-on-missing-initial-hello~1] #[tokio::test] async fn utest_control_interface_task_run_task_access_allowed() { let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC .get_lock_async() .await; - let request_id = "req_id"; let ank_request = ank_base::Request { - request_id: request_id.to_owned(), + request_id: REQUEST_ID.into(), request_content: Some(ank_base::request::RequestContent::CompleteStateRequest( ank_base::CompleteStateRequest { field_mask: vec!["desiredState.workloads.nginx".to_string()], @@ -321,7 +410,9 @@ mod tests { )), }; let test_output_request = control_api::ToAnkaios { - to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(ank_request)), + to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request( + ank_request.clone(), + )), }; let test_output_request_binary = test_output_request.encode_to_vec(); @@ -329,13 +420,19 @@ mod tests { let mut mockall_seq = Sequence::new(); let mut input_stream_mock = MockReopenFile::default(); - let mut x = [0; 42]; - x.clone_from_slice(&test_output_request_binary[..]); + + let workload_hello_binary = prepare_workload_hello_binary_message(common::ANKAIOS_VERSION); input_stream_mock .expect_read_protobuf_data() .once() .in_sequence(&mut mockall_seq) - .returning(move || Ok(Box::new(x))); + .return_once(move || Ok(workload_hello_binary)); + + input_stream_mock + .expect_read_protobuf_data() + .once() + .in_sequence(&mut mockall_seq) + .return_once(move || Ok(test_output_request_binary)); input_stream_mock .expect_read_protobuf_data() @@ -363,16 +460,120 @@ mod tests { control_interface_task.run().await; - let mut expected_request = commands::Request { - request_id: request_id.to_owned(), - request_content: commands::RequestContent::CompleteStateRequest(CompleteStateRequest { - field_mask: vec!["desiredState.workloads.nginx".to_string()], - }), - }; + let mut expected_request: commands::Request = ank_request.try_into().unwrap(); expected_request.prefix_request_id(request_id_prefix); assert_eq!( output_pipe_receiver.recv().await, Some(ToServer::Request(expected_request)) ); } + + // [utest->swdd~agent-closes-control-interface-on-missing-initial-hello~1] + #[tokio::test] + async fn utest_control_interface_task_run_task_no_hello() { + let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC + .get_lock_async() + .await; + + let test_output_request_binary = prepare_request_complete_state_binary_message(""); + + let mut mockall_seq = Sequence::new(); + let mut input_stream_mock = MockReopenFile::default(); + input_stream_mock + .expect_read_protobuf_data() + .once() + .in_sequence(&mut mockall_seq) + .return_once(move || Ok(test_output_request_binary)); + + let test_input_command_binary = control_api::FromAnkaios { + from_ankaios_enum: Some( + control_api::from_ankaios::FromAnkaiosEnum::ConnectionClosed( + control_api::ConnectionClosed { + reason: INITIAL_HELLO_MISSING_MSG.into(), + }, + ), + ), + } + .encode_length_delimited_to_vec(); + + let mut output_stream_mock = MockReopenFile::default(); + output_stream_mock + .expect_write_all() + .with(predicate::eq(test_input_command_binary)) + .once() + .returning(|_| Ok(())); + + let (_input_pipe_sender, input_pipe_receiver) = mpsc::channel(1); + let (output_pipe_sender, mut output_pipe_receiver) = mpsc::channel(1); + let request_id_prefix = "prefix@"; + + let authorizer = MockAuthorizer::default(); + + let control_interface_task = ControlInterfaceTask::new( + output_stream_mock, + input_stream_mock, + input_pipe_receiver, + output_pipe_sender, + request_id_prefix.to_owned(), + Arc::new(authorizer), + ); + + control_interface_task.run().await; + assert!(output_pipe_receiver.recv().await.is_none()); + } + + // [utest->swdd~agent-closes-control-interface-on-missing-initial-hello~1] + #[tokio::test] + async fn utest_control_interface_task_run_task_hello_unsupported_version() { + let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC + .get_lock_async() + .await; + + let mut mockall_seq = Sequence::new(); + let mut input_stream_mock = MockReopenFile::default(); + + let unsupported_version = "1999.1.0"; + let workload_hello_binary = prepare_workload_hello_binary_message(unsupported_version); + input_stream_mock + .expect_read_protobuf_data() + .once() + .in_sequence(&mut mockall_seq) + .return_once(move || Ok(workload_hello_binary)); + + let test_input_command_binary = control_api::FromAnkaios { + from_ankaios_enum: Some( + control_api::from_ankaios::FromAnkaiosEnum::ConnectionClosed( + control_api::ConnectionClosed { + reason: format!("Unsupported protocol version '{unsupported_version}'. Currently supported '{}'", common::ANKAIOS_VERSION), + }, + ), + ), + } + .encode_length_delimited_to_vec(); + + let mut output_stream_mock = MockReopenFile::default(); + output_stream_mock + .expect_write_all() + .with(predicate::eq(test_input_command_binary)) + .once() + .returning(|_| Ok(())); + + let (_input_pipe_sender, input_pipe_receiver) = mpsc::channel(1); + let (output_pipe_sender, mut output_pipe_receiver) = mpsc::channel(1); + let request_id_prefix = "prefix@"; + + let authorizer = MockAuthorizer::default(); + + let control_interface_task = ControlInterfaceTask::new( + output_stream_mock, + input_stream_mock, + input_pipe_receiver, + output_pipe_sender, + request_id_prefix.to_owned(), + Arc::new(authorizer), + ); + + control_interface_task.run().await; + assert!(output_pipe_receiver.recv().await.is_none()); + } } diff --git a/agent/src/control_interface/reopen_file.rs b/agent/src/control_interface/reopen_file.rs index 592bd8de2..41e0513a4 100644 --- a/agent/src/control_interface/reopen_file.rs +++ b/agent/src/control_interface/reopen_file.rs @@ -67,7 +67,7 @@ impl ReopenFile { tokio::spawn(async move { open_options.open(path).await }) } - pub async fn read_protobuf_data(&mut self) -> io::Result> { + pub async fn read_protobuf_data(&mut self) -> io::Result> { loop { let file = self.ensure_file().await?; match Self::try_read_protobuf_data(file).await { @@ -82,7 +82,7 @@ impl ReopenFile { } // [impl->swdd~agent-uses-length-delimited-protobuf-for-pipes~1] - async fn try_read_protobuf_data(file: &mut BufReader) -> Result, Error> { + async fn try_read_protobuf_data(file: &mut BufReader) -> Result, Error> { let varint_data = Self::try_read_varint_data(file).await?; let mut varint_data = Box::new(&varint_data[..]); @@ -90,7 +90,7 @@ impl ReopenFile { let mut buf = vec![0; size]; file.read_exact(&mut buf[..]).await?; - Ok(buf.into_boxed_slice()) + Ok(buf) } async fn try_read_varint_data( @@ -153,8 +153,8 @@ mockall::mock! { pub ReopenFile { pub fn open(path: &Path) -> Self; pub fn create(path: &Path) -> Self; - pub async fn read_protobuf_data(&mut self) -> io::Result>; - async fn try_read_protobuf_data(file: &mut BufReader) -> Result, Error>; + pub async fn read_protobuf_data(&mut self) -> io::Result>; + async fn try_read_protobuf_data(file: &mut BufReader) -> Result, Error>; pub async fn write_all(&mut self, buf: &[u8]) -> io::Result<()>; async fn try_write_all(&mut self, buf: &[u8]) -> io::Result<()>; } @@ -192,7 +192,7 @@ mod tests { let jh = tokio::spawn(async move { let mut f = super::ReopenFile::open(&fifo2); let data = f.read_protobuf_data().await.unwrap(); - assert_eq!(data, vec![17].into_boxed_slice()); + assert_eq!(data, vec![17]); }); let mut f = std::fs::File::create(&fifo).unwrap(); @@ -213,7 +213,7 @@ mod tests { let jh = tokio::spawn(async move { let mut f = super::ReopenFile::open(&fifo2); let data = f.read_protobuf_data().await.unwrap(); - assert_eq!(data, vec![17; 128].into_boxed_slice()); + assert_eq!(data, vec![17; 128]); }); let mut f = std::fs::File::create(&fifo).unwrap(); @@ -235,7 +235,7 @@ mod tests { let jh = tokio::spawn(async move { let mut f = super::ReopenFile::open(&fifo2); let data = f.read_protobuf_data().await.unwrap(); - assert_eq!(data, vec![17].into_boxed_slice()); + assert_eq!(data, vec![17]); }); { @@ -265,7 +265,7 @@ mod tests { let jh = tokio::spawn(async move { let mut f = super::ReopenFile::open(&fifo2); let data = f.read_protobuf_data().await.unwrap(); - assert_eq!(data, vec![17].into_boxed_slice()); + assert_eq!(data, vec![17]); }); { @@ -318,7 +318,7 @@ mod tests { let jh = tokio::spawn(async move { let mut f = super::ReopenFile::open(&fifo2); let data = f.read_protobuf_data().await.unwrap(); - assert_eq!(data, vec![].into_boxed_slice()); + assert_eq!(data, Vec::::new()); }); let mut f = std::fs::File::create(&fifo).unwrap(); diff --git a/agent/src/control_interface/to_ankaios.rs b/agent/src/control_interface/to_ankaios.rs index c8896a7af..7a2f883d4 100644 --- a/agent/src/control_interface/to_ankaios.rs +++ b/agent/src/control_interface/to_ankaios.rs @@ -19,6 +19,7 @@ use common::commands; #[derive(Debug, PartialEq, Eq, Clone)] pub enum ToAnkaios { Request(commands::Request), + Hello(Hello) } // [impl->swdd~agent-converts-control-interface-message-to-ankaios-object~1] @@ -33,10 +34,38 @@ impl TryFrom for ToAnkaios { Ok(match to_ankaios { ToAnkaiosEnum::Request(content) => ToAnkaios::Request(content.try_into()?), + ToAnkaiosEnum::Hello(content) => ToAnkaios::Hello(content.into()), }) } } +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Hello { + pub protocol_version: String, +} + +impl From for Hello { + fn from(item: control_api::Hello) -> Self { + Hello { + protocol_version: item.protocol_version, + } + } +} + +impl Hello { + pub fn new() -> Self { + Hello { + protocol_version: common::ANKAIOS_VERSION.into(), + } + } +} + +impl Default for Hello { + fn default() -> Self { + Self::new() + } +} + ////////////////////////////////////////////////////////////////////////////// // ######## ####### ######### ######### // // ## ## ## ## // diff --git a/agent/src/main.rs b/agent/src/main.rs index 1dc844145..73f5c8829 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -18,7 +18,6 @@ use common::to_server_interface::ToServer; use generic_polling_state_checker::GenericPollingStateChecker; use grpc::security::TLSConfig; use std::collections::HashMap; -use tokio::try_join; mod agent_manager; mod cli; @@ -35,7 +34,7 @@ mod workload_scheduler; mod workload_state; use common::from_server_interface::FromServer; -use common::std_extensions::{GracefulExitResult, IllegalStateResult, UnreachableResult}; +use common::std_extensions::GracefulExitResult; use grpc::client::GRPCCommunicationsClient; use agent_manager::AgentManager; @@ -119,12 +118,13 @@ async fn main() { // [impl->swdd~agent-fails-on-missing-file-paths-and-insecure-cli-arguments~1] let tls_config = TLSConfig::new(args.insecure, args.ca_pem, args.crt_pem, args.key_pem); - let communications_client = GRPCCommunicationsClient::new_agent_communication( + let mut communications_client = GRPCCommunicationsClient::new_agent_communication( args.agent_name.clone(), server_url, // [impl->swdd~agent-fails-on-missing-file-paths-and-insecure-cli-arguments~1] tls_config.unwrap_or_exit("Missing certificate file"), - ); + ) + .unwrap_or_exit("Failed to create communications client."); let mut agent_manager = AgentManager::new( args.agent_name, @@ -134,17 +134,14 @@ async fn main() { workload_state_receiver, ); - let manager_task = tokio::spawn(async move { agent_manager.start().await }); - // [impl->swdd~agent-sends-hello~1] - // [impl->swdd~agent-default-communication-grpc~1] - let communications_task = tokio::spawn(async move { - communications_client? - .run(server_receiver, to_manager.clone()) - .await - }); - - let (_, communication_task_result) = - try_join!(manager_task, communications_task).unwrap_or_illegal_state(); - - communication_task_result.unwrap_or_unreachable(); + tokio::select! { + // [impl->swdd~agent-sends-hello~1] + // [impl->swdd~agent-default-communication-grpc~1] + communication_result = communications_client.run(server_receiver, to_manager) => { + communication_result.unwrap_or_exit("agent error") + } + _agent_mgr_result = agent_manager.start() => { + log::info!("AgentManager exited."); + } + } } diff --git a/api/build.rs b/api/build.rs index 04127b860..0aef558ec 100644 --- a/api/build.rs +++ b/api/build.rs @@ -16,6 +16,7 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(true) .boxed("Request.RequestContent.updateStateRequest") + .boxed("FromAnkaios.FromAnkaiosEnum.response") .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") .type_attribute(".", "#[serde(rename_all = \"camelCase\")]") .type_attribute( diff --git a/api/proto/control_api.proto b/api/proto/control_api.proto index 8fd93e9fd..1b5226477 100644 --- a/api/proto/control_api.proto +++ b/api/proto/control_api.proto @@ -34,15 +34,32 @@ import "ank_base.proto"; */ message ToAnkaios { oneof ToAnkaiosEnum { - ank_base.Request request = 3; + Hello hello = 1; /// The fist message sent when a connection is established. The message is needed to make sure the connected components are compatible. + ank_base.Request request = 3; /// A request to Ankaios } } +/** +* This message is the first one that needs to be sent when a new connection to the Ankaios cluster is established. Without this message being sent all further request are rejected. +*/ +message Hello { + string protocolVersion = 2; /// The protocol version used by the calling component. +} + /** * Messages from the Ankaios server to e.g. the Ankaios agent. */ message FromAnkaios { oneof FromAnkaiosEnum { ank_base.Response response = 3; /// A message containing a response to a previous request. + ConnectionClosed connectionClosed = 5; /// A message sent by Ankaios to inform a workload that the connection to Anakios was closed. } } + +/** +* This message informs the user of the Control Interface that the connection was closed by Ankaios. +* No more messages will be processed by Ankaios after this message is sent. +*/ +message ConnectionClosed { + string reason = 1; /// A string containing the reason for closing the connection. +} diff --git a/common/Cargo.toml b/common/Cargo.toml index 30630ab41..7a32630d5 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9" log = "0.4" sha256 = "1.5" +semver = "1.0" regex = "1.10" [dev-dependencies] diff --git a/common/doc/swdesign/README.md b/common/doc/swdesign/README.md index ccd265f64..8c96b5b8c 100644 --- a/common/doc/swdesign/README.md +++ b/common/doc/swdesign/README.md @@ -406,7 +406,7 @@ Tags: Needs: - impl -### Helper methods +### Common Helpers Different helper methods used by other components of Ankaios. For example regarding error handling or testing. @@ -425,6 +425,22 @@ Tags: Needs: - impl +#### Provide common version checking functionality +`swdd~common-version-checking~1` + +Status: approved + +The Common library shall provide a common version checking functionality that fails if a provided version differs from the current major and minor one. + +Rationale: +The version checking is executed in different Ankaios components and must behave in the same way. The failure on a different minor version is required as Ankaios is currently at a 0 (zero) major version. + +Tags: +- CommonHelpers + +Needs: +- impl + ## Data view ## Error management view diff --git a/common/src/helpers.rs b/common/src/helpers.rs index c5f831f9d..0c3f907c4 100644 --- a/common/src/helpers.rs +++ b/common/src/helpers.rs @@ -14,6 +14,9 @@ use serde::{Serialize, Serializer}; use std::collections::{BTreeMap, HashMap}; +use crate::{std_extensions::IllegalStateResult, ANKAIOS_VERSION}; +use semver::Version; + // [impl->swdd~common-helper-methods~1] pub fn try_into_vec(input: Vec) -> Result, E> where @@ -32,3 +35,74 @@ where let ordered: BTreeMap<_, _> = value.iter().collect(); ordered.serialize(serializer) } + +// [impl->swdd~common-version-checking~1] +pub fn check_version_compatibility(version: impl AsRef) -> Result<(), String> { + let ank_version = Version::parse(ANKAIOS_VERSION).unwrap_or_illegal_state(); + if let Ok(input_version) = Version::parse(version.as_ref()) { + if ank_version.major == input_version.major && + // As we are at a 0 (zero) major version, we also require minor version equality + ank_version.minor == input_version.minor + { + return Ok(()); + } + } else { + log::warn!( + "Could not parse incoming string '{}' as semantic version.", + version.as_ref() + ); + } + + Err(format!( + "Unsupported protocol version '{}'. Currently supported '{ANKAIOS_VERSION}'", + version.as_ref() + )) +} + +////////////////////////////////////////////////////////////////////////////// +// ######## ####### ######### ######### // +// ## ## ## ## // +// ## ##### ######### ## // +// ## ## ## ## // +// ## ####### ######### ## // +////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use semver::Version; + + use crate::{check_version_compatibility, ANKAIOS_VERSION}; + + // [utest->swdd~common-version-checking~1] + #[test] + fn utest_version_compatibility_success() { + assert!(check_version_compatibility(ANKAIOS_VERSION).is_ok()) + } + + // [utest->swdd~common-version-checking~1] + #[test] + fn utest_version_compatibility_patch_diff_success() { + let mut version = Version::parse(ANKAIOS_VERSION).unwrap(); + version.patch = 199; + assert!(check_version_compatibility(version.to_string()).is_ok()) + } + + // [utest->swdd~common-version-checking~1] + #[test] + fn utest_version_compatibility_patch_major_error() { + let mut version = Version::parse(ANKAIOS_VERSION).unwrap(); + version.major = 199; + assert!(check_version_compatibility(version.to_string()).is_err()) + } + + // [utest->swdd~common-version-checking~1] + #[test] + fn utest_version_compatibility_patch_minor_error() { + let mut version = Version::parse(ANKAIOS_VERSION).unwrap(); + version.minor = 199; + // Currently we assert that the minor version is also equal as we are at a 0th major version. + // When a major version is released, we can update the test here and expect an Ok(). + assert_eq!(0, version.major); + assert!(check_version_compatibility(version.to_string()).is_err()) + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 6741aab86..4083e1021 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -16,6 +16,7 @@ pub const CHANNEL_CAPACITY: usize = 20; pub const DEFAULT_SOCKET_ADDRESS: &str = "127.0.0.1:25551"; pub const DEFAULT_SERVER_ADDRESS: &str = "http[s]://127.0.0.1:25551"; pub const PATH_SEPARATOR: char = '.'; +pub const ANKAIOS_VERSION: &str = env!("CARGO_PKG_VERSION"); pub mod commands; pub mod communications_client; @@ -23,6 +24,7 @@ pub mod communications_error; pub mod communications_server; pub mod from_server_interface; pub mod helpers; +pub use helpers::check_version_compatibility; pub mod objects; pub mod request_id_prepending; pub mod state_manipulation; diff --git a/doc/docs/usage/upgrading/v0_4_to_v0_5.md b/doc/docs/usage/upgrading/v0_4_to_v0_5.md new file mode 100644 index 000000000..22281c3fe --- /dev/null +++ b/doc/docs/usage/upgrading/v0_4_to_v0_5.md @@ -0,0 +1,29 @@ +# Upgrading v0.4 to v0.5 + +When upgrading from v0.4 to v0.5, the installation script simply needs to be ran again. However, due to some breaking changes, some manual adjustments are required for existing workloads using the control interface. + +## Initial `Hello` message for the Control Interface + +In order to ensure version compatibility and avoid undefined behavior resulting from version mismatch, a new obligatory `Hello` message was added to the Control Interface protocol. +The `Hello` must be sent by a workload communicating over the Control Interface at the start of the session as a first message. It is part of the `ToAnkaios` message and has the following format: + +```proto +message Hello { + string protocolVersion = 2; /// The protocol version used by the calling component. +} +``` + +Failing to sent the message before any other communication is done, or providing an unsupported version would result in a preliminary closing of the Control Interface session by Ankaios. +The required `protocolVersion` string is the current Ankaios release version. As Ankaios is currently in the initial development (no official major release), minor version differences are also handled as incompatible. After the official major release, only the major versions will be compared. + +To inform the workload of this, a `ConnectionClosed` is sent as part of the `FromAnkaios` message. The `ConnectionClosed` message contains the reason for closing the session as a string: + +```proto +message ConnectionClosed { + string reason = 1; /// A string containing the reason for closing the connection. +} +``` + +After the `ConnectionClosed`, no more messages would be read or sent by Ankaios on the input and output pipes. + +The Control Interface instance cannot be reopened, but a new instance would be created if the workload is restarted. diff --git a/examples/cpp_control_interface/Dockerfile b/examples/cpp_control_interface/Dockerfile index 819093780..92a02bdc3 100644 --- a/examples/cpp_control_interface/Dockerfile +++ b/examples/cpp_control_interface/Dockerfile @@ -18,5 +18,6 @@ FROM docker.io/library/alpine:3.18.4 RUN apk update && apk add --update-cache protobuf-dev COPY --from=compile /workspaces/app/build/main /usr/local/bin/control_interface_example RUN chmod +x /usr/local/bin/control_interface_example +ENV ANKAIOS_VERSION=0.5.0-pre ENTRYPOINT ["/usr/local/bin/control_interface_example"] diff --git a/examples/cpp_control_interface/src/main.cpp b/examples/cpp_control_interface/src/main.cpp index f24d4476e..d078bf220 100644 --- a/examples/cpp_control_interface/src/main.cpp +++ b/examples/cpp_control_interface/src/main.cpp @@ -27,6 +27,19 @@ namespace logging } } +/* Create a Hello message to initialize the session. */ +control_api::ToAnkaios createHelloMessage() +{ + const char* ankaios_version = std::getenv("ANKAIOS_VERSION"); + + control_api::Hello* hello{new control_api::Hello}; + hello->set_protocolversion(ankaios_version); + + control_api::ToAnkaios toAnkaios; + toAnkaios.set_allocated_hello(hello); + return toAnkaios; +} + /* Create the Request containing an UpdateStateRequest that contains the details for adding the new workload and the update mask to add only the new workload. */ @@ -126,7 +139,6 @@ void readFromControlInterface() another Request to request the workload states. */ void writeToControlInterface() { - const auto requestToAddNewWorkload = createRequestToAddNewWorkload(); const auto outputFifo = ANKAIOS_CONTROL_INTERFACE_BASE_PATH + "/output"; std::ofstream output{outputFifo, std::ios::app | std::ios::binary}; if (output.fail()) @@ -135,6 +147,14 @@ void writeToControlInterface() return; } + const auto hello = createHelloMessage(); + logging::log(std::cout, + "Sending initial Hello message:\n", + hello.DebugString()); + // write length-delimited protobuf message into output fifo to initialize the session + google::protobuf::util::SerializeDelimitedToOstream(hello, &output); + output.flush(); + const auto requestToAddNewWorkload = createRequestToAddNewWorkload(); logging::log(std::cout, "Sending Request containing details for adding the dynamic workload \"dynamic_nginx\":\n", "ToAnkaios {\n", diff --git a/examples/nodejs_control_interface/Dockerfile b/examples/nodejs_control_interface/Dockerfile index 3f3ad4953..3e825bdaa 100644 --- a/examples/nodejs_control_interface/Dockerfile +++ b/examples/nodejs_control_interface/Dockerfile @@ -9,7 +9,7 @@ RUN apk update && apk add --update-cache \ COPY api/proto/ank_base.proto api/proto/control_api.proto /usr/local/lib/ankaios/ COPY examples/nodejs_control_interface /workspaces/app WORKDIR /workspaces/app - RUN npm install +ENV ANKAIOS_VERSION=0.5.0-pre ENTRYPOINT ["node", "/workspaces/app/src/main.js"] diff --git a/examples/nodejs_control_interface/src/main.js b/examples/nodejs_control_interface/src/main.js index 7cbfbfd25..a53f48cf7 100644 --- a/examples/nodejs_control_interface/src/main.js +++ b/examples/nodejs_control_interface/src/main.js @@ -8,6 +8,26 @@ const REQUEST_ID = "dynamic_nginx@nodejs_control_interface" let ToAnkaios; let FromAnkaios; +function create_hello_message(root) { + /* Create a Hello message to initialize the session. */ + + ToAnkaios = root.lookupType("control_api.ToAnkaios"); + + let payload = { + hello: { + protocolVersion: process.env.ANKAIOS_VERSION, + } + } + + const errMsg = ToAnkaios.verify(payload); + if (errMsg) { + throw Error(errMsg); + } + + return ToAnkaios.create(payload); +} + + function create_request_to_add_new_workload(root) { /* Create the Request containing an UpdateStateRequest that contains the details for adding the new workload and @@ -114,6 +134,11 @@ async function main() { read_from_control_interface(root, decode_from_server_response_message); + // Send the initial Hello message to initialize the session + const hello = create_hello_message(root); + console.log(`[${new Date().toISOString()}] Sending initial Hello message:\n`, util.inspect(hello.toJSON(), { depth: null })); + write_to_control_interface(root, hello); + // Send request to add the new workload dynamic_nginx to Ankaios Server const message = create_request_to_add_new_workload(root); console.log(`[${new Date().toISOString()}] Sending Request containing details for adding the dynamic workload "dynamic_nginx":\nToAnkaios `, util.inspect(message.toJSON(), { depth: null })); diff --git a/examples/python_control_interface/Dockerfile b/examples/python_control_interface/Dockerfile index 94330235e..c7ba18dba 100644 --- a/examples/python_control_interface/Dockerfile +++ b/examples/python_control_interface/Dockerfile @@ -20,5 +20,6 @@ FROM base ENV PYTHONPATH="${PYTHONPATH}:/usr/local/lib/ankaios" COPY --from=dev /usr/local/lib/ankaios /usr/local/lib/ankaios COPY examples/python_control_interface /ankaios +ENV ANKAIOS_VERSION=0.5.0-pre ENTRYPOINT ["python3", "-u", "/ankaios/src/main.py"] diff --git a/examples/python_control_interface/src/main.py b/examples/python_control_interface/src/main.py index 02da66c54..ff97bf54d 100644 --- a/examples/python_control_interface/src/main.py +++ b/examples/python_control_interface/src/main.py @@ -5,6 +5,7 @@ import threading import time import logging +import os ANKAIOS_CONTROL_INTERFACE_BASE_PATH = "/run/ankaios/control_interface" WAITING_TIME_IN_SEC = 5 @@ -22,6 +23,14 @@ def create_logger(): logger = create_logger() +def create_hello_message(): + """Create a Hello message to initialize the session.""" + return control_api.ToAnkaios ( + hello=control_api.Hello( + protocolVersion=os.environ['ANKAIOS_VERSION'], + ), + ) + def create_request_to_add_new_workload(): """Create the Request containing an UpdateStateRequest that contains the details for adding the new workload and @@ -106,6 +115,14 @@ def write_to_control_interface(): """ with open(f"{ANKAIOS_CONTROL_INTERFACE_BASE_PATH}/output", "ab") as f: + hello_message = create_hello_message() + hello_message_len = hello_message.ByteSize() # Length of the msg + proto_hello_message = hello_message.SerializeToString() # Serialized proto msg + logger.info(f'Sending initial Hello message:\n{hello_message}') + f.write(_VarintBytes(hello_message_len)) # Send the byte length of the proto msg + f.write(proto_hello_message) # Send the proto msg itself + f.flush() + update_workload_request = create_request_to_add_new_workload() update_workload_request_byte_len = update_workload_request.ByteSize() # Length of the msg proto_update_workload_request_msg = update_workload_request.SerializeToString() # Serialized proto msg diff --git a/examples/rust_control_interface/Dockerfile b/examples/rust_control_interface/Dockerfile index 416779595..57eaf0af2 100644 --- a/examples/rust_control_interface/Dockerfile +++ b/examples/rust_control_interface/Dockerfile @@ -17,6 +17,7 @@ RUN curl --proto '=https' --tlsv1.2 -sS https://sh.rustup.rs | sh -s -- -y > /de COPY api ${WORKSPACE_DIR}/api COPY examples/rust_control_interface ${WORKSPACE_DIR} WORKDIR ${WORKSPACE_DIR} +ENV ANKAIOS_VERSION=0.5.0-pre RUN --mount=type=cache,target=${WORKSPACE_DIR}/target/release cargo build --release \ && cp ${WORKSPACE_DIR}/target/release/control_interface_example /usr/local/bin/ @@ -24,4 +25,5 @@ RUN --mount=type=cache,target=${WORKSPACE_DIR}/target/release cargo build --rele FROM docker.io/alpine:3.18.4 COPY --from=compile /usr/local/bin/control_interface_example /usr/local/bin/control_interface_example RUN chmod +x /usr/local/bin/control_interface_example + ENTRYPOINT ["/usr/local/bin/control_interface_example"] diff --git a/examples/rust_control_interface/src/main.rs b/examples/rust_control_interface/src/main.rs index 3c2378acc..3c2f38077 100644 --- a/examples/rust_control_interface/src/main.rs +++ b/examples/rust_control_interface/src/main.rs @@ -18,7 +18,7 @@ use api::ank_base::{ }; use api::control_api::{ - from_ankaios::FromAnkaiosEnum, to_ankaios::ToAnkaiosEnum, FromAnkaios, ToAnkaios, + from_ankaios::FromAnkaiosEnum, to_ankaios::ToAnkaiosEnum, FromAnkaios, ToAnkaios, Hello }; use prost::Message; @@ -47,6 +47,13 @@ mod logging { } } +/// Create a Hello message to initialize the session +fn create_hello_message() -> ToAnkaios { + ToAnkaios { + to_ankaios_enum: Some(ToAnkaiosEnum::Hello(Hello{ protocol_version: env!("ANKAIOS_VERSION").to_string() })), + } +} + /// Create the Request containing an UpdateStateRequest /// that contains the details for adding the new workload and /// the update mask to add only the new workload. @@ -191,6 +198,12 @@ fn write_to_control_interface() { exit(1); }); + let protobuf_hello_message = create_hello_message(); + logging::log(format!("Sending initial Hello message:\n{:#?}", protobuf_hello_message).as_str()); + sc_req + .write_all(&protobuf_hello_message.encode_length_delimited_to_vec()) + .unwrap(); + let protobuf_update_workload_request = create_request_to_add_new_workload(); logging::log(format!("Sending Request containing details for adding the dynamic workload \"dynamic_nginx\":\n{:#?}", protobuf_update_workload_request).as_str()); diff --git a/grpc/doc/swdesign/README.md b/grpc/doc/swdesign/README.md index 10833ccb8..432acf073 100644 --- a/grpc/doc/swdesign/README.md +++ b/grpc/doc/swdesign/README.md @@ -85,7 +85,11 @@ The Proxy functions also do conversion of the transferred objects to the appropr ### gRPC Agent Connection -One gRPC Agent Connection is created by the gRPC Server at startup. The gRPC Server then spawns a tonic gRPC service in a new green thread and all calls to the service are handled in tasks by the gRPCAgentConnection. +One gRPC Agent Connection is created by the gRPC Server at startup. The gRPC Server then spawns a tonic gRPC service in a new green thread and all calls to the service are handled in tasks by the gRPC Agent Connection. + +### gRPC Commander Connection + +One gRPC Commander Connection is created by the gRPC Server at startup. This connection is used by the Ankaios CLI `ank` or by third-party-applications to connect to the Ankaios server. The gRPC Server then spawns a tonic gRPC service in a new green thread and all calls to the service are handled in tasks by the gRPC Commander Connection. ## Behavioral view @@ -117,7 +121,7 @@ Status: approved Upon startup, the gRPC Server shall create a gRPC CLI Connection responsible for handling calls from the gRPC Client Tags: -- gRPC_CLI_Connection +- gRPC_Commander_Connection Needs: - impl @@ -252,6 +256,23 @@ Needs: - impl - itest +#### gRPC Client send supported version with first message +`swdd~grpc-client-sends-supported-version~1` + +Status: approved + +The gRPC Client shall send the Ankaios version it was built with in the first message to the gRPC Server. + +Comment: +The gRPC Client shall also handle the case where the connection is closed due to a version mismatch error. + +Tags: +- gRPC_Client + +Needs: +- impl +- itest + #### gRPC Agent Connection creates from server channel `swdd~grpc-agent-connection-creates-from-server-channel~1` @@ -266,6 +287,22 @@ Needs: - impl - itest +#### gRPC Agent Connection checks incoming connection version for compatibility +`swdd~grpc-agent-connection-checks-version-compatibility~1` + +Status: approved + +For each received connection request, the gRPC Agent Connection shall: +* check the received version for compatibility +* refuse the connection if the version is not provided or is not supported. + +Tags: +- gRPC_Agent_Connection + +Needs: +- impl +- itest + #### gRPC Agent Connection stores from server channel tx `swdd~grpc-agent-connection-stores-from-server-channel-tx~1` @@ -311,6 +348,64 @@ Needs: - impl - itest +#### gRPC Commander Connection creates from server channel +`swdd~grpc-commander-connection-creates-from-server-channel~1` + +Status: approved + +For each received connection request, the gRPC Commander Connection shall create a new FromServer Channel for this agent. + +Tags: +- gRPC_Commander_Connection + +Needs: +- impl +- itest + +#### gRPC Commander Connection checks incoming connection version for compatibility +`swdd~grpc-commander-connection-checks-version-compatibility~1` + +Status: approved + +For each received connection request, the gRPC Commander Connection shall: +* check the received version for compatibility +* refuse the connection if the version is not provided or is not supported. + +Tags: +- gRPC_Commander_Connection + +Needs: +- impl +- itest + +#### gRPC Commander Connection stores from server channel sender +`swdd~grpc-commander-connection-stores-from-server-channel-tx~1` + +Status: approved + +For each received connection request, the gRPC Commander Connection shall store the created FromServer Channel in the Commander Senders Map. + +Tags: +- gRPC_Commander_Connection + +Needs: +- impl +- itest + +#### gRPC Commander Connection responds to client with from server channel receiver +`swdd~grpc-commander-connection-responds-with-from-server-channel-rx~1` + +Status: approved + +The gRPC Commander Connection shall respond to the connection request of the gRPC Client with the receiving side of an FromServer Channel. + +Tags: +- gRPC_Commander_Connection + +Needs: +- impl +- itest + ### Secure mTLS and insecure communication This chapter describes how secure and insecure communication is handled by the gRPC library. @@ -527,6 +622,23 @@ Needs: - utest - itest +#### gRPC Commander Connection forwards ToServer messages to Ankaios Server +`swdd~grpc-commander-connection-forwards-commands-to-server~1` + +Status: approved + +When receiving ToServer messages from the gRPC Client, the gRPC Commander Connection shall forward these messages to the Ankaios Server. + +Comment: +The gRPC Commander Connection must also convert the commands from protobuf in order to forward them to the Ankaios Server. + +Tags: +- gRPC_Commander_Connection + +Needs: +- impl +- itest + ### Handling connection interruptions The following diagram shows how connection interruptions are handled by the gRPC Connection Middleware: diff --git a/grpc/proto/grpc_api.proto b/grpc/proto/grpc_api.proto index 0c71b2677..4975e9af2 100644 --- a/grpc/proto/grpc_api.proto +++ b/grpc/proto/grpc_api.proto @@ -44,11 +44,12 @@ service CliConnection { */ message ToServer { oneof ToServerEnum { - AgentHello agentHello = 1; /// This message is for internal usage only! + AgentHello agentHello = 1; /// This is the first message sent by an Ankaios agent when it connects to the cluster. UpdateWorkloadState updateWorkloadState = 2; /// A message to Ankaios server to update the execution state of a workload. ank_base.Request request = 3; Goodbye goodbye = 4; AgentLoadStatus AgentLoadStatus = 5; + CommanderHello commanderHello = 6; /// This is the first message sent by the ank CLI or a third-party command component connected directly to the Ankaios server. } } @@ -69,6 +70,7 @@ message FromServer { */ message AgentHello { string agentName = 1; /// A unique agent name. + string protocolVersion = 2; /// The protocol version used by the calling component. } /** @@ -80,6 +82,13 @@ message AgentLoadStatus { ank_base.FreeMemory free_memory = 3; /// The amount of free memory of the agent. } +/** +* A message to the Ankaios server to register a new CLI session or a third-party command component. +*/ +message CommanderHello { + string protocolVersion = 2; /// The protocol version used by the calling component. +} + /** * A message to the Ankaios server to signalize a client (agent or cli) is shutting down. */ diff --git a/grpc/src/client.rs b/grpc/src/client.rs index ac5cd9a90..ccf5a5706 100644 --- a/grpc/src/client.rs +++ b/grpc/src/client.rs @@ -14,7 +14,6 @@ use std::path::Path; -use crate::from_server_proxy; use crate::from_server_proxy::GRPCFromServerStreaming; use crate::grpc_api::{ self, agent_connection_client::AgentConnectionClient, @@ -23,6 +22,7 @@ use crate::grpc_api::{ use crate::grpc_middleware_error::GrpcMiddlewareError; use crate::security::{read_pem_file, TLSConfig}; use crate::to_server_proxy; +use crate::{from_server_proxy, CommanderHello}; use common::communications_client::CommunicationsClient; use common::communications_error::CommunicationMiddlewareError; @@ -118,6 +118,14 @@ impl CommunicationsClient for GRPCCommunicationsClient { loop { let result = self.run_internal(&mut server_rx, &agent_tx).await; + // Take care of general errors + if let Err(GrpcMiddlewareError::VersionMismatch(err)) = result { + return Err(CommunicationMiddlewareError(format!( + "Ankaios version mismatch: '{}'.", + err + ))); + } + match self.connection_type { ConnectionType::Agent => { log::warn!("Connection to server interrupted: '{:?}'", result); @@ -174,17 +182,22 @@ impl GRPCCommunicationsClient { let (grpc_tx, grpc_rx) = tokio::sync::mpsc::channel::(common::CHANNEL_CAPACITY); + // [impl->swdd~grpc-client-sends-supported-version~1] match self.connection_type { ConnectionType::Agent => { grpc_tx .send(grpc_api::ToServer { - to_server_enum: Some(ToServerEnum::AgentHello(AgentHello { - agent_name: self.name.to_owned(), - })), + to_server_enum: Some(ToServerEnum::AgentHello(AgentHello::new(&self.name))), + }) + .await?; + } + ConnectionType::Cli => { + grpc_tx + .send(grpc_api::ToServer { + to_server_enum: Some(ToServerEnum::CommanderHello(CommanderHello::new())), }) .await?; } - ConnectionType::Cli => (), //no need to send AgentHello for Cli connection } // [impl->swdd~grpc-client-connects-with-agent-hello~1] diff --git a/grpc/src/grpc_agent_connection.rs b/grpc/src/grpc_agent_connection.rs index cdcd6e18b..e591cea7b 100644 --- a/grpc/src/grpc_agent_connection.rs +++ b/grpc/src/grpc_agent_connection.rs @@ -14,6 +14,7 @@ use std::pin::Pin; +use common::check_version_compatibility; use common::std_extensions::GracefulExitResult; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; @@ -115,9 +116,17 @@ impl AgentConnection for GRPCAgentConnection { .to_server_enum .ok_or_else(invalid_argument_empty)? { - ToServerEnum::AgentHello(grpc_api::AgentHello { agent_name }) => { + ToServerEnum::AgentHello(grpc_api::AgentHello { + agent_name, + protocol_version, + }) => { log::trace!("Received a hello from '{}'", agent_name); + // [impl->swdd~grpc-agent-connection-checks-version-compatibility~1] + check_version_compatibility(&protocol_version).map_err(|err| { + log::warn!("Refused connection from agent '{agent_name}' due to unsupported version: '{protocol_version}'"); + Status::failed_precondition(err)})?; + if sans.is_empty() || sans.contains(&agent_name) || sans.contains(&String::from("*")) @@ -163,17 +172,13 @@ impl AgentConnection for GRPCAgentConnection { } }); } else { - let err_message = format!( + return Err(Status::unauthenticated(format!( "Agent name '{agent_name}' does not match SAN {:?} in agent certificates!", sans - ); - // log::error!(err_message); - return Err(Status::unauthenticated(err_message)); + ))); } } - _ => { - panic!("No AgentHello received."); - } + _ => Err::<(), &str>("No AgentHello received.").unwrap_or_exit("Protocol error."), } // [impl->swdd~grpc-agent-connection-responds-with-from-server-channel-rx~1] diff --git a/grpc/src/grpc_api.rs b/grpc/src/grpc_api.rs index 0d4ece497..55475950a 100644 --- a/grpc/src/grpc_api.rs +++ b/grpc/src/grpc_api.rs @@ -21,6 +21,15 @@ use std::collections::HashMap; // [impl->swdd~grpc-delegate-workflow-to-external-library~1] tonic::include_proto!("grpc_api"); // The string specified here must match the proto package name +impl AgentHello { + pub fn new(agent_name: impl Into) -> Self { + AgentHello { + agent_name: agent_name.into(), + protocol_version: common::ANKAIOS_VERSION.into(), + } + } +} + impl From for commands::AgentHello { fn from(item: AgentHello) -> Self { commands::AgentHello { @@ -29,6 +38,14 @@ impl From for commands::AgentHello { } } +impl CommanderHello { + pub fn new() -> Self { + CommanderHello { + protocol_version: common::ANKAIOS_VERSION.into(), + } + } +} + impl From for commands::AgentLoadStatus { fn from(item: AgentLoadStatus) -> Self { commands::AgentLoadStatus { @@ -197,6 +214,9 @@ impl TryFrom for to_server_interface::ToServer { let to_server = item.to_server_enum.ok_or("ToServer is None.".to_string())?; Ok(match to_server { + ToServerEnum::CommanderHello(_) => { + return Err("The 'CommanderHello' message cannot be forwarded to Ankaios.".into()); + } ToServerEnum::AgentHello(protobuf) => { to_server_interface::ToServer::AgentHello(protobuf.into()) } @@ -293,9 +313,9 @@ mod tests { let agent_name = "agent_A".to_string(); let proto_request = ToServer { - to_server_enum: Some(ToServerEnum::AgentHello(AgentHello { - agent_name: agent_name.clone(), - })), + to_server_enum: Some(ToServerEnum::AgentHello(AgentHello::new( + &agent_name, + ))), }; let ankaios_command = ankaios::ToServer::AgentHello(ankaios::AgentHello { agent_name }); diff --git a/grpc/src/grpc_cli_connection.rs b/grpc/src/grpc_cli_connection.rs index 4a5d412d1..945512f08 100644 --- a/grpc/src/grpc_cli_connection.rs +++ b/grpc/src/grpc_cli_connection.rs @@ -14,7 +14,8 @@ use std::pin::Pin; -use common::to_server_interface; +use common::std_extensions::GracefulExitResult; +use common::{check_version_compatibility, to_server_interface}; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; @@ -22,6 +23,7 @@ use tonic::codegen::futures_core::Stream; use tonic::{Request, Response, Status}; use crate::agent_senders_map::AgentSendersMap; +use crate::to_server::ToServerEnum; use crate::to_server_proxy::{forward_from_proto_to_ankaios, GRPCToServerStreaming}; use grpc_api::cli_connection_server::CliConnection; @@ -55,8 +57,9 @@ impl CliConnection for GRPCCliConnection { &self, request: Request>, ) -> Result, Status> { - let stream = request.into_inner(); + let mut stream = request.into_inner(); + // [impl->swdd~grpc-commander-connection-creates-from-server-channel~1] let (new_sender, new_receiver) = tokio::sync::mpsc::channel::< Result, >(common::CHANNEL_CAPACITY); @@ -66,29 +69,52 @@ impl CliConnection for GRPCCliConnection { let ankaios_tx = self.to_ankaios_server.clone(); let cli_senders = self.cli_senders.clone(); - self.cli_senders.insert(&cli_connection_name, new_sender); - let _x = tokio::spawn(async move { - let mut stream = GRPCToServerStreaming::new(stream); - let result = forward_from_proto_to_ankaios( - cli_connection_name.clone(), - &mut stream, - ankaios_tx.clone(), - ) - .await; - if result.is_err() { - log::debug!( - "Connection to CLI (name={}) failed with {:?}.", - cli_connection_name, - result - ); + + // The first_message must be a commander hello + match stream + .message() + .await? + .ok_or(Status::invalid_argument("Empty"))? + .to_server_enum + .ok_or(Status::invalid_argument("Empty"))? + { + ToServerEnum::CommanderHello(grpc_api::CommanderHello { protocol_version }) => { + log::trace!("Received a hello from a cli/commander application."); + + // [impl->swdd~grpc-commander-connection-checks-version-compatibility~1] + check_version_compatibility(&protocol_version).map_err(|err| { + log::warn!("Refused cli/commander connection due to unsupported version: '{protocol_version}'"); + Status::failed_precondition(err)})?; + + // [impl->swdd~grpc-commander-connection-stores-from-server-channel-tx~1] + self.cli_senders.insert(&cli_connection_name, new_sender); + // [impl->swdd~grpc-commander-connection-forwards-commands-to-server~1] + let _x = tokio::spawn(async move { + let mut stream = GRPCToServerStreaming::new(stream); + let result = forward_from_proto_to_ankaios( + cli_connection_name.clone(), + &mut stream, + ankaios_tx.clone(), + ) + .await; + if result.is_err() { + log::debug!( + "Connection to CLI (name={}) failed with {:?}.", + cli_connection_name, + result + ); + } + cli_senders.remove(&cli_connection_name); + log::debug!( + "Connection to CLI (name={}) has been closed.", + cli_connection_name + ); + }); } - cli_senders.remove(&cli_connection_name); - log::debug!( - "Connection to CLI (name={}) has been closed.", - cli_connection_name - ); - }); + _ => Err::<(), &str>("No CommanderHello received.").unwrap_or_exit("Protocol error."), + } + // [impl->swdd~grpc-commander-connection-responds-with-from-server-channel-rx~1] Ok(Response::new(Box::pin(ReceiverStream::new(new_receiver)))) } } diff --git a/grpc/src/grpc_middleware_error.rs b/grpc/src/grpc_middleware_error.rs index 5a44dd7c5..c46c919a4 100644 --- a/grpc/src/grpc_middleware_error.rs +++ b/grpc/src/grpc_middleware_error.rs @@ -30,6 +30,7 @@ pub enum GrpcMiddlewareError { ConnectionInterrupted(String), CertificateError(String), TLSError(String), + VersionMismatch(String), } impl From for CommunicationMiddlewareError { @@ -64,7 +65,12 @@ impl From>> for GrpcMiddlewareError impl From for GrpcMiddlewareError { fn from(err: tonic::Status) -> Self { - GrpcMiddlewareError::ConnectionInterrupted(err.to_string()) + match err.code() { + tonic::Code::FailedPrecondition => { + GrpcMiddlewareError::VersionMismatch(err.to_string()) + } + _ => GrpcMiddlewareError::ConnectionInterrupted(err.to_string()), + } } } @@ -97,6 +103,9 @@ impl fmt::Display for GrpcMiddlewareError { GrpcMiddlewareError::TLSError(message) => { write!(f, "TLS error: '{message}'") } + GrpcMiddlewareError::VersionMismatch(message) => { + write!(f, "Version mismatch: '{message}'") + } } } } diff --git a/tests/resources/ankaios.resource b/tests/resources/ankaios.resource index 2d7c03131..6c091c7e7 100644 --- a/tests/resources/ankaios.resource +++ b/tests/resources/ankaios.resource @@ -30,12 +30,18 @@ ${CURRENT_RESULT} ${EMPTY} ${SERVER_PROCESS_HANDLE} ${EMPTY} ${TEST_FOLDER_NAME} ${EMPTY} &{ANKAIOS_INSTANCE_NAME_TO_PODMAN_ID_MAPPING} +${ANKAIOS_VERSION} ${EMPTY} *** Keywords *** Get MTLS Enabled ${result}= Get Environment Variable name=MTLS_ENABLED default=False RETURN ${result} +Retrieve current Ankaios version + ${run_result}= Run Process command=%{ANK_BIN_DIR}${/}ank-server --version shell=True + ${ANKAIOS_VERSION}= Evaluate '${run_result.stdout}'.split(' ')[1] + Set Global Variable ${ANKAIOS_VERSION} + Setup Ankaios without MTLS Setup ${run_result}= Run Process command=mktemp -d shell=True Set Environment Variable name=ANKAIOS_TEMP value=${run_result.stdout} @@ -512,21 +518,24 @@ the last command shall list the connected agent "${agent_name}" Setup Ankaios for Control Interface test Setup Ankaios Prepare Test Control Interface Workload + Retrieve current Ankaios version The controller workload has no access rights - empty_keyword - -The controller workload is allowed to ${operation} - Control interface allows to ${operation} on ${EMPTY} + The controller workload sends initial hello with correct version The controller workload is allowed to ${operation} on ${filter_mask} internal_allow_control_interface ${operation} ${filter_mask} - -The controller workload is forbidden to ${operation} - Control interface denies to ${operation} on ${EMPTY} + The controller workload sends initial hello with correct version The controller workload is forbidden to ${operation} on ${filter_mask} internal_deny_control_interface ${operation} ${filter_mask} + The controller workload sends initial hello with correct version + +The controller workload does not send hello + internal_allow_control_interface read ${EMPTY} + +The controller workload sends initial hello with correct version + internal_send_initial_hello ${ANKAIOS_VERSION} The controller workload updates the state with manifest "${manifest}" The controller workload updates the state with manifest "${manifest}" and update mask ${EMPTY} @@ -554,6 +563,13 @@ The controller workload requests shall all fail And the workload "controller" shall have the execution state "Succeeded(Ok)" on agent "agent_A" within "20" seconds internal_check_all_control_interface_requests_failed ${ankaios_config_folder.name} +The controller workload shall get a closed connection + ${ankaios_config_folder}= create_control_interface_config_for_test + Ankaios server is started with config "${ankaios_config_folder.name}${/}startup_config.yaml" + Ankaios agent is started with name "agent_A" + And the workload "controller" shall have the execution state "Succeeded(Ok)" on agent "agent_A" within "20" seconds + internal_check_control_interface_closed ${ankaios_config_folder.name} + The controller workload has no access to Control Interface ${ankaios_config_folder}= create_control_interface_config_for_test Ankaios server is started with config "${ankaios_config_folder.name}${/}startup_config.yaml" diff --git a/tests/resources/ankaios_library.py b/tests/resources/ankaios_library.py index 5ad2b6090..3e6f88421 100644 --- a/tests/resources/ankaios_library.py +++ b/tests/resources/ankaios_library.py @@ -243,7 +243,7 @@ def json_to_dict(raw): def find_control_interface_test_tag(): global control_interface_tester_tag - control_interface_tester_tag = "manual-build-2" + control_interface_tester_tag = "manual-build-3" def prepare_test_control_interface_workload(): global control_interface_workload_config @@ -298,6 +298,15 @@ def internal_add_update_state_command(manifest, update_mask): } }) +def internal_send_initial_hello(version): + global control_interface_workload_config + control_interface_workload_config.append({ + "command": { + "type": "SendHello", + "version": version + } + }) + def internal_add_to_manifest_list(manifest_file): global next_manifest_number global manifest_files_location @@ -351,15 +360,21 @@ def internal_check_all_control_interface_requests_succeeded(tmp_folder): def internal_check_all_control_interface_requests_failed(tmp_folder): output = read_yaml(path.join(tmp_folder, "output.yaml")) for test_number,test_result in enumerate(output): - test_result = test_result["result"]["value"]["type"] != "Ok" - assert test_result, \ - f"Expected request {test_number + 1} to fail, but it succeeded" + if test_result["result"]["type"] != "SendHelloResult": + test_result = test_result["result"]["value"]["type"] != "Ok" + assert test_result, \ + f"Expected request {test_number + 1} to fail, but it succeeded" def internal_check_no_access_to_control_interface(tmp_folder): output = read_yaml(path.join(tmp_folder, "output.yaml")) - for _, test_result in enumerate(output): + for test_result in output: assert test_result["result"]["type"] == "NoApi", "Expect type is different to NoApi" +def internal_check_control_interface_closed(tmp_folder): + output = read_yaml(path.join(tmp_folder, "output.yaml")) + for test_result in output: + assert test_result["result"]["type"] == "ConnectionClosed", "Expect type is different to ConnectionClosed" + def empty_keyword(): pass diff --git a/tests/resources/configs/simple_with_control.yaml b/tests/resources/configs/simple_with_control.yaml new file mode 100644 index 000000000..ba8b6e655 --- /dev/null +++ b/tests/resources/configs/simple_with_control.yaml @@ -0,0 +1,21 @@ +apiVersion: v0.1 +workloads: + simple: + runtime: podman + restartPolicy: NEVER + agent: agent_A + runtimeConfig: | + image: ghcr.io/eclipse-ankaios/tests/alpine:latest + commandArgs: [ "echo", "Hello Ankaios"] + controlInterfaceAccess: + allowRules: + - type: StateRule + operation: Write + filterMask: + # workload has write access to the config of the workload dynamic_nginx + - "desiredState.workloads.dynamic_nginx" + - type: StateRule + operation: Read + filterMask: + # workload is allowed to read the workload state of the workload dynamic_nginx + - "workloadStates.agent_A.dynamic_nginx" diff --git a/tests/resources/image/Cargo.toml b/tests/resources/image/Cargo.toml index 894d71ee9..12e531532 100644 --- a/tests/resources/image/Cargo.toml +++ b/tests/resources/image/Cargo.toml @@ -1,5 +1,4 @@ [package] -# name = "control_interface_example" name = "control_interface_tester" edition = "2021" license = "Apache-2.0" diff --git a/tests/resources/image/README.md b/tests/resources/image/README.md new file mode 100644 index 000000000..200939eb1 --- /dev/null +++ b/tests/resources/image/README.md @@ -0,0 +1,28 @@ +# Control Interface Tester workload + +The Control Interface Tester workload is used in system tests to verify the correct execution of the Ankaios Control Interface. +The workload reads commands from a file, executes them and writes an output file containing the results of the commands. + +If no Control Interface instance was provided to the workload, a `NoAPI` result is written. If the Control Interface was preliminary closed by Ankaios, e.g., due to a protocol error, a `ConnectionClosed` result is provided. + +## Building and pushing a new image + +It is planned to automate the process of building and pushing a new version of the container, but for now the process is done manually. + +To build a new image run the following command from the project root folder: + +```bash +podman build -t ghcr.io/eclipse-ankaios/control_interface_tester:manual-build- . -f tests/resources/image/Dockerfile +``` + +To push the new image to GitHub container registry, you will need to generate an access token that is allowed to write packages and login to `ghcr.io`: + +```bash +podman login ghcr.io +``` + +Afterwards the new image can be published with: + +```bash +podman push ghcr.io/eclipse-ankaios/control_interface_tester:manual-build- +``` diff --git a/tests/resources/image/src/main.rs b/tests/resources/image/src/main.rs index 7601c32d9..5713feeab 100644 --- a/tests/resources/image/src/main.rs +++ b/tests/resources/image/src/main.rs @@ -17,7 +17,6 @@ use api::ank_base::{State, UpdateStateRequest}; use api::control_api::{from_ankaios::FromAnkaiosEnum, FromAnkaios}; -use common::from_server_interface::FromServer; use prost::Message; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -48,11 +47,22 @@ struct Command { command: CommandEnum, } +enum CommandError { + ConnectionClosed(String), + GenericError(String), +} + #[derive(Deserialize)] #[serde(tag = "type")] enum CommandEnum { UpdateState(UpdateState), GetState(GetState), + SendHello(Version), +} + +#[derive(Deserialize)] +struct Version { + version: String, } #[derive(Deserialize)] @@ -77,6 +87,8 @@ enum TestResultEnum { UpdateStateResult(TagSerializedResult), GetStateResult(TagSerializedResult>), NoApi, + SendHelloResult(TagSerializedResult<()>), + ConnectionClosed, } #[derive(Serialize)] @@ -129,34 +141,45 @@ fn main() { .map(|x| connection.handle_command(x)) .collect::, _>>() } else { - commands - .into_iter() - .map(|_| { - Ok(TestResult { - result: TestResultEnum::NoApi, - }) - }) - .collect::, _>>() + Ok(vec![TestResult { + result: TestResultEnum::NoApi, + }]) }; match result { Ok(result) => { - let output_file = File::create(output_path).unwrap_or_else(|err| { - logging::log(&format!("Could not open output file: '{}'", err)); - exit(1); - }); - serde_json::to_writer(output_file, &result).unwrap_or_else(|err| { - logging::log(&format!("Could write to open output file: '{}'", err)); - exit(1); - }); + write_result(output_path, result); + } + Err(CommandError::ConnectionClosed(err)) => { + logging::log(&format!( + "Connection to Ankaios server was closed: '{}'", + err + )); + write_result( + output_path, + vec![TestResult { + result: TestResultEnum::ConnectionClosed, + }], + ); } - Err(err) => { + Err(CommandError::GenericError(err)) => { logging::log(&format!("Failed during test execution: {}", err)); - exit(1); + exit(3); } } } +fn write_result(output_path: String, result: Vec) { + let output_file = File::create(output_path).unwrap_or_else(|err| { + logging::log(&format!("Could not open output file: '{}'", err)); + exit(4); + }); + serde_json::to_writer(output_file, &result).unwrap_or_else(|err| { + logging::log(&format!("Could not write to open output file: '{}'", err)); + exit(5); + }); +} + struct Connection { id_counter: i32, output: File, @@ -193,30 +216,43 @@ impl Connection { }) } - fn handle_command(&mut self, command: Command) -> Result { + fn handle_command(&mut self, command: Command) -> Result { Ok(TestResult { result: match command.command { CommandEnum::UpdateState(update_state_command) => { - TestResultEnum::UpdateStateResult( - self.handle_update_state_command(update_state_command) - .into(), - ) + self.handle_update_state_command(update_state_command)? + } + CommandEnum::GetState(get_state_command) => { + self.handle_get_state_command(get_state_command)? } - CommandEnum::GetState(get_state_command) => TestResultEnum::GetStateResult( - self.handle_get_state_command(get_state_command).into(), - ), + CommandEnum::SendHello(Version { version }) => self.send_hello(version)?, }, }) } + fn send_hello(&mut self, protocol_version: String) -> Result { + let proto = api::control_api::ToAnkaios { + to_ankaios_enum: Some(api::control_api::to_ankaios::ToAnkaiosEnum::Hello( + api::control_api::Hello { protocol_version }, + )), + }; + + Ok(TestResultEnum::SendHelloResult(TagSerializedResult::Ok( + self.output + .write_all(&proto.encode_length_delimited_to_vec()) + .map_err(|err| CommandError::GenericError(err.to_string()))?, + ))) + } + pub fn handle_update_state_command( &mut self, update_state_command: UpdateState, - ) -> Result { + ) -> Result { let request_id = self.get_next_id(); let state: common::objects::CompleteState = - read_yaml_file(Path::new(&update_state_command.manifest_file))?; + read_yaml_file(Path::new(&update_state_command.manifest_file)) + .map_err(CommandError::GenericError)?; let request = common::commands::Request { request_id: request_id.clone(), @@ -225,7 +261,8 @@ impl Connection { new_state: Some(state.into()), update_mask: update_state_command.update_mask, } - .try_into()?, + .try_into() + .map_err(CommandError::GenericError)?, )), }; @@ -240,22 +277,25 @@ impl Connection { .unwrap(); let response = self.wait_for_response(request_id)?; - let ResponseContent::UpdateStateSuccess(response) = response else { - return Err(format!( - "Received wrong response type, expected UpdateStateSuccess: '{:?}'", - response - )); - }; - Ok(UpdateStateResult { - added_workloads: response.added_workloads, - deleted_workloads: response.deleted_workloads, - }) + + Ok(TestResultEnum::UpdateStateResult(match response { + ResponseContent::UpdateStateSuccess(response) => { + TagSerializedResult::Ok(UpdateStateResult { + added_workloads: response.added_workloads, + deleted_workloads: response.deleted_workloads, + }) + } + response_content => TagSerializedResult::Err(format!( + "Received wrong response type. Expected UpdateStateSuccess, received: '{:?}'", + response_content + )), + })) } pub fn handle_get_state_command( &mut self, get_state_command: GetState, - ) -> Result, String> { + ) -> Result { let request_id = self.get_next_id(); let request = common::commands::Request { @@ -279,52 +319,59 @@ impl Connection { let response = self.wait_for_response(request_id)?; - let ResponseContent::CompleteState(response) = response else { - return Err(format!( - "Received wrong response type, expected CompleteState: '{:?}'", - response - )); - }; - Ok(response.desired_state) + Ok(TestResultEnum::GetStateResult(match response { + ResponseContent::CompleteState(complete_state) => { + TagSerializedResult::Ok(complete_state.desired_state) + } + response_content => TagSerializedResult::Err(format!( + "Received wrong response type. Expected CompleteState, received: '{:?}'", + response_content + )), + })) } - fn wait_for_response(&mut self, request_id: String) -> Result { + fn wait_for_response( + &mut self, + target_request_id: String, + ) -> Result { loop { - let message = self.read_message()?; - let FromServer::Response(response) = message else { - logging::log(&format!("Received message: '{:?}'", message)); - continue; - }; - if response.request_id == request_id { - let Some(response_content) = response.response_content else { - return Err(format!( - "Received Response with correct request_id, but without content: '{:?}'", - response - )); - }; - return Ok(response_content); - } else { - logging::log(&format!( - "Received unexpected response for request {:}", - response.request_id - )); + let message = self.read_message().map_err(CommandError::GenericError)?; + + match message { + FromAnkaiosEnum::Response(response) => { + if response.request_id.eq(&target_request_id) { + if let Some(response_content) = response.response_content { + return Ok(response_content); + } else { + return Err(CommandError::GenericError(format!( + "Received Response with correct request_id, but without content. Request Id: '{:?}'", + response.request_id + ))); + } + } else { + logging::log(&format!( + "Received unexpected response for request {:}", + response.request_id + )); + } + } + FromAnkaiosEnum::ConnectionClosed(_) => { + return Err(CommandError::ConnectionClosed( + "Control Interface connection closed by Ankaios.".into(), + )) + } } } } - fn read_message(&mut self) -> Result { + fn read_message(&mut self) -> Result { let binary = self .read_protobuf_data() .map_err(|err| format!("Failed to read message from input stream: '{}'", err))?; - let from_ankaios = FromAnkaios::decode(&mut Box::new(binary.as_ref())) + FromAnkaios::decode(&mut Box::new(binary.as_ref())) .map_err(|err| format!("Could not decode proto received from input: '{}'", err))? .from_ankaios_enum - .ok_or_else(|| "The field FromAnkaiosEnum not set".to_string())?; - Ok(match from_ankaios { - FromAnkaiosEnum::Response(response) => { - common::from_server_interface::FromServer::Response(response) - } - }) + .ok_or_else(|| "The field FromAnkaiosEnum is not set".to_string()) } fn read_protobuf_data(&mut self) -> Result, io::Error> { diff --git a/tests/stests/control_interface/control_interface.robot b/tests/stests/control_interface/control_interface.robot index bbdb6923b..2ad0e47b2 100644 --- a/tests/stests/control_interface/control_interface.robot +++ b/tests/stests/control_interface/control_interface.robot @@ -29,7 +29,7 @@ Test Ankaios workload successful start-up without a Control Interface access [Setup] Run Keywords Setup Ankaios # Preconditions - Given Ankaios server is started with config "${CONFIGS_DIR}/default.yaml" + Given Ankaios server is started with config "${CONFIGS_DIR}/simple.yaml" And Ankaios agent is started with name "${agent_name}" And all workloads of agent "{agent_name}" have an initial execution state # Actions @@ -42,11 +42,11 @@ Test Ankaios workload restart after update without a Control Interface access [Setup] Run Keywords Setup Ankaios # Preconditions - Given Ankaios server is started with config "${CONFIGS_DIR}/startConfig.yaml" + Given Ankaios server is started with config "${CONFIGS_DIR}/simple_with_control.yaml" And Ankaios agent is started with name "${agent_name}" And all workloads of agent "{agent_name}" have an initial execution state # Actions - When user triggers "ank -k apply ${CONFIGS_DIR}/default.yaml" + When user triggers "ank -k apply ${CONFIGS_DIR}/simple.yaml" # Asserts Then the mount point has not been generated for ${agent_name} [Teardown] Clean up Ankaios @@ -56,12 +56,20 @@ Test Ankaios workload restart after update with a Control Interface access [Setup] Run Keywords Setup Ankaios # Preconditions - Given Ankaios server is started with config "${CONFIGS_DIR}/default.yaml" + Given Ankaios server is started with config "${CONFIGS_DIR}/simple.yaml" And Ankaios agent is started with name "${agent_name}" And all workloads of agent "{agent_name}" have an initial execution state And the mount point has not been generated for ${agent_name} # Actions - When user triggers "ank apply ${CONFIGS_DIR}/startConfig.yaml" + When user triggers "ank apply ${CONFIGS_DIR}/simple_with_control.yaml" # Asserts Then the mount point has been generated for ${agent_name} [Teardown] Clean up Ankaios + +# [stest->swdd~agent-closes-control-interface-on-missing-initial-hello~1] +Test Control Interface closes connection when initial hello missing + [Setup] Run Keywords Setup Ankaios for Control Interface test + Given the controller workload does not send hello + And the controller workload gets the state + Then The controller workload shall get a closed connection + [Teardown] Clean up Ankaios diff --git a/tools/update_version.sh b/tools/update_version.sh index 27b2e8ad8..5bb2daa02 100755 --- a/tools/update_version.sh +++ b/tools/update_version.sh @@ -78,5 +78,10 @@ if [ "$release" = "1" ]; then done fi - - +# Update the version of the examples +examples=$(find "$base_dir/examples" -type d -name \*_control_interface -printf "%f\n") +for example in $examples; do + dockerfile="$base_dir/examples/$example/Dockerfile" + log_update "$dockerfile" + sed -i "s/^ENV ANKAIOS_VERSION=.*/ENV ANKAIOS_VERSION=${version}/" "$dockerfile" +done