Splice between a pipe and a socket #26
Comments
According to the man page of [
You can use |
There are two functions for this: |
Ok, but how to make them work with |
Checkout Then, you can just use |
Ok, makes sense. Thank you! |
Tried that, but I'm hitting a problem:
    async fn forward(src: OwnedReadHalf, dest: OwnedWriteHalf) -> io::Result<u64> {
        use tokio_pipe::{pipe, PipeRead, PipeWrite};
        let (mut pipe_read, mut pipe_write) = pipe()?;
        let mut in_fd = src.as_ref().as_raw_fd();
        let mut out_fd = dest.as_ref().as_raw_fd();
        let mut total_count = 0;
        loop {
            let count = pipe_write.splice_from(&mut AsyncFd::new(in_fd)?, None, 4096).await?;
            if count == 0 {
                break;
            }
            pipe_read.splice_to(&mut AsyncFd::new(out_fd)?, None, count, true).await?;
        }
        Ok(total_count)
    }
If I remove the inner body of the loop ( |
Aha I got it. That's because |
@pkolaczk Can you try the PR linked with this issue to see if it fixes the issue for you? |
Yeah, checking.... |
Looks good! Thank you! |
Huh, but the idea still doesn't work:
This fails with:
I also tried without getting the raw_fd:
But that doesn't even compile:
How to get an |
Ok, I got it. The problem is that you cannot register the same fd twice with |
The fundamental problem is that we don't have any safe way to express a reference to a valid fd right now, but we will in 1.63.0 (rust-lang/rust#95118) |
Is there any workaround for this, without waiting for 1.63? |
For now, you can convert |
That sounds like a plan, thank you. Will try that and get back to you. |
|
Yeah. The API for |
Aaaargh, cannot use Mutex, because the API takes an unowned reference, so the reference lives after
But trying to drop the guard earlier also doesn't work:
|
You need to use |
Ah, good catch. Ok, mutex problem solved. Compiles but still doesn't work.
I'm getting a panic here: "Bad file descriptor" |
You need to use
Though I strongly recommend you to do this instead:
It also has a |
Ok, thanks that fixed it, but apparently this idea with locking is still wrong. By using a lock on a shared descriptor, I cannot simultaneously write and read, and this leads to a deadlock. Need to find a better way without locking. And locking doesn't seem right here, as this AsyncFd is Sync and Send so why would I need locking? Crazy idea: what if I just unsafely transmute the reference behind Arc to |
@pkolaczk You don't have to use these You can use Then you won't need the |
Aaaah, and now you tell me this :D |
Sorry I didn't realize it can be done this way earlier. |
Yeah, but aren't we running again into the problem of registering the same Fd twice? |
OwnedFd::try_clone duplicates the file handle at the OS level, so the kernel sees them as separate file descriptor/handle and you can register both twice. |
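To make the point about duplication concrete, here is a minimal std-only sketch, assuming a Unix target and Rust 1.63 or later. It uses a `UnixStream` pair instead of a `TcpStream` so that it needs no network, and the helper names `split_fds` and `try_clone_demo` are made up for illustration. It only shows that the two descriptors have different numbers while still referring to the same socket; it does not register anything with tokio.

```rust
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, OwnedFd};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: turn one socket into two independently owned fds.
fn split_fds(stream: UnixStream) -> io::Result<(OwnedFd, OwnedFd)> {
    let original: OwnedFd = stream.into();
    // try_clone asks the OS for a new descriptor referring to the same socket.
    let duplicate = original.try_clone()?;
    Ok((duplicate, original))
}

/// Returns (fds are distinct, bytes seen by the peer, bytes read via the duplicate).
fn try_clone_demo() -> io::Result<(bool, Vec<u8>, Vec<u8>)> {
    let (ours, mut peer) = UnixStream::pair()?;
    let (read_fd, write_fd) = split_fds(ours)?;
    let distinct = read_fd.as_raw_fd() != write_fd.as_raw_fd();

    // Wrap both descriptors again so we can use the Read/Write traits.
    let mut reader = UnixStream::from(read_fd);
    let mut writer = UnixStream::from(write_fd);

    // Data written through one descriptor reaches the peer...
    writer.write_all(b"ping")?;
    let mut seen_by_peer = [0u8; 4];
    peer.read_exact(&mut seen_by_peer)?;

    // ...and the peer's reply can be read through the other descriptor.
    peer.write_all(b"pong")?;
    let mut read_back = [0u8; 4];
    reader.read_exact(&mut read_back)?;

    Ok((distinct, seen_by_peer.to_vec(), read_back.to_vec()))
}
```

Calling `try_clone_demo()` should report distinct descriptor numbers, which is why each one can be registered separately.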
Indeed the registration worked. But it still deadlocks, so I guess there must still be something else I'm doing wrong. Debugging... |
It is not a deadlock, but a livelock. It kinda works, but only one-way. I basically try to create a bidirectional proxy between 2 sockets. Here is the code:
    type In = AsyncFd<OwnedFd>;
    type Out = AsyncFd<OwnedFd>;
    pub fn split(stream: TcpStream) -> (In, Out) {
        let fd: OwnedFd = stream.into_std().unwrap().into();
        let in_fd = AsyncFd::new(fd.try_clone().unwrap()).unwrap();
        let out_fd = AsyncFd::new(fd).unwrap();
        (in_fd, out_fd)
    }
    pub async fn forward(mut src: In, mut dest: Out) -> io::Result<u64> {
        let (mut pipe_read, mut pipe_write) = tokio_pipe::pipe()?;
        let mut total_count = 0;
        loop {
            let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
            if count == 0 {
                break;
            }
            total_count += count as u64;
            while count > 0 {
                let sent_count = pipe_read.splice_to(&mut dest, None, count, true).await?;
                count -= sent_count;
            }
        }
        Ok(total_count)
    }
    pub async fn handle_connection(...) {
        // ...
        let (iread, iwrite) = split(client);
        let (fread, fwrite) = split(forwardee);
        tokio::select! {
            result = forward(iread, fwrite) => { if let Err(e) = result {
                eprintln!("Transfer failed: {}", e);
            }},
            result = forward(fread, iwrite) => { if let Err(e) = result {
                eprintln!("Transfer failed: {}", e);
            }},
        };
    }
Testing it with netcat, I'm able to get text from the client to the forwardee. But there are 2 problems:
How to further debug that? |
It is enough to just call splice_from to fall into that busy loop.
    pub async fn forward(mut src: In, mut dest: Out) -> io::Result<u64> {
        let (mut pipe_read, mut pipe_write) = tokio_pipe::pipe()?;
        let mut total_count = 0;
        loop {
            eprintln!("Waiting for more data...");
            let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
            eprintln!("Received {} bytes", count);
            if count == 0 {
                break;
            }
            total_count += count as u64;
        }
        Ok(total_count)
    }
Logs:
|
    ...
    eprintln!("Waiting for readability...");
    src.readable().await?;
    eprintln!("Reading data...");
    let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
    eprintln!("Received {} bytes", count);
    ...
results in:
|
https://zephyr.moe/2021/07/23/rust-zero-copy/ They do this hack:
    // read until the socket buffer is empty
    // or the pipe is filled
    // clear readiness (EPOLLIN)
    r.read(&mut [0u8; 0]).await?;
Can it be related? |
This is obviously incorrect in general, but manually clearing the readiness flag immediately after reading helps:
    eprintln!("Reading data...");
    let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
    eprintln!("Received {} bytes", count);
    let mut ready = src.readable().await?;
    ready.clear_ready(); |
I see where the errors come from, will update the PR. |
@pkolaczk I updated the PR, can you remove the workaround and try again please? |
Also, I recommend you to replace the following lines:
with:
This is much easier to reason about than using Other alternative would be |
Thanks for the fixes, I'll test them soon. Select also allows running both futures concurrently and automatically closes one half when the other half gets closed. A single thread is enough here: without zero copy I was able to get over 700 MB/s, and netcat/dd was the bottleneck. |
That's good to hear. Note that |
I tried the fix and it still doesn't work. It just swapped one problem for another one. Spinning does not happen anymore.
Code:
    loop {
        eprintln!("Reading data...");
        let mut count = pipe_write.splice_from(&mut src, None, 4096).await?;
        eprintln!("Received {} bytes", count);
        if count == 0 {
            break;
        }
        total_count += count as u64;
        while count > 0 {
            eprintln!("Writing {} bytes", count);
            let sent_count = pipe_read.splice_to(&mut dest, None, count, true).await?;
            count -= sent_count;
        }
        eprintln!("Writes done");
    }
Also, looking at your patch, it looks like you're clearing the flags unconditionally. You need to check the 'would block' status on both the read and the write side and clear the flags accordingly.
|
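The "clear only on would-block" rule can be illustrated without tokio. The following is a minimal sketch, assuming a Unix target; `ReadReady` is a made-up stand-in for a reactor's cached readiness flag (it is not a tokio or tokio-pipe type), and `read_tracking_readiness` and `readiness_demo` are hypothetical names. The flag stays set after a successful read, because more data may still be queued, and is cleared only when the kernel actually reports `WouldBlock`.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Hypothetical stand-in for a reactor's cached "readable" flag.
struct ReadReady(bool);

/// Performs one non-blocking read and updates the cached flag.
/// Returns Ok(None) when the socket had nothing to read.
fn read_tracking_readiness(
    sock: &mut UnixStream,
    ready: &mut ReadReady,
    buf: &mut [u8],
) -> io::Result<Option<usize>> {
    match sock.read(buf) {
        // Success: leave the flag alone, more data may be pending.
        Ok(n) => Ok(Some(n)),
        // Only a real WouldBlock proves the socket is drained.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            ready.0 = false;
            Ok(None)
        }
        Err(e) => Err(e),
    }
}

/// Returns (first read, flag after first read, second read, flag after second read).
fn readiness_demo() -> io::Result<(Option<usize>, bool, Option<usize>, bool)> {
    let (mut writer, mut reader) = UnixStream::pair()?;
    reader.set_nonblocking(true)?;
    writer.write_all(b"ping")?;

    let mut ready = ReadReady(true);
    let mut buf = [0u8; 16];

    let first = read_tracking_readiness(&mut reader, &mut ready, &mut buf)?;
    let flag_after_first = ready.0;
    let second = read_tracking_readiness(&mut reader, &mut ready, &mut buf)?;
    Ok((first, flag_after_first, second, ready.0))
}
```

The same check would apply on the write side, with the writable flag cleared only when a write reports `WouldBlock`.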
Also, one more suggestion (unrelated to splicing) - maybe you could make the API accept the
    pub async fn zero_copy<X, Y, R, W>(mut r: R, mut w: W) -> io::Result<()>
    where
        X: AsRawFd,
        Y: AsRawFd,
        R: AsyncRead + AsRef<X> + Unpin,
        W: AsyncWrite + AsRef<Y> + Unpin,
This is because |
That's a good idea. I can simply use: reader.read(&mut []).await and
|
@pkolaczk I am trying to implement |
I can see you can zero-copy-move data between the pipes.
Is there a way to move the data from a socket to a pipe and from a pipe to a socket by using splice/tee?
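At the syscall level, the socket-to-pipe-to-socket path asked about here can be sketched with blocking descriptors and plain `splice(2)`. This is only an illustration under several assumptions: a Linux target, hand-written `extern "C"` declarations for `pipe` and `splice` (assumed to match the C library's signatures; this form compiles on Rust editions up to 2021), and `UnixStream` pairs standing in for the TCP sockets. The helper names `splice_once`, `make_pipe`, `forward_chunk` and `splice_demo` are made up, and none of this is tokio-pipe's API; the thread's code uses its async `splice_from` and `splice_to` instead.

```rust
use std::io::{self, Read, Write};
use std::os::raw::{c_int, c_uint};
use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::os::unix::net::UnixStream;
use std::ptr;

// Assumption: Linux, where loff_t is a 64-bit signed integer.
extern "C" {
    fn pipe(fds: *mut c_int) -> c_int;
    fn splice(
        fd_in: c_int,
        off_in: *mut i64,
        fd_out: c_int,
        off_out: *mut i64,
        len: usize,
        flags: c_uint,
    ) -> isize;
}

/// One splice(2) call with null offsets (required for sockets and pipes).
fn splice_once(from: RawFd, to: RawFd, len: usize) -> io::Result<usize> {
    let n = unsafe { splice(from, ptr::null_mut(), to, ptr::null_mut(), len, 0) };
    if n < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(n as usize)
    }
}

/// Creates a pipe; both ends are closed automatically on drop.
fn make_pipe() -> io::Result<(OwnedFd, OwnedFd)> {
    let mut fds = [0 as c_int; 2];
    if unsafe { pipe(fds.as_mut_ptr()) } != 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) })
}

/// Moves one chunk src -> pipe -> dest and returns the number of bytes forwarded.
fn forward_chunk(src: &UnixStream, dest: &UnixStream) -> io::Result<usize> {
    let (pipe_read, pipe_write) = make_pipe()?;
    let total = splice_once(src.as_raw_fd(), pipe_write.as_raw_fd(), 4096)?;
    let mut pending = total;
    // A single splice may move fewer bytes than requested, so drain in a loop.
    while pending > 0 {
        let sent = splice_once(pipe_read.as_raw_fd(), dest.as_raw_fd(), pending)?;
        if sent == 0 {
            return Err(io::Error::new(io::ErrorKind::WriteZero, "splice moved 0 bytes"));
        }
        pending -= sent;
    }
    Ok(total)
}

/// Sends "hello" through the proxy path and returns what the far side received.
fn splice_demo() -> io::Result<Vec<u8>> {
    let (mut client, proxy_in) = UnixStream::pair()?;
    let (proxy_out, mut forwardee) = UnixStream::pair()?;
    client.write_all(b"hello")?;
    let n = forward_chunk(&proxy_in, &proxy_out)?;
    let mut received = vec![0u8; n];
    forwardee.read_exact(&mut received)?;
    Ok(received)
}
```

With non-blocking descriptors, each `splice_once` call could also fail with `EAGAIN`, which is the case the readiness handling in the comments is about.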