Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ipc): use single buffer and remove manual wakers #69

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ futures-executor = "0.3.29"

hyper = "0.14.27"
tokio = "1.33"
tokio-util = "0.7"
tower = { version = "0.4.13", features = ["util"] }

tracing = "0.1.40"
Expand Down
1 change: 1 addition & 0 deletions crates/transport-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures.workspace = true
pin-project.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["io", "compat"]}
tracing.workspace = true

bytes = "1.5.0"
Expand Down
99 changes: 47 additions & 52 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ pub mod mock;
#[cfg(feature = "mock")]
pub use mock::MockIpcServer;

use std::task::Poll::{Pending, Ready};

use alloy_json_rpc::PubSubItem;
use bytes::{Buf, BytesMut};
use futures::{io::BufReader, ready, AsyncBufRead, AsyncRead, AsyncWriteExt, StreamExt};
use futures::{ready, AsyncRead, AsyncWriteExt, StreamExt};
use interprocess::local_socket::{tokio::LocalSocketStream, ToLocalSocketName};
use std::task::Poll::Ready;
use tokio::select;
use tokio_util::compat::FuturesAsyncReadCompatExt;

type Result<T> = std::result::Result<T, std::io::Error>;

Expand Down Expand Up @@ -113,19 +112,19 @@ impl IpcBackend {
pub struct ReadJsonStream<T> {
/// The underlying reader.
#[pin]
reader: BufReader<T>,
/// A buffer of bytes read from the reader.
reader: tokio_util::compat::Compat<T>,
Copy link
Contributor

@Evalir Evalir Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, although, what a name lol. I wasted some time looking for something like this 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!! I was looking for that! i found the old 0.3 compat crate but couldn't find the new one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I previously wasted a few hours on this myself -.-

same with finding a properly maintained tokio::codec Json implementation.
using SerializeStream is an adequate solution I think

/// A buffer for reading data from the reader.
buf: BytesMut,
/// A buffer of items deserialized from the reader.
items: Vec<PubSubItem>,
/// Whether the buffer has been drained.
drained: bool,
}

impl<T> ReadJsonStream<T>
where
T: AsyncRead,
{
fn new(reader: T) -> Self {
Self { reader: BufReader::new(reader), buf: BytesMut::with_capacity(4096), items: vec![] }
Self { reader: reader.compat(), buf: BytesMut::with_capacity(4096), drained: true }
}
}

Expand All @@ -148,57 +147,53 @@ where
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
use tokio_util::io::poll_read_buf;

// Deserialize any buffered items.
if !this.buf.is_empty() {
this.reader.consume(this.buf.len());
let mut this = self.project();

tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();
loop {
// try decoding from the buffer, but only if we have new data
if !*this.drained {
tracing::debug!(buf_len = this.buf.len(), "Deserializing buffered IPC data");
let mut de = serde_json::Deserializer::from_slice(this.buf.as_ref()).into_iter();

let item = de.next();
match item {
Some(Ok(response)) => {
this.items.push(response);
}
Some(Err(e)) => {
tracing::error!(%e, "IPC response contained invalid JSON. Buffer contents will be logged at trace level");
tracing::trace!(
buffer = %String::from_utf8_lossy(this.buf.as_ref()),
"IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.",
);
let item = de.next();

return Ready(None);
// advance the buffer
this.buf.advance(de.byte_offset());

match item {
Some(Ok(response)) => {
return Ready(Some(response));
}
Some(Err(e)) => {
tracing::error!(%e, "IPC response contained invalid JSON. Buffer contents will be logged at trace level");
tracing::trace!(
buffer = %String::from_utf8_lossy(this.buf.as_ref()),
"IPC response contained invalid JSON. NOTE: Buffer contents do not include invalid utf8.",
);

return Ready(None);
}
None => {
// nothing decoded
*this.drained = true;
}
}
None => {}
}
this.buf.advance(de.byte_offset());
cx.waker().wake_by_ref();
return Pending;
}

// Return any buffered items, rewaking.
if !this.items.is_empty() {
// may have more work!
cx.waker().wake_by_ref();
return Ready(this.items.pop());
}

tracing::debug!(buf_len = this.buf.len(), "Polling IPC socket for data");
// read more data into the buffer
match ready!(poll_read_buf(this.reader.as_mut(), cx, &mut this.buf)) {
Ok(data_len) => {
tracing::debug!(%data_len, "Read data from IPC socket");

let data = ready!(this.reader.poll_fill_buf(cx));
match data {
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket, shutting down");
Ready(None)
}
Ok(data) => {
tracing::debug!(data_len = data.len(), "Read data from IPC socket");
this.buf.extend_from_slice(data);
// wake task to run deserialization
cx.waker().wake_by_ref();
Pending
// can try decoding again
*this.drained = false;
}
Err(e) => {
tracing::error!(%e, "Failed to read from IPC socket, shutting down");
return Ready(None);
}
}
}
}
Expand Down