From 83d6670ffe3080b68ab0c4ec9f59edeb6f89b0af Mon Sep 17 00:00:00 2001 From: Sergio Lopez Date: Mon, 29 Aug 2022 13:31:29 +0200 Subject: [PATCH] Replace mpsc with crossbeam-channel On Apple Silicon, we were hitting a concurrency error that manifest with recv() panicking like this: thread '' panicked at 'internal error: entered unreachable code' Seems to be a known issue https://github.com/rust-lang/rust/issues/39364 Follow the recommendation on the thread, and switch from mpsc (which seems its going to be retired) to crossbeam-channel. Signed-off-by: Sergio Lopez --- Cargo.lock | 22 ++++++++++++++++++++ src/devices/Cargo.toml | 1 + src/devices/src/legacy/gic.rs | 2 +- src/devices/src/virtio/vsock/muxer.rs | 4 ++-- src/devices/src/virtio/vsock/muxer_thread.rs | 2 +- src/devices/src/virtio/vsock/reaper.rs | 2 +- src/vmm/Cargo.toml | 1 + src/vmm/src/builder.rs | 6 +++--- src/vmm/src/linux/vstate.rs | 15 ++++++------- src/vmm/src/macos/vstate.rs | 18 ++++++++-------- 10 files changed, 49 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66df3a36..db14ae4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,6 +128,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "curl" version = "0.4.44" @@ -164,6 +184,7 @@ version = "0.1.0" dependencies = [ "arch", "bitflags", + "crossbeam-channel", "env_logger", "hvf", "libc", @@ -767,6 +788,7 @@ dependencies = [ "arch", "codicon", "cpuid", + "crossbeam-channel", "curl", "devices", "env_logger", diff --git a/src/devices/Cargo.toml b/src/devices/Cargo.toml index 7dcf9782..f34cffe2 100644 --- a/src/devices/Cargo.toml +++ b/src/devices/Cargo.toml @@ -9,6 +9,7 @@ amd-sev = [] [dependencies] bitflags = "1.2.0" +crossbeam-channel = "0.5" env_logger = "0.9.0" libc = ">=0.2.39" log = "0.4.0" diff --git a/src/devices/src/legacy/gic.rs b/src/devices/src/legacy/gic.rs index a44f234f..64656097 100644 --- a/src/devices/src/legacy/gic.rs +++ b/src/devices/src/legacy/gic.rs @@ -1,10 +1,10 @@ // Copyright 2021 Red Hat, Inc. // SPDX-License-Identifier: Apache-2.0 +use crossbeam_channel::Sender; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, VecDeque}; use std::convert::TryInto; -use std::sync::mpsc::Sender; use arch::aarch64::gicv2::GICv2; use arch::aarch64::layout::GTIMER_VIRT; diff --git a/src/devices/src/virtio/vsock/muxer.rs b/src/devices/src/virtio/vsock/muxer.rs index 3994949e..27d27b46 100644 --- a/src/devices/src/virtio/vsock/muxer.rs +++ b/src/devices/src/virtio/vsock/muxer.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::os::unix::io::RawFd; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex, RwLock}; use super::super::super::legacy::Gic; @@ -17,6 +16,7 @@ use super::reaper::ReaperThread; use super::tcp::TcpProxy; use super::udp::UdpProxy; use super::VsockError; +use crossbeam_channel::{unbounded, Sender}; use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; use utils::eventfd::EventFd; use vm_memory::GuestMemoryMmap; @@ -146,7 +146,7 @@ impl VsockMuxer { self.intc = intc.clone(); self.irq_line = irq_line; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let thread = MuxerThread::new( self.cid, diff --git a/src/devices/src/virtio/vsock/muxer_thread.rs b/src/devices/src/virtio/vsock/muxer_thread.rs index f8446ec0..c28c7662 100644 --- a/src/devices/src/virtio/vsock/muxer_thread.rs +++ b/src/devices/src/virtio/vsock/muxer_thread.rs @@ -1,6 +1,5 @@ use std::os::unix::io::RawFd; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; use std::thread; @@ -12,6 +11,7 @@ use super::muxer_rxq::MuxerRxQ; use super::proxy::{ProxyRemoval, ProxyUpdate}; use super::tcp::TcpProxy; +use crossbeam_channel::Sender; use rand::{rngs::ThreadRng, thread_rng, Rng}; use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; use utils::eventfd::EventFd; diff --git a/src/devices/src/virtio/vsock/reaper.rs b/src/devices/src/virtio/vsock/reaper.rs index 1cf45709..d5326518 100644 --- a/src/devices/src/virtio/vsock/reaper.rs +++ b/src/devices/src/virtio/vsock/reaper.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; -use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::{Duration, Instant}; use super::proxy::Proxy; +use crossbeam_channel::Receiver; pub type ProxyMap = Arc>>>>; const TIMEOUT: Duration = Duration::new(5, 0); diff --git a/src/vmm/Cargo.toml b/src/vmm/Cargo.toml index 5df6f6ef..3f03b4f2 100644 --- a/src/vmm/Cargo.toml +++ b/src/vmm/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" amd-sev = [ "codicon", "kbs-types", "procfs", "serde", "serde_json", "sev", "curl" ] [dependencies] +crossbeam-channel = "0.5" env_logger = "0.9.0" libc = ">=0.2.39" log = "0.4.0" diff --git a/src/vmm/src/builder.rs b/src/vmm/src/builder.rs index ca751548..dd5ffadb 100644 --- a/src/vmm/src/builder.rs +++ b/src/vmm/src/builder.rs @@ -3,11 +3,11 @@ //! Enables pre-boot setup, instantiation and booting of a Firecracker VMM. +#[cfg(target_os = "macos")] +use crossbeam_channel::unbounded; use std::fmt::{Display, Formatter}; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -#[cfg(target_os = "macos")] -use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use super::{Error, Vmm}; @@ -930,7 +930,7 @@ fn create_vcpus_aarch64( for cpu_index in 0..vcpu_config.vcpu_count { let boot_receiver = if cpu_index != 0 { - let (boot_sender, boot_receiver) = channel(); + let (boot_sender, boot_receiver) = unbounded(); boot_senders.push(boot_sender); Some(boot_receiver) } else { diff --git a/src/vmm/src/linux/vstate.rs b/src/vmm/src/linux/vstate.rs index 38a7956f..cdbcad3d 100644 --- a/src/vmm/src/linux/vstate.rs +++ b/src/vmm/src/linux/vstate.rs @@ -5,13 +5,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. +use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}; use libc::{c_int, c_void, siginfo_t}; use std::cell::Cell; use std::fmt::{Display, Formatter}; use std::io; use std::result; use std::sync::atomic::{fence, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; #[cfg(not(test))] use std::sync::Barrier; use std::thread; @@ -815,8 +815,8 @@ impl Vcpu { create_ts: TimestampUs, ) -> Result { let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?; - let (event_sender, event_receiver) = channel(); - let (response_sender, response_receiver) = channel(); + let (event_sender, event_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); // Initially the cpuid per vCPU is the one supported by this VM. Ok(Vcpu { @@ -851,8 +851,8 @@ impl Vcpu { create_ts: TimestampUs, ) -> Result { let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?; - let (event_sender, event_receiver) = channel(); - let (response_sender, response_receiver) = channel(); + let (event_sender, event_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); Ok(Vcpu { fd: kvm_vcpu, @@ -973,7 +973,7 @@ impl Vcpu { pub fn start_threaded(mut self) -> Result { let event_sender = self.event_sender.take().unwrap(); let response_receiver = self.response_receiver.take().unwrap(); - let (init_tls_sender, init_tls_receiver) = channel(); + let (init_tls_sender, init_tls_receiver) = unbounded(); let vcpu_thread = thread::Builder::new() .name(format!("fc_vcpu {}", self.cpu_index())) .spawn(move || { @@ -1420,6 +1420,7 @@ enum VcpuEmulation { #[cfg(test)] mod tests { + use crossbeam_channel::unbounded; use std::fs::File; use std::sync::{Arc, Barrier}; @@ -1434,7 +1435,7 @@ mod tests { // Make sure the Vcpu is out of KVM_RUN. self.send_event(VcpuEvent::Pause).unwrap(); // Close the original channel so that the Vcpu thread errors and goes to exit state. - let (event_sender, _event_receiver) = channel(); + let (event_sender, _event_receiver) = unbounded(); self.event_sender = event_sender; // Wait for the Vcpu thread to finish execution self.vcpu_thread.take().unwrap().join().unwrap(); diff --git a/src/vmm/src/macos/vstate.rs b/src/vmm/src/macos/vstate.rs index c0a87a70..91d5c76b 100644 --- a/src/vmm/src/macos/vstate.rs +++ b/src/vmm/src/macos/vstate.rs @@ -9,7 +9,6 @@ use std::cell::Cell; use std::fmt::{Display, Formatter}; use std::io; use std::result; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; #[cfg(not(test))] use std::sync::{Arc, Mutex}; use std::thread; @@ -21,6 +20,7 @@ use crate::vmm_config::machine_config::CpuFeaturesTemplate; use arch; use arch::aarch64::gic::GICDevice; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use devices::legacy::Gic; use hvf::{HvfVcpu, HvfVm, VcpuExit}; use utils::eventfd::EventFd; @@ -267,8 +267,8 @@ impl Vcpu { _create_ts: TimestampUs, intc: Arc>, ) -> Result { - let (event_sender, event_receiver) = channel(); - let (response_sender, response_receiver) = channel(); + let (event_sender, event_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); Ok(Vcpu { id, @@ -325,7 +325,7 @@ impl Vcpu { pub fn start_threaded(mut self) -> Result { let event_sender = self.event_sender.take().unwrap(); let response_receiver = self.response_receiver.take().unwrap(); - let (init_tls_sender, init_tls_receiver) = channel(); + let (init_tls_sender, init_tls_receiver) = unbounded(); let vcpu_thread = thread::Builder::new() .name(format!("fc_vcpu {}", self.cpu_index())) @@ -441,7 +441,7 @@ impl Vcpu { let mut hvf_vcpu = HvfVcpu::new().expect("Can't create HVF vCPU"); let hvf_vcpuid = hvf_vcpu.id(); - let (wfe_sender, wfe_receiver) = channel(); + let (wfe_sender, wfe_receiver) = unbounded(); self.intc .lock() .unwrap() @@ -599,11 +599,11 @@ enum VcpuEmulation { #[cfg(test)] mod tests { + #[cfg(target_arch = "x86_64")] + use crossbeam_channel::{unbounded, RecvTimeoutError}; use std::fs::File; #[cfg(target_arch = "x86_64")] use std::os::unix::io::AsRawFd; - #[cfg(target_arch = "x86_64")] - use std::sync::mpsc; use std::sync::{Arc, Barrier}; #[cfg(target_arch = "x86_64")] use std::time::Duration; @@ -619,7 +619,7 @@ mod tests { // Make sure the Vcpu is out of KVM_RUN. self.send_event(VcpuEvent::Pause).unwrap(); // Close the original channel so that the Vcpu thread errors and goes to exit state. - let (event_sender, _event_receiver) = channel(); + let (event_sender, _event_receiver) = unbounded(); self.event_sender = event_sender; // Wait for the Vcpu thread to finish execution self.vcpu_thread.take().unwrap().join().unwrap(); @@ -933,7 +933,7 @@ mod tests { handle .response_receiver() .recv_timeout(Duration::from_millis(100)), - Err(mpsc::RecvTimeoutError::Timeout) + Err(RecvTimeoutError::Timeout) ); }