diff --git a/conmon-rs/server/src/container_io.rs b/conmon-rs/server/src/container_io.rs index b1ebc52a28..4741c3115f 100644 --- a/conmon-rs/server/src/container_io.rs +++ b/conmon-rs/server/src/container_io.rs @@ -235,7 +235,6 @@ impl ContainerIO { logger: SharedContainerLog, message_tx: UnboundedSender, mut attach: SharedContainerAttach, - token: CancellationToken, ) -> Result<()> where T: AsyncRead + Unpin, @@ -243,76 +242,69 @@ impl ContainerIO { let mut buf = vec![0; 1024]; loop { - // In situations we're processing output directly from the I/O streams - // we need a mechanism to figure out when to stop that doesn't involve reading the - // number of bytes read. - // Thus, we need to select on the cancellation token saved in the child. - // While this could result in a data race, as select statements are racy, - // we won't interleve these two futures, as one ends execution. - select! { - n = reader.read(&mut buf) => { - match n { - Ok(n) if n > 0 => { - debug!("Read {} bytes", n); - let data = &buf[..n]; - - let mut locked_logger = logger.write().await; - locked_logger - .write(pipe, data) - .await - .context("write to log file")?; + match reader.read(&mut buf).await { + Ok(n) if n == 0 => { + debug!("Nothing more to read"); - attach - .write(Message::Data(data.into(), pipe)) - .await - .context("write to attach endpoints")?; + attach + .write(Message::Done) + .await + .context("write to attach endpoints")?; - if !message_tx.is_closed() { - message_tx - .send(Message::Data(data.into(), pipe)) - .context("send data message")?; - } - } - Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) { - Errno::EIO => { - debug!("Stopping read loop"); - attach - .write(Message::Done) - .await - .context("write to attach endpoints")?; - - message_tx - .send(Message::Done) - .context("send done message")?; - return Ok(()); - } - Errno::EBADF => { - return Err(Errno::EBADFD.into()); - } - Errno::EAGAIN => { - continue; - } - _ => error!( - "Unable to read from file descriptor: {} {}", - e, - e.raw_os_error().context("get OS error")? - ), - }, - _ => {} + if !message_tx.is_closed() { + message_tx + .send(Message::Done) + .context("send done message")?; } + + return Ok(()); } - _ = token.cancelled() => { - debug!("Sending done because token cancelled"); + + Ok(n) => { + debug!("Read {} bytes", n); + let data = &buf[..n]; + + let mut locked_logger = logger.write().await; + locked_logger + .write(pipe, data) + .await + .context("write to log file")?; + attach - .write(Message::Done) + .write(Message::Data(data.into(), pipe)) .await .context("write to attach endpoints")?; - message_tx - .send(Message::Done) - .context("send done message")?; - return Ok(()); + if !message_tx.is_closed() { + message_tx + .send(Message::Data(data.into(), pipe)) + .context("send data message")?; + } } + + Err(e) => match Errno::from_i32(e.raw_os_error().context("get OS error")?) { + Errno::EIO => { + debug!("Stopping read loop"); + attach + .write(Message::Done) + .await + .context("write to attach endpoints")?; + + if !message_tx.is_closed() { + message_tx + .send(Message::Done) + .context("send done message")?; + } + return Ok(()); + } + Errno::EBADF => bail!(e), + Errno::EAGAIN => continue, + _ => error!( + "Unable to read from file descriptor: {} {}", + e, + e.raw_os_error().context("get OS error")? + ), + }, } } } diff --git a/conmon-rs/server/src/streams.rs b/conmon-rs/server/src/streams.rs index 67a54247c2..0676e5c574 100644 --- a/conmon-rs/server/src/streams.rs +++ b/conmon-rs/server/src/streams.rs @@ -65,12 +65,11 @@ impl Streams { let attach = self.attach().clone(); let message_tx = self.message_tx_stdout().clone(); - let token_clone = token.clone(); if let Some(stdin) = stdin { task::spawn( async move { if let Err(e) = - ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token_clone).await + ContainerIO::read_loop_stdin(stdin.as_raw_fd(), attach, token).await { error!("Stdin read loop failure: {:#}", e); } @@ -80,19 +79,12 @@ impl Streams { } let attach = self.attach().clone(); - let token_clone = token.clone(); if let Some(stdout) = stdout { task::spawn( async move { - if let Err(e) = ContainerIO::read_loop( - stdout, - Pipe::StdOut, - logger, - message_tx, - attach, - token_clone, - ) - .await + if let Err(e) = + ContainerIO::read_loop(stdout, Pipe::StdOut, logger, message_tx, attach) + .await { error!("Stdout read loop failure: {:#}", e); } @@ -107,15 +99,9 @@ impl Streams { if let Some(stderr) = stderr { task::spawn( async move { - if let Err(e) = ContainerIO::read_loop( - stderr, - Pipe::StdErr, - logger, - message_tx, - attach, - token, - ) - .await + if let Err(e) = + ContainerIO::read_loop(stderr, Pipe::StdErr, logger, message_tx, attach) + .await { error!("Stderr read loop failure: {:#}", e); } diff --git a/conmon-rs/server/src/terminal.rs b/conmon-rs/server/src/terminal.rs index 0ad63e7eba..7014add45c 100644 --- a/conmon-rs/server/src/terminal.rs +++ b/conmon-rs/server/src/terminal.rs @@ -115,7 +115,6 @@ impl Terminal { let logger_clone = self.logger.clone(); let (message_tx, message_rx) = mpsc::unbounded_channel(); self.message_rx = Some(message_rx); - let token_clone = token.clone(); task::spawn( async move { @@ -125,7 +124,6 @@ impl Terminal { logger_clone, message_tx, attach_clone, - token_clone, ) .await { diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 1bbdb35850..3f1bc02ebb 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -118,8 +118,8 @@ var _ = Describe("ConmonClient", func() { }) It(testName("should execute cleanup command when container exits", terminal), func() { - filepath := fmt.Sprintf("%s/conmon-client-test%s", os.TempDir(), tr.ctrID) tr = newTestRunner() + filepath := fmt.Sprintf("%s/conmon-client-test%s", os.TempDir(), tr.ctrID) tr.createRuntimeConfig(terminal) sut = tr.configGivenEnv() tr.createContainerWithConfig(sut, &client.CreateContainerConfig{ @@ -312,7 +312,7 @@ var _ = Describe("ConmonClient", func() { rssAfter := vmRSSGivenPID(pid) GinkgoWriter.Printf("VmRSS after: %d\n", rssAfter) GinkgoWriter.Printf("VmRSS diff: %d\n", rssAfter-rssBefore) - Expect(rssAfter - rssBefore).To(BeNumerically("<", 1000)) + Expect(rssAfter - rssBefore).To(BeNumerically("<", 1200)) }) } })