Skip to content

Commit

Permalink
Support for the ESP-IDF framework
Browse files Browse the repository at this point in the history
  • Loading branch information
imarkov authored and ivmarkov committed Aug 3, 2023
1 parent c86c389 commit abf9c0e
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
pull_request:
push:
branches:
- master
- espidf
schedule:
- cron: '0 2 * * 0'

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tracing = { version = "0.1.37", default-features = false }

[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies]
libc = "0.2.77"
rustix = { version = "0.37.11", features = ["process", "time", "fs", "std"], default-features = false }
rustix = { version = "0.38.2", features = ["event", "pipe", "process", "time", "fs", "std"], default-features = false }

[target.'cfg(windows)'.dependencies]
concurrent-queue = "2.2.0"
Expand Down
29 changes: 17 additions & 12 deletions src/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::time::Duration;

use rustix::event::{epoll, eventfd, EventfdFlags};
use rustix::fd::OwnedFd;
use rustix::io::{epoll, eventfd, read, write, EventfdFlags};
use rustix::io::{read, write};
use rustix::time::{
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
Timespec,
Expand All @@ -31,7 +32,7 @@ impl Poller {
// Create an epoll instance.
//
// Use `epoll_create1` with `EPOLL_CLOEXEC`.
let epoll_fd = epoll::epoll_create(epoll::CreateFlags::CLOEXEC)?;
let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;

// Set up eventfd and timerfd.
let event_fd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
Expand Down Expand Up @@ -94,10 +95,10 @@ impl Poller {
);
let _enter = span.enter();

epoll::epoll_add(
epoll::add(
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
ev.key as u64,
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
)?;

Expand All @@ -114,10 +115,10 @@ impl Poller {
);
let _enter = span.enter();

epoll::epoll_mod(
epoll::modify(
&self.epoll_fd,
unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
ev.key as u64,
epoll::EventData::new_u64(ev.key as u64),
epoll_flags(&ev, mode),
)?;

Expand All @@ -133,7 +134,7 @@ impl Poller {
);
let _enter = span.enter();

epoll::epoll_del(&self.epoll_fd, unsafe {
epoll::delete(&self.epoll_fd, unsafe {
rustix::fd::BorrowedFd::borrow_raw(fd)
})?;

Expand Down Expand Up @@ -195,7 +196,7 @@ impl Poller {
};

// Wait for I/O events.
epoll::epoll_wait(&self.epoll_fd, &mut events.list, timeout_ms)?;
epoll::wait(&self.epoll_fd, &mut events.list, timeout_ms)?;
tracing::trace!(
epoll_fd = ?self.epoll_fd.as_raw_fd(),
res = ?events.list.len(),
Expand Down Expand Up @@ -310,10 +311,14 @@ impl Events {

/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list.iter().map(|(flags, data)| Event {
key: data as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
self.list.iter().map(|event| {
let flags = event.flags;

Event {
key: event.data.u64() as usize,
readable: flags.intersects(read_flags()),
writable: flags.intersects(write_flags()),
}
})
}
}
3 changes: 2 additions & 1 deletion src/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::time::Duration;

use rustix::event::kqueue;
use rustix::fd::OwnedFd;
use rustix::io::{fcntl_setfd, kqueue, Errno, FdFlags};
use rustix::io::{fcntl_setfd, Errno, FdFlags};

use crate::{Event, PollMode};

Expand Down
4 changes: 2 additions & 2 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::io;
use std::process::Child;
use std::time::Duration;

use rustix::io::kqueue;
use rustix::event::kqueue;

use super::__private::PollerSealed;
use __private::FilterSealed;
Expand Down Expand Up @@ -238,7 +238,7 @@ unsafe impl FilterSealed for Timer {
impl Filter for Timer {}

mod __private {
use rustix::io::kqueue;
use rustix::event::kqueue;

#[doc(hidden)]
pub unsafe trait FilterSealed {
Expand Down
117 changes: 94 additions & 23 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::io;
use std::mem;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

use rustix::event::{poll, PollFd, PollFlags};
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd};
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{
fcntl_getfd, fcntl_setfd, pipe, pipe_with, poll, read, write, FdFlags, PipeFlags, PollFd,
PollFlags,
};
use rustix::io::{read, write};

// std::os::unix doesn't exist on Fuchsia
type RawFd = std::os::raw::c_int;
Expand All @@ -27,13 +25,17 @@ pub struct Poller {

/// The file descriptor of the read half of the notify pipe. This is also stored as the first
/// file descriptor in `fds.poll_fds`.
///
/// On ESP-IDF this file descriptor represents an eventfd handle rather than the read half of a pipe.
notify_read: OwnedFd,
/// The file descriptor of the write half of the notify pipe.
///
/// Data is written to this to wake up the current instance of `wait`, which can occur when the
/// user notifies it (in which case `notified` would have been set) or when an operation needs
/// to occur (in which case `waiting_operations` would have been incremented).
notify_write: OwnedFd,
///
/// On ESP-IDF this file descriptor is set to None.
notify_write: Option<OwnedFd>,

/// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
/// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
Expand Down Expand Up @@ -75,19 +77,60 @@ struct FdData {
impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
// Create the notification pipe.
let (notify_read, notify_write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
let (notify_read, notify_write) = pipe()?;
fcntl_setfd(&notify_read, fcntl_getfd(&notify_read)? | FdFlags::CLOEXEC)?;
fcntl_setfd(
&notify_write,
fcntl_getfd(&notify_write)? | FdFlags::CLOEXEC,
)?;
io::Result::Ok((notify_read, notify_write))
})?;

// Put the reading side into non-blocking mode.
fcntl_setfl(&notify_read, fcntl_getfl(&notify_read)? | OFlags::NONBLOCK)?;
#[cfg(not(target_os = "espidf"))]
let (notify_read, notify_write) = {
// Create the notification pipe.

use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};

let (notify_read, notify_write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
let (notify_read, notify_write) = pipe()?;
fcntl_setfd(&notify_read, fcntl_getfd(&notify_read)? | FdFlags::CLOEXEC)?;
fcntl_setfd(
&notify_write,
fcntl_getfd(&notify_write)? | FdFlags::CLOEXEC,
)?;
io::Result::Ok((notify_read, notify_write))
})?;

// Put the reading side into non-blocking mode.
fcntl_setfl(&notify_read, fcntl_getfl(&notify_read)? | OFlags::NONBLOCK)?;

(notify_read, Some(notify_write))
};

#[cfg(target_os = "espidf")]
let (notify_read, notify_write) = {
// Note that the eventfd() implementation in ESP-IDF deviates from the specification in the following ways:
// 1) The file descriptor is always in a non-blocking mode, as if EFD_NONBLOCK was passed as a flag;
// passing EFD_NONBLOCK or calling fcntl(.., F_GETFL/F_SETFL) on the eventfd() file descriptor is not supported
// 2) It always returns the counter value, even if it is 0. This is contrary to the specification which mandates
// that it should instead fail with EAGAIN
//
// (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway
// (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified

// TODO: Upstream the `eventfd` syscall to libc and rustix for ESP IDF

// use rustix::fd::FromRawFd;

// extern "C" {
// fn eventfd(initval: u32, flags: u32) -> RawFd;
// }

// let fd = unsafe { eventfd(0, 0) };
// if fd == -1 {
// return Err(std::io::ErrorKind::Other.into());
// }

// (unsafe { OwnedFd::from_raw_fd(fd) }, None)

use rustix::event::{eventfd, EventfdFlags};

(eventfd(0, EventfdFlags::empty())?, None)
};

tracing::trace!(?notify_read, ?notify_write, "new");

Expand Down Expand Up @@ -120,7 +163,13 @@ impl Poller {

/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
if fd == self.notify_read.as_raw_fd() || fd == self.notify_write.as_raw_fd() {
if fd == self.notify_read.as_raw_fd()
|| self
.notify_write
.as_ref()
.map(|notify_write| fd == notify_write.as_raw_fd())
.unwrap_or(false)
{
return Err(io::Error::from(io::ErrorKind::InvalidInput));
}

Expand Down Expand Up @@ -253,7 +302,15 @@ impl Poller {

// Read all notifications.
if notified {
while read(&self.notify_read, &mut [0; 64]).is_ok() {}
if self.notify_write.is_some() {
// When using the `pipe` syscall, we have to read all accumulated notifications in the pipe.
while read(&self.notify_read, &mut [0; 64]).is_ok() {}
} else {
// When using the `eventfd` syscall, it is OK to read just once, so as to clear the counter.
// In fact, reading in a loop will result in an endless loop on the ESP-IDF
// which is not following the specification strictly.
let _ = self.pop_notification();
}
}

// If the only event that occurred during polling was notification and it wasn't to
Expand Down Expand Up @@ -339,13 +396,27 @@ impl Poller {

/// Wake the current thread that is calling `wait`.
fn notify_inner(&self) -> io::Result<()> {
write(&self.notify_write, &[0; 1])?;
if let Some(notify_write) = self.notify_write.as_ref() {
// `pipe`
write(notify_write, &[0; 1])?;
} else {
// `eventfd`
write(&self.notify_read, &1u64.to_ne_bytes())?;
}

Ok(())
}

/// Remove a notification created by `notify_inner`.
fn pop_notification(&self) -> io::Result<()> {
read(&self.notify_read, &mut [0; 1])?;
if self.notify_write.is_some() {
// `pipe`
read(&self.notify_read, &mut [0; 1])?;
} else {
// `eventfd`
read(&self.notify_read, &mut [0; mem::size_of::<u64>()])?;
}

Ok(())
}
}
Expand Down

0 comments on commit abf9c0e

Please sign in to comment.