AsyncFd reports socket misleadingly as readable and hangs in an endless loop #4549

Closed
eddi0815 opened this issue Mar 1, 2022 · 8 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-io Module: tokio/io

eddi0815 commented Mar 1, 2022

Version

$ cargo tree | grep tokio
└── tokio v1.17.0
    └── tokio-macros v1.7.0 (proc-macro)

Platform
Linux HOSTNAME 5.10.0-11-amd64 #1 SMP Debian 5.10.92-1 (2022-01-18) x86_64 GNU/Linux

Description
The socket is misleadingly reported as readable when it isn't. The call returns WOULDBLOCK, but the socket is still reported as readable, so I end up in an endless loop that is never left. This issue started from a discussion: #4529

I want to use a plain Linux socket/file descriptor with Tokio with the help of AsyncFd. I created an example in which socket2::Socket stands in for the code I want to integrate, and I was able to reproduce the behavior. If I start the test communication_async_multi_thread, the underlying accept is called in an endless loop. This does not happen on the first or second run; the test needs to be executed multiple times. After some tries, accept enters an endless loop because the socket is reported as readable. This should not happen. The test communication_async_current_thread works as expected. I was able to execute it a few dozen times without problems, but it may be that the problem simply occurs less often in this configuration and has not shown up yet.

use std::{io::Error, net::SocketAddr, os::unix::prelude::AsRawFd};

use tokio::io::unix::AsyncFd;

struct Socket {
    socket: socket2::Socket,
}

impl AsRawFd for Socket {
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
        self.socket.as_raw_fd()
    }
}

impl Socket {
    pub fn new() -> std::io::Result<Self> {
        let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?;
        let socket_ref = socket2::SockRef::from(&socket);
        socket_ref.set_reuse_address(true)?;
        socket_ref.set_reuse_port(true)?;

        Ok(Self { socket })
    }

    pub fn set_nonblocking(&self) -> std::io::Result<()> {
        self.socket.set_nonblocking(true)?;

        Ok(())
    }

    pub fn bind(&self, local_address: std::net::SocketAddr) -> std::io::Result<()> {
        self.socket.bind(&socket2::SockAddr::from(local_address))?;

        Ok(())
    }

    pub fn listen(&self) -> std::io::Result<()> {
        self.socket.listen(128)?;

        Ok(())
    }

    pub fn accept(&self) -> std::io::Result<(Socket, std::net::SocketAddr)> {
        println!("accept");
        let (peer_socket, peer_address) = self.socket.accept()?;

        Ok((
            Self {
                socket: peer_socket,
            },
            peer_address.as_socket().unwrap(),
        ))
    }
}

pub struct AsyncSocket {
    socket: AsyncFd<Socket>,
}

impl AsRawFd for AsyncSocket {
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
        self.socket.as_raw_fd()
    }
}

impl AsyncSocket {
    pub fn new() -> std::io::Result<Self> {
        let socket = Socket::new()?;
        socket.set_nonblocking()?;

        Ok(Self {
            socket: AsyncFd::new(socket)?,
        })
    }

    pub fn bind(&self, local_address: std::net::SocketAddr) -> std::io::Result<()> {
        self.socket.get_ref().bind(local_address)?;

        Ok(())
    }

    pub fn listen(&self) -> std::io::Result<()> {
        self.socket.get_ref().listen()?;

        Ok(())
    }

    pub async fn accept(&self) -> std::io::Result<(AsyncSocket, SocketAddr)> {
        loop {
            let mut guard = self.socket.readable().await?;

            match guard.try_io(|inner| inner.get_ref().accept()) {
                Ok(result) => match result {
                    Ok((socket, address)) => {
                        return Ok((
                            AsyncSocket {
                                socket: AsyncFd::new(socket)?,
                            },
                            address,
                        ))
                    }
                    Err(e) => return Err(e),
                },
                Err(_would_block) => continue,
            }
        }
    }
}

#[cfg(test)]
mod tests {

    use std::time::Duration;

    use super::*;

    pub fn create_address<A: std::net::ToSocketAddrs>(
        address: A,
    ) -> std::io::Result<std::net::SocketAddr> {
        Ok(address.to_socket_addrs()?.next().unwrap())
    }

    //Needs to be executed multiple times. After some tries accept hangs in an endless loop and never comes back
    #[tokio::test(flavor = "multi_thread")]
    async fn communication_async_multi_thread() {
        let server_address = create_address(("127.0.0.1", 5002)).unwrap();

        let server = AsyncSocket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();

        match tokio::time::timeout(Duration::from_millis(3000), server.accept()).await {
            Ok(_) => {
                println!("Accepted")
            }
            Err(_) => {
                println!("Got timeout")
            }
        }
    }

    //Works as expected. Even after dozens of tries.
    #[tokio::test]
    async fn communication_async_current_thread() {
        let server_address = create_address(("127.0.0.1", 5002)).unwrap();

        let server = AsyncSocket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();

        match tokio::time::timeout(Duration::from_millis(3000), server.accept()).await {
            Ok(_) => {
                println!("Accepted")
            }
            Err(_) => {
                println!("Got timeout")
            }
        }
    }
}

I also created a binary for testing purposes that shows the same problem.

use std::time::Duration;
use tokio::runtime::Builder;

use async_socket2::AsyncSocket;

pub fn create_address<A: std::net::ToSocketAddrs>(
    address: A,
) -> std::io::Result<std::net::SocketAddr> {
    Ok(address.to_socket_addrs()?.next().unwrap())
}

//Needs to be executed multiple times. After some tries with 'new_multi_thread' accept hangs in an endless loop and never comes back
fn main() {
    //Works as expected
    //let rt = Builder::new_current_thread().enable_io().enable_time().build().unwrap();
    //Problems here.
    let rt = Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    rt.block_on(async {
        let server_address = create_address(("127.0.0.1", 5008)).unwrap();

        let server = AsyncSocket::new().unwrap();
        server.bind(server_address).unwrap();
        server.listen().unwrap();

        match tokio::time::timeout(Duration::from_millis(3000), server.accept()).await {
            Ok(_) => {
                println!("Accepted")
            }
            Err(_) => {
                println!("Got timeout")
            }
        }
    });
}
@eddi0815 eddi0815 added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Mar 1, 2022
@Darksonn Darksonn added the M-io Module: tokio/io label Mar 1, 2022
@Noah-Kennedy
Contributor

I've also repro'd this. I'm gonna take a look at this later tonight.

@Noah-Kennedy Noah-Kennedy self-assigned this Mar 1, 2022
@eddi0815
Author

eddi0815 commented Mar 8, 2022

I had time to dig a little into this topic with my rough knowledge of the Tokio code. These are my findings so far:

  1. There are always two sources registered with Mio. The function Registration::new_with_interest_and_handle is called from AsyncFd and from PollEvented, which is called somewhere from the runtime (builder). This happens in both modes (current_thread and multi_thread). What is the Token(0) that is always registered?
  2. The endless loop occurs because Mio (epoll) reports the socket once as read_closed and write_closed, so it counts as readable and writable, which wakes the AsyncFd. But these events are not cleared in https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/driver/scheduled_io.rs#L339, and because of that the future never enters the waiting state at https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/driver/scheduled_io.rs#L472 and runs forever.

As the comment there points out, read_closed and write_closed are intentionally not cleared. Are they left out in order to handle the shutdown case?

I haven't found out yet why the socket is reported as read_closed and write_closed only in the multi_thread configuration.
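
The spinning described in the second finding can be illustrated with a small model. This is a simplified sketch, not Tokio's code: the bit constants and the functions clear_readiness, reader_is_ready and spins_before_parking are hypothetical names chosen for illustration. It only assumes what is stated above, namely that the closed bits are exempt from clearing and that they count as readiness for a task waiting to read.

```rust
// Hypothetical readiness bits for illustration only; not Tokio's actual types.
const READABLE: u8 = 0b0001;
const READ_CLOSED: u8 = 0b0100;
const WRITE_CLOSED: u8 = 0b1000;

// Bits that wake a task waiting with read interest (assumption of this model).
const READ_INTEREST: u8 = READABLE | READ_CLOSED;

/// Clears the bits of `event` from `current`, except the closed bits,
/// which are deliberately left set.
fn clear_readiness(current: u8, event: u8) -> u8 {
    let clearable = event & !(READ_CLOSED | WRITE_CLOSED);
    current & !clearable
}

/// Returns true if a reader would be woken immediately instead of waiting.
fn reader_is_ready(current: u8) -> bool {
    current & READ_INTEREST != 0
}

/// Counts how often an accept-style loop spins before it would wait,
/// giving up after `limit` iterations.
fn spins_before_parking(mut state: u8, limit: u32) -> u32 {
    let mut spins = 0;
    while reader_is_ready(state) && spins < limit {
        // The I/O attempt returned WouldBlock, so readiness is cleared.
        state = clear_readiness(state, state & READ_INTEREST);
        spins += 1;
    }
    spins
}
```

In this model, a state of READABLE is cleared after one failed attempt and the loop then waits, while a state of READ_CLOSED | WRITE_CLOSED is never cleared, so the loop only stops at the iteration limit.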

@eddi0815
Author

eddi0815 commented Mar 8, 2022

I think I found the problem. It is a problem on my side, and I was able to reproduce it with plain epoll as well.

I am creating the AsyncFd in AsyncSocket::new(), where it is registered with epoll. A little later I put the underlying socket into listen mode. This triggers EPOLLHUP on the already registered socket, which sets the event to read_closed and write_closed and starts the loop.

I should do all of that in the right place, in new(), before the AsyncFd is created.
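
The plain-epoll observation can be sketched without Tokio. The following is a minimal, Linux-only sketch under stated assumptions, not the code used in this thread: the constants are the Linux values written out by hand, the libc functions are declared manually so that no external crate is needed, and the helper names first_epoll_events, events_for_fresh_socket and events_for_listening_socket are hypothetical. It compares what epoll reports for a TCP socket that is registered before it is connected or listening with what it reports for one that is already listening.

```rust
use std::net::TcpListener;
use std::os::raw::c_int;
use std::os::unix::io::AsRawFd;

// Linux constant values, written out by hand (assumption: Linux target).
const AF_INET: c_int = 2;
const SOCK_STREAM: c_int = 1;
const EPOLL_CTL_ADD: c_int = 1;
const EPOLLIN: u32 = 0x001;
const EPOLLHUP: u32 = 0x010;

// struct epoll_event is packed on x86_64 and has natural layout elsewhere.
#[cfg_attr(target_arch = "x86_64", repr(C, packed))]
#[cfg_attr(not(target_arch = "x86_64"), repr(C))]
#[derive(Clone, Copy)]
struct EpollEvent {
    events: u32,
    data: u64,
}

extern "C" {
    fn socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int;
    fn close(fd: c_int) -> c_int;
    fn epoll_create1(flags: c_int) -> c_int;
    fn epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut EpollEvent) -> c_int;
    fn epoll_wait(epfd: c_int, events: *mut EpollEvent, maxevents: c_int, timeout: c_int) -> c_int;
}

/// Registers `fd` for read interest on a fresh epoll instance and returns the
/// event mask from one non-blocking poll (0 if nothing is pending).
fn first_epoll_events(fd: c_int) -> std::io::Result<u32> {
    unsafe {
        let epfd = epoll_create1(0);
        if epfd < 0 {
            return Err(std::io::Error::last_os_error());
        }
        let mut interest = EpollEvent { events: EPOLLIN, data: 0 };
        if epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &mut interest) < 0 {
            let err = std::io::Error::last_os_error();
            close(epfd);
            return Err(err);
        }
        let mut out = EpollEvent { events: 0, data: 0 };
        // Timeout 0: return immediately with whatever is already pending.
        let n = epoll_wait(epfd, &mut out, 1, 0);
        let err = std::io::Error::last_os_error();
        close(epfd);
        if n < 0 {
            Err(err)
        } else if n == 0 {
            Ok(0)
        } else {
            Ok(out.events)
        }
    }
}

/// Event mask for a TCP socket that was created but neither connected
/// nor put into listen mode before registration.
fn events_for_fresh_socket() -> std::io::Result<u32> {
    let fd = unsafe { socket(AF_INET, SOCK_STREAM, 0) };
    if fd < 0 {
        return Err(std::io::Error::last_os_error());
    }
    let events = first_epoll_events(fd);
    unsafe { close(fd) };
    events
}

/// Event mask for a socket that is already listening when it is registered.
fn events_for_listening_socket() -> std::io::Result<u32> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    first_epoll_events(listener.as_raw_fd())
}
```

On Linux, the mask returned by events_for_fresh_socket() is expected to contain EPOLLHUP, whereas events_for_listening_socket() should report nothing while no connection is pending. That suggests finishing the socket setup first and only then handing the descriptor to AsyncFd::new().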

@eddi0815 eddi0815 closed this as completed Mar 8, 2022
@eddi0815
Author

eddi0815 commented Mar 9, 2022

That wasn't the full solution; I still face the problem. On the server side, if the call to listen() is missing, Mio (epoll) reports read_close/write_close immediately after the call to AsyncFd::new(). This does not happen if listen() is called.

On the client side there is no call to listen(), so I get read_close/write_close from Mio (epoll) immediately after the call to AsyncFd::new(). As these events are excluded by clear_readiness(), the socket is always reported as readable/writable, but it isn't.

So the questions are:

  1. Why is read_close/write_close excluded from clear_readiness()?
  2. Why does registering an fd with AsyncFd::new() immediately report read_close/write_close?

use std::time::Duration;
use tokio::io::unix::AsyncFd;

pub fn create_address<A: std::net::ToSocketAddrs>(
    address: A,
) -> std::io::Result<std::net::SocketAddr> {
    Ok(address.to_socket_addrs()?.next().unwrap())
}

#[tokio::main]
async fn main() {
    //choose between client and server
    //server().await;
    client().await;
}

async fn server() {
    let local_address_server = create_address(("127.0.0.1", 5000)).unwrap();

    let server_socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None).unwrap();        
    server_socket.bind(&socket2::SockAddr::from(local_address_server)).unwrap();
    //If listen() is commented out, read_closed/write_closed is reported, which leads to an endless loop because these events are not cleared
    server_socket.listen(128).unwrap();
    server_socket.set_nonblocking(true).unwrap();

    //After calling AsyncFd::new read_closed/write_closed is reported by Mio(epoll) if listen() is commented out
    let mut server = AsyncFd::new(server_socket).unwrap();

    loop {
        let mut guard = server.readable_mut().await.unwrap();
        println!("Server readable. Should not be seen or at least not in a loop");
        tokio::time::sleep(Duration::from_millis(1000)).await;
        guard.clear_ready();
    }
}

async fn client() {
    let local_address_client = create_address(("127.0.0.1", 5001)).unwrap();

    let client_socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None).unwrap();
    client_socket.bind(&socket2::SockAddr::from(local_address_client)).unwrap();
    client_socket.set_nonblocking(true).unwrap();

    //After calling AsyncFd::new read_closed/write_closed is reported by Mio(epoll)
    let mut client = AsyncFd::new(client_socket).unwrap();

    loop {
        let mut guard = client.readable_mut().await.unwrap();
        println!("Client readable. Should not be seen or at least not in a loop");
        tokio::time::sleep(Duration::from_millis(1000)).await;
        guard.clear_ready();
    }
}

@eddi0815 eddi0815 reopened this Mar 9, 2022
@Darksonn
Contributor

Darksonn commented Mar 9, 2022

Are you not missing a call to connect? And besides, why aren't you using tokio::net::TcpSocket instead of AsyncFd?

@eddi0815
Author

eddi0815 commented Mar 9, 2022

You are right. I have to call connect first and then register the socket with epoll. Otherwise I get EPOLLHUP, which is read_close/write_close. I missed that. I will try it tomorrow.

I use socket2 here only as a stand-in for the example. In my project I need to use a third-party C library with Tokio through an FFI layer I created. For formal/regulatory reasons I cannot replace the C library with beautiful Rust code. That would be too easy.

@Darksonn
Contributor

Darksonn commented Mar 9, 2022

In any case, you should be sure to study how TcpSocket::connect handles connecting, because it is somewhat subtle to do correctly.

@eddi0815
Author

I was finally able to get my code running, and it works as expected. Thanks for the help.
