Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sandbox: bugfix infinite loop in new_ttrpc_client() #119

Merged
merged 2 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions vmm/sandbox/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ limitations under the License.
*/

use std::{
io::{BufRead, BufReader, Write},
os::unix::{
io::{IntoRawFd, RawFd},
net::UnixStream,
},
os::unix::io::{IntoRawFd, RawFd},
time::Duration,
};

Expand All @@ -34,23 +30,28 @@ use nix::{
time::{clock_gettime, ClockId},
unistd::close,
};
use tokio::time::timeout;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::UnixStream,
time::timeout,
};
use ttrpc::{context::with_timeout, r#async::Client};
use vmm_common::api::{sandbox::*, sandbox_ttrpc::SandboxServiceClient};

use crate::network::{NetworkInterface, Route};

const HVSOCK_RETRY_TIMEOUT_IN_MS: u64 = 10;
// TODO: reduce to 10s
const NEW_TTRPC_CLIENT_TIMEOUT: u64 = 45;
const TIME_SYNC_PERIOD: u64 = 60;
const TIME_DIFF_TOLERANCE_IN_MS: u64 = 10;

pub(crate) async fn new_sandbox_client(address: &str) -> Result<SandboxServiceClient> {
let client = new_ttrpc_client(address).await?;
let client = new_ttrpc_client_with_timeout(address, NEW_TTRPC_CLIENT_TIMEOUT).await?;
Ok(SandboxServiceClient::new(client))
}

async fn new_ttrpc_client(address: &str) -> Result<Client> {
let ctx_timeout = 10;

async fn new_ttrpc_client_with_timeout(address: &str, t: u64) -> Result<Client> {
let mut last_err = Error::Other(anyhow!(""));

let fut = async {
Expand All @@ -62,16 +63,17 @@ async fn new_ttrpc_client(address: &str) -> Result<Client> {
}
Err(e) => last_err = e,
}
// If the address doesn't exist, all the functions executed in this loop are
// synchronous, so the first poll of this future inside `timeout` hangs forever. As a
// result, the timeout hangs too. To solve this, await an async function in this loop or
// call `tokio::task::yield_now()` to give up the current CPU time slice.
tokio::time::sleep(Duration::from_millis(10)).await;
}
};

let client = timeout(Duration::from_secs(ctx_timeout), fut)
let client = timeout(Duration::from_secs(t), fut)
.await
.map_err(|_| {
let e = anyhow!("{}s timeout connecting socket: {}", ctx_timeout, last_err);
error!("{}", e);
e
})?;
.map_err(|_| anyhow!("{}s timeout connecting socket: {}", t, last_err))?;
Ok(client)
}

Expand Down Expand Up @@ -165,28 +167,31 @@ async fn connect_to_hvsocket(address: &str) -> Result<RawFd> {
if v.len() < 2 {
return Err(anyhow!("hvsock address {} should not less than 2", address).into());
}
(v[0].to_string(), v[1].to_string())
(v[0], v[1])
};

tokio::task::spawn_blocking(move || {
let mut stream =
UnixStream::connect(&addr).map_err(|e| anyhow!("failed to connect hvsock: {}", e))?;
let fut = async {
let mut stream = UnixStream::connect(addr).await?;
stream
.write_all(format!("CONNECT {}\n", port).as_bytes())
.await
.map_err(|e| anyhow!("hvsock connected but failed to write CONNECT: {}", e))?;

let mut response = String::new();
BufReader::new(&stream)
BufReader::new(&mut stream)
.read_line(&mut response)
.await
.map_err(|e| anyhow!("CONNECT sent but failed to get response: {}", e))?;
if response.starts_with("OK") {
Ok(stream.into_raw_fd())
Ok(stream.into_std()?.into_raw_fd())
} else {
Err(anyhow!("CONNECT sent but response is not OK: {}", response).into())
}
})
.await
.map_err(|e| anyhow!("failed to spawn blocking task: {}", e))?
};

timeout(Duration::from_millis(HVSOCK_RETRY_TIMEOUT_IN_MS), fut)
.await
.map_err(|_| anyhow!("hvsock retry {}ms timeout", HVSOCK_RETRY_TIMEOUT_IN_MS))?
}

pub fn unix_sock(r#abstract: bool, socket_path: &str) -> Result<UnixAddr> {
Expand Down Expand Up @@ -302,3 +307,16 @@ pub(crate) async fn client_sync_clock(client: &SandboxServiceClient, id: &str) {
}
});
}

// Unit tests for the ttrpc client connection helper.
#[cfg(test)]
mod tests {
use crate::client::new_ttrpc_client_with_timeout;

// Calls `new_ttrpc_client_with_timeout` with an hvsock address that is presumably
// nonexistent ("fake.sock") and a timeout argument of 1 (seconds), and checks that
// the call completes with an error.
#[tokio::test]
async fn test_new_ttrpc_client_timeout() {
// Expect new_ttrpc_client_with_timeout to return an error instead of blocking forever.
// NOTE(review): only `is_err()` is asserted, so any error (not just a timeout) passes.
assert!(new_ttrpc_client_with_timeout("hvsock://fake.sock:1024", 1)
.await
.is_err());
}
}
35 changes: 21 additions & 14 deletions vmm/sandbox/src/cloud_hypervisor/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use std::{
use anyhow::anyhow;
use api_client::{simple_api_command, simple_api_full_command_with_fds_and_response};
use containerd_sandbox::error::Result;
use log::error;
use log::{debug, error, trace};
use tokio::task::spawn_blocking;

use crate::{
cloud_hypervisor::devices::{block::DiskConfig, AddDeviceResponse, RemoveDeviceRequest},
Expand All @@ -38,25 +39,31 @@ pub struct ChClient {

impl ChClient {
pub async fn new(socket_path: String) -> Result<Self> {
let s = socket_path.to_string();
let start_time = SystemTime::now();
tokio::task::spawn_blocking(move || loop {
match UnixStream::connect(&socket_path) {
Ok(socket) => {
return Ok(Self { socket });
}
Err(e) => {
if start_time.elapsed().unwrap().as_secs()
> CLOUD_HYPERVISOR_START_TIMEOUT_IN_SEC
{
error!("failed to connect api server: {:?}", e);
return Err(anyhow!("timeout connect client, {}", e).into());
let socket = spawn_blocking(move || -> Result<UnixStream> {
loop {
match UnixStream::connect(&socket_path) {
Ok(socket) => {
return Ok(socket);
}
Err(e) => {
trace!("failed to create client: {:?}", e);
if start_time.elapsed().unwrap().as_secs()
> CLOUD_HYPERVISOR_START_TIMEOUT_IN_SEC
{
error!("failed to create client: {:?}", e);
return Err(anyhow!("timeout connect client, {}", e).into());
}
sleep(Duration::from_millis(10));
}
sleep(Duration::from_millis(10));
}
}
})
.await
.map_err(|e| anyhow!("failed to spawn a task {}", e))?
.map_err(|e| anyhow!("failed to join thread {}", e))??;
debug!("connected to api server {}", s);
Ok(Self { socket })
}

pub fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<String> {
Expand Down
Loading