Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API version checks to all connections to Ankaios #380

Merged
merged 28 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cf29678
Add API versions checks to all connections to Ankaios
krucod3 Sep 18, 2024
074e8b7
Stabilize stests
krucod3 Sep 25, 2024
bd21c09
Adding API checks for the Control Interface.
krucod3 Sep 25, 2024
8c889ef
Merge remote-tracking branch 'origin/main' into 14_api_version_checks…
krucod3 Sep 26, 2024
4c15245
Update preliminary expected Ankaios version
krucod3 Sep 26, 2024
8f499ac
Merge branch 'main' into 14_api_version_checks_for_connections
krucod3 Sep 26, 2024
0180a5a
Remove the unnecessary Box<[u8]> & fixed utests
krucod3 Sep 26, 2024
d5f5c51
Merge branch '14_api_version_checks_for_connections' of github.com:ec…
krucod3 Sep 26, 2024
e58336a
Update Cargo.lock
krucod3 Sep 26, 2024
8315774
Add unit tests for the Control Interface API version checks
krucod3 Sep 26, 2024
50a84fd
Improve stests
krucod3 Sep 27, 2024
ff54d8f
Make stest work in CI/CD too
krucod3 Sep 27, 2024
ff82926
Add stest for connection closed without control interface hello
krucod3 Sep 30, 2024
059fcfa
API version check: requirements & tracing
krucod3 Sep 30, 2024
e64b524
Add Control Interface Tester README
krucod3 Oct 1, 2024
6ffcfa3
Merge remote-tracking branch 'origin/main' into 14_api_version_checks…
krucod3 Oct 2, 2024
7431b72
Prior to this change,
krucod3 Oct 2, 2024
79430cb
Upgrading sections
krucod3 Oct 7, 2024
d2b2cf8
Merge remote-tracking branch 'origin/main' into 14_api_version_checks…
krucod3 Oct 14, 2024
3ff1f0e
Boxing big objects
krucod3 Oct 14, 2024
01455f7
Fix typo
krucod3 Oct 14, 2024
15fa2cb
Merge branch 'main' into 14_api_version_checks_for_connections
krucod3 Oct 18, 2024
208b39d
Handle the case of broken protobuf data.
krucod3 Oct 18, 2024
3ca1771
Apply suggestions from code review
krucod3 Oct 22, 2024
e26de82
Apply suggestions from code review
krucod3 Oct 22, 2024
a4058ee
Apply suggestions from code review
krucod3 Oct 22, 2024
a293746
Apply suggestions from code review
krucod3 Oct 22, 2024
8193991
Fox code review findings.
krucod3 Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

246 changes: 220 additions & 26 deletions agent/src/control_interface/control_interface_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@

use std::sync::Arc;

use crate::control_interface::ToAnkaios;
use crate::control_interface::{to_ankaios, ToAnkaios};

#[cfg_attr(test, mockall_double::double)]
use super::authorizer::Authorizer;
#[cfg_attr(test, mockall_double::double)]
use super::reopen_file::ReopenFile;
use api::{ank_base, control_api};
use common::{
check_version_compatibility,
from_server_interface::{FromServer, FromServerReceiver},
to_server_interface::{ToServer, ToServerSender},
};

use prost::Message;
use tokio::{io, select, task::JoinHandle};

fn decode_to_server(protobuf_data: io::Result<Box<[u8]>>) -> io::Result<control_api::ToAnkaios> {
const INITIAL_HELLO_MISSING_MSG: &str = "Initial Hello missing!";

fn decode_to_server(protobuf_data: io::Result<Vec<u8>>) -> io::Result<control_api::ToAnkaios> {
Ok(control_api::ToAnkaios::decode(&mut Box::new(
protobuf_data?.as_ref(),
))?)
Expand Down Expand Up @@ -63,7 +66,29 @@ impl ControlInterfaceTask {
authorizer,
}
}

async fn check_initial_hello(&mut self) -> Result<(), String> {
if let Ok(to_ankaios) = decode_to_server(self.input_stream.read_protobuf_data().await) {
match to_ankaios.try_into() {
Ok(ToAnkaios::Hello(to_ankaios::Hello { protocol_version })) => {
check_version_compatibility(protocol_version)?
}
unexpected => {
log::debug!("Expected initial Hello, received: '{unexpected:?}'.");
return Err(INITIAL_HELLO_MISSING_MSG.into());
}
}
}
inf17101 marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

pub async fn run(mut self) {
if let Err(message) = self.check_initial_hello().await {
log::warn!("{message}");
let _ = self.send_connection_closed(message).await;
return;
}

loop {
select! {
// [impl->swdd~agent-ensures-control-interface-output-pipe-read~1]
Expand Down Expand Up @@ -95,10 +120,17 @@ impl ControlInterfaceTask {
};
let _ = self.forward_from_server(error).await;
};
},
Ok(ToAnkaios::Hello(to_ankaios::Hello{protocol_version})) => {
log::warn!("Received a second Hello with protocol version '{protocol_version}'");
krucod3 marked this conversation as resolved.
Show resolved Hide resolved
if let Err(message) = check_version_compatibility(protocol_version) {
log::warn!("{message}");
let _ = self.send_connection_closed(message).await;
return;
}
}
Err(error) => {
log::warn!("Could not convert protobuf in internal data structure: '{}'", error);

}
}
} else {
Expand All @@ -118,6 +150,21 @@ impl ControlInterfaceTask {
tokio::spawn(self.run())
}

async fn send_connection_closed(&mut self, reason: String) -> io::Result<()> {
use control_api::from_ankaios::FromAnkaiosEnum;
let message = control_api::FromAnkaios {
from_ankaios_enum: Some(FromAnkaiosEnum::ConnectionClosed(
control_api::ConnectionClosed { reason },
)),
};

// [impl->swdd~agent-uses-length-delimited-protobuf-for-pipes~1]
let binary = message.encode_length_delimited_to_vec();
self.output_stream.write_all(&binary).await?;

Ok(())
}

async fn forward_from_server(&mut self, response: ank_base::Response) -> io::Result<()> {
use control_api::from_ankaios::FromAnkaiosEnum;
let message = control_api::FromAnkaios {
Expand Down Expand Up @@ -160,10 +207,7 @@ pub fn generate_test_control_interface_task_mock() -> __mock_MockControlInterfac
mod tests {
use std::{io::Error, sync::Arc};

use common::{
commands::{self, CompleteStateRequest},
to_server_interface::ToServer,
};
use common::{commands, to_server_interface::ToServer};
use mockall::{predicate, Sequence};
use tokio::sync::mpsc;

Expand All @@ -172,7 +216,40 @@ mod tests {

use super::ControlInterfaceTask;

use crate::control_interface::{authorizer::MockAuthorizer, reopen_file::MockReopenFile};
use crate::control_interface::{
authorizer::MockAuthorizer, control_interface_task::INITIAL_HELLO_MISSING_MSG,
reopen_file::MockReopenFile,
};

const REQUEST_ID: &str = "req_id";

fn prepare_workload_hello_binary_message(version: impl Into<String>) -> Vec<u8> {
let workload_hello = control_api::ToAnkaios {
to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Hello(
control_api::Hello {
protocol_version: version.into(),
},
)),
};

workload_hello.encode_to_vec()
}

fn prepare_request_complete_state_binary_message(field_mask: impl Into<String>) -> Vec<u8> {
let ank_request = ank_base::Request {
request_id: REQUEST_ID.into(),
request_content: Some(ank_base::request::RequestContent::CompleteStateRequest(
ank_base::CompleteStateRequest {
field_mask: vec![field_mask.into()],
},
)),
};
let test_output_request = control_api::ToAnkaios {
to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(ank_request)),
};

test_output_request.encode_to_vec()
}

#[tokio::test]
async fn utest_control_interface_task_forward_from_server() {
Expand All @@ -181,7 +258,7 @@ mod tests {
.await;

let response = ank_base::Response {
request_id: "req_id".to_owned(),
request_id: REQUEST_ID.into(),
response_content: Some(ank_base::response::ResponseContent::CompleteState(
Default::default(),
)),
Expand Down Expand Up @@ -232,7 +309,7 @@ mod tests {
let test_output_request = control_api::ToAnkaios {
to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(
ank_base::Request {
request_id: "req_id".to_owned(),
request_id: REQUEST_ID.into(),
request_content: Some(ank_base::request::RequestContent::CompleteStateRequest(
ank_base::CompleteStateRequest { field_mask: vec![] },
)),
Expand All @@ -245,13 +322,19 @@ mod tests {
let mut mockall_seq = Sequence::new();

let mut input_stream_mock = MockReopenFile::default();
let mut x = [0; 12];
x.clone_from_slice(&test_output_request_binary[..]);

let workload_hello_binary = prepare_workload_hello_binary_message(common::ANKAIOS_VERSION);
input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.returning(move || Ok(Box::new(x)));
.return_once(move || Ok(workload_hello_binary));

input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.return_once(move || Ok(test_output_request_binary));

input_stream_mock
.expect_read_protobuf_data()
Expand All @@ -260,7 +343,7 @@ mod tests {
.returning(move || Err(Error::new(std::io::ErrorKind::Other, "error")));

let error = ank_base::Response {
request_id: "req_id".to_owned(),
request_id: REQUEST_ID.into(),
response_content: Some(ank_base::response::ResponseContent::Error(
ank_base::Error {
message: "Access denied".into(),
Expand Down Expand Up @@ -311,31 +394,38 @@ mod tests {
.get_lock_async()
.await;

let request_id = "req_id";
let ank_request = ank_base::Request {
request_id: request_id.to_owned(),
request_id: REQUEST_ID.into(),
request_content: Some(ank_base::request::RequestContent::CompleteStateRequest(
ank_base::CompleteStateRequest {
field_mask: vec!["desiredState.workloads.nginx".to_string()],
},
)),
};
let test_output_request = control_api::ToAnkaios {
to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(ank_request)),
to_ankaios_enum: Some(control_api::to_ankaios::ToAnkaiosEnum::Request(
ank_request.clone(),
)),
};

let test_output_request_binary = test_output_request.encode_to_vec();

let mut mockall_seq = Sequence::new();

let mut input_stream_mock = MockReopenFile::default();
let mut x = [0; 42];
x.clone_from_slice(&test_output_request_binary[..]);

let workload_hello_binary = prepare_workload_hello_binary_message(common::ANKAIOS_VERSION);
input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.returning(move || Ok(Box::new(x)));
.return_once(move || Ok(workload_hello_binary));

input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.return_once(move || Ok(test_output_request_binary));

input_stream_mock
.expect_read_protobuf_data()
Expand Down Expand Up @@ -363,16 +453,120 @@ mod tests {

control_interface_task.run().await;

let mut expected_request = commands::Request {
request_id: request_id.to_owned(),
request_content: commands::RequestContent::CompleteStateRequest(CompleteStateRequest {
field_mask: vec!["desiredState.workloads.nginx".to_string()],
}),
};
let mut expected_request: commands::Request = ank_request.try_into().unwrap();
expected_request.prefix_request_id(request_id_prefix);
assert_eq!(
output_pipe_receiver.recv().await,
Some(ToServer::Request(expected_request))
);
}

// TODO add requirements tracing
#[tokio::test]
async fn utest_control_interface_task_run_task_no_hello() {
let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC
.get_lock_async()
.await;

let test_output_request_binary = prepare_request_complete_state_binary_message("");

let mut mockall_seq = Sequence::new();
let mut input_stream_mock = MockReopenFile::default();
input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.return_once(move || Ok(test_output_request_binary));

let test_input_command_binary = control_api::FromAnkaios {
from_ankaios_enum: Some(
control_api::from_ankaios::FromAnkaiosEnum::ConnectionClosed(
control_api::ConnectionClosed {
reason: INITIAL_HELLO_MISSING_MSG.into(),
},
),
),
}
.encode_length_delimited_to_vec();

let mut output_stream_mock = MockReopenFile::default();
output_stream_mock
.expect_write_all()
.with(predicate::eq(test_input_command_binary))
.once()
.returning(|_| Ok(()));

let (_input_pipe_sender, input_pipe_receiver) = mpsc::channel(1);
let (output_pipe_sender, mut output_pipe_receiver) = mpsc::channel(1);
let request_id_prefix = "prefix@";

let authorizer = MockAuthorizer::default();

let control_interface_task = ControlInterfaceTask::new(
output_stream_mock,
input_stream_mock,
input_pipe_receiver,
output_pipe_sender,
request_id_prefix.to_owned(),
Arc::new(authorizer),
);

control_interface_task.run().await;
assert!(output_pipe_receiver.recv().await.is_none());
}

// TODO add requirements tracing
#[tokio::test]
async fn utest_control_interface_task_run_task_hello_unsupported_version() {
let _guard = crate::test_helper::MOCKALL_CONTEXT_SYNC
.get_lock_async()
.await;

let mut mockall_seq = Sequence::new();
let mut input_stream_mock = MockReopenFile::default();

let unsupported_version = "1999.1.0";
let workload_hello_binary = prepare_workload_hello_binary_message(unsupported_version);
input_stream_mock
.expect_read_protobuf_data()
.once()
.in_sequence(&mut mockall_seq)
.return_once(move || Ok(workload_hello_binary));

let test_input_command_binary = control_api::FromAnkaios {
from_ankaios_enum: Some(
control_api::from_ankaios::FromAnkaiosEnum::ConnectionClosed(
control_api::ConnectionClosed {
reason: format!("Unsupported protocol version '{unsupported_version}'. Currently supported '{}'", common::ANKAIOS_VERSION),
},
),
),
}
.encode_length_delimited_to_vec();

let mut output_stream_mock = MockReopenFile::default();
output_stream_mock
.expect_write_all()
.with(predicate::eq(test_input_command_binary))
.once()
.returning(|_| Ok(()));

let (_input_pipe_sender, input_pipe_receiver) = mpsc::channel(1);
let (output_pipe_sender, mut output_pipe_receiver) = mpsc::channel(1);
let request_id_prefix = "prefix@";

let authorizer = MockAuthorizer::default();

let control_interface_task = ControlInterfaceTask::new(
output_stream_mock,
input_stream_mock,
input_pipe_receiver,
output_pipe_sender,
request_id_prefix.to_owned(),
Arc::new(authorizer),
);

control_interface_task.run().await;
assert!(output_pipe_receiver.recv().await.is_none());
}
}
Loading