Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow reusing same shared IpcSharedMem for transfers #356

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ harness = false
name = "ipc_receiver_set"
harness = false

[[bench]]
name = "ipc_shared_mem"
harness = false

[features]
force-inprocess = []
memfd = ["sc"]
Expand Down
65 changes: 65 additions & 0 deletions benches/ipc_shared_mem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::time::Instant;

use criterion::{criterion_group, criterion_main, Criterion};
use ipc_channel::ipc::{self, IpcSharedMemory};

#[inline]
fn on_recv<const MUT: bool>(mut ism: IpcSharedMemory) -> IpcSharedMemory {
if MUT {
let data = unsafe { ism.deref_mut() };
for d in data {
*d += 1;
}
ism
} else {
let mut data = ism.to_vec();
for d in &mut data {
*d += 1;
}
IpcSharedMemory::from_bytes(&data)
}
}

fn ping_pong_mut_shared_mem<const MUT: bool, const SIZE: usize, const COUNT: u8>(
criterion: &mut Criterion,
) {
criterion.bench_function(
&format!(
"ping_pong_shared_mem{}_{SIZE}_{COUNT}",
if MUT { "_mut" } else { "" }
),
|bencher| {
bencher.iter_custom(|_| {
let (tx1, rx1) = ipc::channel().unwrap();
let (tx2, rx2) = ipc::channel().unwrap();
let tx = tx1.clone();
let _t1 = std::thread::spawn(move || {
for _i in 0..=COUNT / 2 {
tx2.send(on_recv::<MUT>(rx1.recv().unwrap())).unwrap();
}
});
let t2 = std::thread::spawn(move || {
for _i in 0..COUNT / 2 {
tx1.send(on_recv::<MUT>(rx2.recv().unwrap())).unwrap();
}
rx2.recv().unwrap().to_vec()
});
let start = Instant::now();
tx.send(IpcSharedMemory::from_byte(0, SIZE)).unwrap();
let data = t2.join().unwrap();
let duration = start.elapsed();
assert!(data.iter().all(|d| *d == (COUNT / 2) * 2 + 1));
duration
});
},
);
}

criterion_group!(
benches,
ping_pong_mut_shared_mem<true, {4*1024*1024}, 100>,
ping_pong_mut_shared_mem<false, {4*1024*1024}, 100>,
ping_pong_mut_shared_mem<true, {4*1024*1024}, 125>,
ping_pong_mut_shared_mem<false, {4*1024*1024}, 125>,
);
criterion_main!(benches);
18 changes: 18 additions & 0 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,24 @@ impl Deref for IpcSharedMemory {
}
}

impl IpcSharedMemory {
/// Returns a mutable reference to the deref of this [`IpcSharedMemory`].
///
/// # Safety
///
/// This is safe if there is only one reader/writer on the data.
/// User can achieve this by not cloning [`IpcSharedMemory`]
/// and serializing/deserializing only once.
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
sagudev marked this conversation as resolved.
Show resolved Hide resolved
if let Some(os_shared_memory) = &mut self.os_shared_memory {
os_shared_memory.deref_mut()
} else {
&mut []
}
}
}

impl<'de> Deserialize<'de> for IpcSharedMemory {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
Expand Down
10 changes: 10 additions & 0 deletions src/platform/inprocess/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,16 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
if self.ptr.is_null() {
panic!("attempted to access a consumed `OsIpcSharedMemory`")
}
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
let mut v = Arc::new(vec![byte; length]);
Expand Down
10 changes: 10 additions & 0 deletions src/platform/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,16 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
if self.ptr.is_null() && self.length > 0 {
panic!("attempted to access a consumed `OsIpcSharedMemory`")
}
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
unsafe fn from_raw_parts(ptr: *mut u8, length: usize) -> OsIpcSharedMemory {
OsIpcSharedMemory {
Expand Down
9 changes: 8 additions & 1 deletion src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::hash::BuildHasherDefault;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, RangeFrom};
use std::ops::{Deref, DerefMut, RangeFrom};
use std::os::fd::RawFd;
use std::ptr;
use std::slice;
Expand Down Expand Up @@ -866,6 +866,13 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
sagudev marked this conversation as resolved.
Show resolved Hide resolved
unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
}
}

impl OsIpcSharedMemory {
unsafe fn from_raw_parts(
ptr: *mut u8,
Expand Down
8 changes: 8 additions & 0 deletions src/platform/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1834,6 +1834,14 @@ impl Deref for OsIpcSharedMemory {
}
}

impl OsIpcSharedMemory {
#[inline]
pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
assert!(!self.view_handle.Value.is_null() && self.handle.is_valid());
unsafe { slice::from_raw_parts_mut(self.view_handle.Value as _, self.length) }
}
}

impl OsIpcSharedMemory {
fn new(length: usize) -> Result<OsIpcSharedMemory, WinError> {
unsafe {
Expand Down