Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ipc): use single buffer and remove manual wakers #69

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_json = "1.0"
serde_with = "3.4"
thiserror = "1.0"
tokio = { version = "1.33", features = ["sync", "macros"] }
tokio-util = "0.7"
tower = { version = "0.4.13", features = ["util"] }
tracing = "0.1.40"
url = "2.4"
1 change: 1 addition & 0 deletions crates/transport-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures.workspace = true
pin-project.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["io", "compat"]}
tracing.workspace = true

bytes = "1.5.0"
Expand Down
67 changes: 27 additions & 40 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ pub mod mock;
#[cfg(feature = "mock")]
pub use mock::MockIpcServer;

use std::task::Poll::{Pending, Ready};

use alloy_json_rpc::PubSubItem;
use bytes::{Buf, BytesMut};
use futures::{io::BufReader, ready, AsyncBufRead, AsyncRead, AsyncWriteExt, StreamExt};
use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName};
use std::task::Poll::Ready;
use tokio::select;
use tokio_util::compat::FuturesAsyncReadCompatExt;

type Result<T> = std::result::Result<T, std::io::Error>;

Expand Down Expand Up @@ -113,19 +112,17 @@ impl IpcBackend {
pub struct ReadJsonStream<T> {
/// The underlying reader.
#[pin]
reader: BufReader<T>,
/// A buffer of bytes read from the reader.
reader: tokio_util::compat::Compat<T>,
Copy link
Contributor

@Evalir Evalir Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, although, what a name lol. I wasted some time looking for something like this 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!! I was looking for that! i found the old 0.3 compat crate but couldn't find the new one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I previously wasted a few hours on this myself -.-

same with finding a properly maintained tokio::codec Json implementation.
using SerializeStream is an adequate solution I think

/// A buffer for reading data from the reader.
buf: BytesMut,
/// A buffer of items deserialized from the reader.
items: Vec<PubSubItem>,
}

impl<T> ReadJsonStream<T>
where
T: AsyncRead,
{
fn new(reader: T) -> Self {
Self { reader: BufReader::new(reader), buf: BytesMut::with_capacity(4096), items: vec![] }
Self { reader: reader.compat(), buf: BytesMut::with_capacity(4096) }
}
}

Expand All @@ -148,19 +145,23 @@ where
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
use tokio_util::io::poll_read_buf;

// Deserialize any buffered items.
if !this.buf.is_empty() {
this.reader.consume(this.buf.len());
let mut this = self.project();

loop {
// try decoding from the buffer
tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instantiating the deserializer performs a new alloc every time the poll is invoked. instead, let's check if the buffer is empty before instantiating, and then after instantiating, drain the whole buffer into an items: Vec<PubSubItem>. Will reduce total allocs significantly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a toggle drained that is set when we have more data/no more items


let item = de.next();

// advance the buffer
this.buf.advance(de.byte_offset());

match item {
Some(Ok(response)) => {
this.items.push(response);
return Ready(Some(response));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wont this now fail to wake up again until the socket has more data?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the thing here is that you assume the read has EXACTLY ONE item in it. but that may not be the case if geth has sent multiple notifications

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh then i guess the followup would be that the underlying stream re-wakes the task as there's more data?

Copy link
Member Author

@mattsse mattsse Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wont this now fail to wake up again until the socket has more data?

this yields a new value, the caller is supposed to call poll_next again, streams should only register the waker if they are pending

https://docs.rs/futures/latest/futures/stream/trait.Stream.html#tymethod.poll_next

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted.

the way this loop works is:

it drains the buffer by yielding deserialized values (advancing the buffer)
if there are no more items to serialize it fills the buffer by reading from the reader until pending

Copy link
Member

@prestwich prestwich Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wont this now fail to wake up again until the socket has more data?

this yields a new value, the caller is supposed to call poll_next again, streams should only register the waker if they are pending

this doesn't mesh with my understanding of my understanding of wakers. AIUI the waker exists to notify the executor that there is additional work (i.e. that something that made us pending has become unblocked), and the executor may never poll a task that does not have a call to Waker::wake or Waker::wake_by_ref. E.g. the example executor would not re-poll this future until the waker stored in the underlying IO re-wakes the task because more data is available to read. So the observed behavior with that executor would be that any time 2 responses are in the buffer, the 2nd one is not read until the ipc socket has even more data available (and that data would not be read until more is available)

when they say "register for waking" I'm pretty sure what they mean is "store a copy of the waker in the local state. this is how, e.g., the tokio::time::sleep works under the hood. it stores the waker from the context of any task passed into it.

as a side note, your code may work in prod because tokio has an optimization where it polls futures again immediately after they return ready (even if the waker is not triggered), in case there's more work. While it may work, I'm not sure it is correct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio does not poll the future directly, the user of the stream does by calling poll_next. This happens in a Future, or poll function

the executor deals with the top-level task which contains the stream state machine, and any futures or other streams or other structs that contain that stream, and schedules that task for execution. futures and streams use the wakers to communicate to the executor that the task should be scheduled.

while let Some(next) = stream.next().await

this is a busy loop if the stream is always ready, yes. but as soon as the stream returns pending and the executor switches tasks, there is no guarantee that the executor returns to task before the task's Waker is used to re-wake it.

the implicit "guarantee" that poll_next registers the waker only exists if the stream is pending, per docs.

For example: mpsc::UnboundedReceiver Stream:

https://docs.rs/futures-channel/0.3.29/src/futures_channel/mpsc/mod.rs.html#1084-1090

this is "registering" the waker by adding it to task state to be re-woken when the channel has more items, ensuring that the executor calls poll when items are available, as i described above. ctx.waker().wake_by_ref() is not registering the waker in this way, it is requesting that the task be polled again by the executor immediately

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.waker().wake_by_ref() is not registering the waker in this way, it is requesting that the task be polled again by the executor immediately

right, that's why it isn't necessary to call it when returning an item

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is necessary if our buffer isn't empty, as we have work to do immediately, which wont get done if the task isnt rewoken

Copy link
Member Author

@mattsse mattsse Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new poll loop has 4 exits:

  1. item decoded -> return the item
  2. decoding failed -> terminate
  3. no item decoded, nothing read from the reader -> pending (this ensures we're getting called again as soon as there's more to read)
  4. reading failed -> terminate

I fail to see where we'd need to request another call.
But after writing this I think we can optimize this a bit more and skip the decoding attempt one time when we previously returned pending

Copy link
Member

@prestwich prestwich Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we read every item as soon as it's written, then we don't have a problem

however, if the other end of the socket ever writes 2 items between polls of this stream, thne then next time the stream is polled it will fill the buffer (getting the 2 items), and then deser and return an item. At that point, we have a serialized item in the buffer. If we do not re-wake properly, then the next poll will occur when the asyncread is ready to give us more serialized items for our buffer. That wake will again result in only 1 item being read from the buffer

we need to re-wake until the buffer is empty, otherwise we risk falling significantly behind and filling our buffer

}
Some(Err(e)) => {
tracing::error!(%e, "IPC response contained invalid JSON. Buffer contents will be logged at trace level");
Expand All @@ -171,34 +172,20 @@ where

return Ready(None);
}
None => {}
None => {
// nothing decoded
}
}
this.buf.advance(de.byte_offset());
cx.waker().wake_by_ref();
return Pending;
}

// Return any buffered items, rewaking.
if !this.items.is_empty() {
// may have more work!
cx.waker().wake_by_ref();
return Ready(this.items.pop());
}

tracing::debug!(buf_len = this.buf.len(), "Polling IPC socket for data");

let data = ready!(this.reader.poll_fill_buf(cx));
match data {
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket, shutting down");
Ready(None)
}
Ok(data) => {
tracing::debug!(data_len = data.len(), "Read data from IPC socket");
this.buf.extend_from_slice(data);
// wake task to run deserialization
cx.waker().wake_by_ref();
Pending
// read more data into the buffer
match ready!(poll_read_buf(this.reader.as_mut(), cx, &mut this.buf)) {
Ok(data_len) => {
tracing::debug!(%data_len, "Read data from IPC socket");
}
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket, shutting down");
return Ready(None);
}
}
}
}
Expand Down