Skip to content

Commit

Permalink
Replace mpsc with crossbeam-channel
Browse files Browse the repository at this point in the history
On Apple Silicon, we were hitting a concurrency error that manifest
with recv() panicking like this:

thread '<unnamed>' panicked at 'internal error: entered unreachable
code'

Seems to be a known issue
rust-lang/rust#39364

Follow the recommendation on the thread, and switch from mpsc (which
seems its going to be retired) to crossbeam-channel.

Signed-off-by: Sergio Lopez <slp@sinrega.org>
  • Loading branch information
slp authored and tylerfanelli committed Sep 1, 2022
1 parent b9dd9d8 commit 5e3fcd4
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 24 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/devices/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ amd-sev = []

[dependencies]
bitflags = "1.2.0"
crossbeam-channel = "0.5"
env_logger = "0.9.0"
libc = ">=0.2.39"
log = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion src/devices/src/legacy/gic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2021 Red Hat, Inc.
// SPDX-License-Identifier: Apache-2.0

use crossbeam_channel::Sender;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryInto;
use std::sync::mpsc::Sender;

use arch::aarch64::gicv2::GICv2;
use arch::aarch64::layout::GTIMER_VIRT;
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/vsock/muxer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex, RwLock};

use super::super::super::legacy::Gic;
Expand All @@ -17,6 +16,7 @@ use super::reaper::ReaperThread;
use super::tcp::TcpProxy;
use super::udp::UdpProxy;
use super::VsockError;
use crossbeam_channel::{unbounded, Sender};
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use utils::eventfd::EventFd;
use vm_memory::GuestMemoryMmap;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl VsockMuxer {
self.intc = intc.clone();
self.irq_line = irq_line;

let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let thread = MuxerThread::new(
self.cid,
Expand Down
2 changes: 1 addition & 1 deletion src/devices/src/virtio/vsock/muxer_thread.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;

Expand All @@ -12,6 +11,7 @@ use super::muxer_rxq::MuxerRxQ;
use super::proxy::{ProxyRemoval, ProxyUpdate};
use super::tcp::TcpProxy;

use crossbeam_channel::Sender;
use rand::{rngs::ThreadRng, thread_rng, Rng};
use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use utils::eventfd::EventFd;
Expand Down
2 changes: 1 addition & 1 deletion src/devices/src/virtio/vsock/reaper.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

use super::proxy::Proxy;
use crossbeam_channel::Receiver;

pub type ProxyMap = Arc<RwLock<HashMap<u64, Mutex<Box<dyn Proxy>>>>>;
const TIMEOUT: Duration = Duration::new(5, 0);
Expand Down
1 change: 1 addition & 0 deletions src/vmm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
amd-sev = [ "codicon", "kbs-types", "procfs", "serde", "serde_json", "sev", "curl" ]

[dependencies]
crossbeam-channel = "0.5"
env_logger = "0.9.0"
libc = ">=0.2.39"
log = "0.4.0"
Expand Down
6 changes: 3 additions & 3 deletions src/vmm/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

//! Enables pre-boot setup, instantiation and booting of a Firecracker VMM.
#[cfg(target_os = "macos")]
use crossbeam_channel::unbounded;
use std::fmt::{Display, Formatter};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(target_os = "macos")]
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

use super::{Error, Vmm};
Expand Down Expand Up @@ -930,7 +930,7 @@ fn create_vcpus_aarch64(

for cpu_index in 0..vcpu_config.vcpu_count {
let boot_receiver = if cpu_index != 0 {
let (boot_sender, boot_receiver) = channel();
let (boot_sender, boot_receiver) = unbounded();
boot_senders.push(boot_sender);
Some(boot_receiver)
} else {
Expand Down
15 changes: 8 additions & 7 deletions src/vmm/src/linux/vstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the THIRD-PARTY file.

use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
use libc::{c_int, c_void, siginfo_t};
use std::cell::Cell;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::atomic::{fence, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
#[cfg(not(test))]
use std::sync::Barrier;
use std::thread;
Expand Down Expand Up @@ -815,8 +815,8 @@ impl Vcpu {
create_ts: TimestampUs,
) -> Result<Self> {
let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?;
let (event_sender, event_receiver) = channel();
let (response_sender, response_receiver) = channel();
let (event_sender, event_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();

// Initially the cpuid per vCPU is the one supported by this VM.
Ok(Vcpu {
Expand Down Expand Up @@ -851,8 +851,8 @@ impl Vcpu {
create_ts: TimestampUs,
) -> Result<Self> {
let kvm_vcpu = vm_fd.create_vcpu(id as u64).map_err(Error::VcpuFd)?;
let (event_sender, event_receiver) = channel();
let (response_sender, response_receiver) = channel();
let (event_sender, event_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();

Ok(Vcpu {
fd: kvm_vcpu,
Expand Down Expand Up @@ -973,7 +973,7 @@ impl Vcpu {
pub fn start_threaded(mut self) -> Result<VcpuHandle> {
let event_sender = self.event_sender.take().unwrap();
let response_receiver = self.response_receiver.take().unwrap();
let (init_tls_sender, init_tls_receiver) = channel();
let (init_tls_sender, init_tls_receiver) = unbounded();
let vcpu_thread = thread::Builder::new()
.name(format!("fc_vcpu {}", self.cpu_index()))
.spawn(move || {
Expand Down Expand Up @@ -1420,6 +1420,7 @@ enum VcpuEmulation {

#[cfg(test)]
mod tests {
use crossbeam_channel::unbounded;
use std::fs::File;
use std::sync::{Arc, Barrier};

Expand All @@ -1434,7 +1435,7 @@ mod tests {
// Make sure the Vcpu is out of KVM_RUN.
self.send_event(VcpuEvent::Pause).unwrap();
// Close the original channel so that the Vcpu thread errors and goes to exit state.
let (event_sender, _event_receiver) = channel();
let (event_sender, _event_receiver) = unbounded();
self.event_sender = event_sender;
// Wait for the Vcpu thread to finish execution
self.vcpu_thread.take().unwrap().join().unwrap();
Expand Down
18 changes: 9 additions & 9 deletions src/vmm/src/macos/vstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::cell::Cell;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
#[cfg(not(test))]
use std::sync::{Arc, Mutex};
use std::thread;
Expand All @@ -21,6 +20,7 @@ use crate::vmm_config::machine_config::CpuFeaturesTemplate;

use arch;
use arch::aarch64::gic::GICDevice;
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use devices::legacy::Gic;
use hvf::{HvfVcpu, HvfVm, VcpuExit};
use utils::eventfd::EventFd;
Expand Down Expand Up @@ -267,8 +267,8 @@ impl Vcpu {
_create_ts: TimestampUs,
intc: Arc<Mutex<Gic>>,
) -> Result<Self> {
let (event_sender, event_receiver) = channel();
let (response_sender, response_receiver) = channel();
let (event_sender, event_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();

Ok(Vcpu {
id,
Expand Down Expand Up @@ -325,7 +325,7 @@ impl Vcpu {
pub fn start_threaded(mut self) -> Result<VcpuHandle> {
let event_sender = self.event_sender.take().unwrap();
let response_receiver = self.response_receiver.take().unwrap();
let (init_tls_sender, init_tls_receiver) = channel();
let (init_tls_sender, init_tls_receiver) = unbounded();

let vcpu_thread = thread::Builder::new()
.name(format!("fc_vcpu {}", self.cpu_index()))
Expand Down Expand Up @@ -441,7 +441,7 @@ impl Vcpu {
let mut hvf_vcpu = HvfVcpu::new().expect("Can't create HVF vCPU");
let hvf_vcpuid = hvf_vcpu.id();

let (wfe_sender, wfe_receiver) = channel();
let (wfe_sender, wfe_receiver) = unbounded();
self.intc
.lock()
.unwrap()
Expand Down Expand Up @@ -599,11 +599,11 @@ enum VcpuEmulation {

#[cfg(test)]
mod tests {
#[cfg(target_arch = "x86_64")]
use crossbeam_channel::{unbounded, RecvTimeoutError};
use std::fs::File;
#[cfg(target_arch = "x86_64")]
use std::os::unix::io::AsRawFd;
#[cfg(target_arch = "x86_64")]
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
#[cfg(target_arch = "x86_64")]
use std::time::Duration;
Expand All @@ -619,7 +619,7 @@ mod tests {
// Make sure the Vcpu is out of KVM_RUN.
self.send_event(VcpuEvent::Pause).unwrap();
// Close the original channel so that the Vcpu thread errors and goes to exit state.
let (event_sender, _event_receiver) = channel();
let (event_sender, _event_receiver) = unbounded();
self.event_sender = event_sender;
// Wait for the Vcpu thread to finish execution
self.vcpu_thread.take().unwrap().join().unwrap();
Expand Down Expand Up @@ -933,7 +933,7 @@ mod tests {
handle
.response_receiver()
.recv_timeout(Duration::from_millis(100)),
Err(mpsc::RecvTimeoutError::Timeout)
Err(RecvTimeoutError::Timeout)
);
}

Expand Down

0 comments on commit 5e3fcd4

Please sign in to comment.