fix(provider): Do not overflow LRU cache capacity in ChainStreamPoller #1052

Merged: 1 commit, Jul 15, 2024
77 changes: 76 additions & 1 deletion crates/provider/src/chain.rs
@@ -31,10 +31,16 @@ impl<T: Transport + Clone, N: Network> ChainStreamPoller<T, N> {
}

    pub(crate) fn new(client: WeakClient<T>) -> Self {
        Self::with_next_yield(client, NO_BLOCK_NUMBER)
    }

    /// Can be used to force the poller to start at a specific block number.
    /// Mostly useful for tests.
    fn with_next_yield(client: WeakClient<T>, next_yield: BlockNumber) -> Self {
        Self {
            client: client.clone(),
            poll_task: PollerBuilder::new(client, "eth_blockNumber", ()),
-            next_yield: NO_BLOCK_NUMBER,
+            next_yield,
            known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
Contributor Author
Also, I'm not quite sure why an LRU cache is used here. Wouldn't a simple VecDeque suffice?

Member
Yeah, IMO a VecDeque would actually be more appropriate here.
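To make the VecDeque suggestion concrete, here is a minimal sketch of a bounded FIFO buffer that could stand in for the LRU cache. Everything in it is hypothetical and not part of this PR: the `BlockBuffer` name, its methods, and the generic `B` used in place of the real block type. It assumes blocks are fetched in ascending order and yielded in the same order, so the next block to yield is always at the front.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded FIFO buffer of fetched blocks, keyed by block number.
/// `B` is a placeholder for the real block response type.
pub struct BlockBuffer<B> {
    blocks: VecDeque<(u64, B)>,
    capacity: usize,
}

impl<B> BlockBuffer<B> {
    pub fn new(capacity: usize) -> Self {
        Self { blocks: VecDeque::with_capacity(capacity), capacity }
    }

    /// Returns true once the buffer holds `capacity` blocks.
    pub fn is_full(&self) -> bool {
        self.blocks.len() >= self.capacity
    }

    /// Appends a block. When the buffer is full the block is handed back
    /// to the caller instead of silently evicting an older entry.
    pub fn push(&mut self, number: u64, block: B) -> Result<(), B> {
        if self.is_full() {
            return Err(block);
        }
        self.blocks.push_back((number, block));
        Ok(())
    }

    /// Pops the oldest block, but only if it is the one to yield next.
    pub fn pop_if_next(&mut self, next_yield: u64) -> Option<B> {
        match self.blocks.front() {
            Some((number, _)) if *number == next_yield => {
                self.blocks.pop_front().map(|(_, block)| block)
            }
            _ => None,
        }
    }
}
```

Unlike an LRU cache, this buffer never drops a block that has not been yielded yet; a full buffer is reported to the caller, which can then stop fetching until the stream is drained.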

_phantom: PhantomData,
}
Expand Down Expand Up @@ -106,8 +112,77 @@ impl<T: Transport + Clone, N: Network> ChainStreamPoller<T, N> {
}
};
self.known_blocks.put(number, block);
if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
Contributor Author
The implication here is that we will have to wait for the next value from poll_task even though we already know there are more blocks we can fetch. However, avoiding that wait may be somewhat complicated to implement and feels like a premature optimization.

Member
Hmm, IMO this loop's logic is flawed: it allows iterating over all of next_yield..tip before we start yielding blocks again.

I consider this a hotfix, but ideally we can solve this by improving this stream impl.
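The effect of the early `break` can be illustrated with a small standalone model. This is a sketch under assumptions, not the poller's actual code: the `fill` function and its `stop_when_full` flag are hypothetical, block numbers stand in for blocks, and a `VecDeque` with front eviction models the LRU cache (with only insertions of distinct keys and no reads, the least recently used entry is the oldest one). It also assumes that the failure the PR title refers to is eviction of blocks that have not been yielded yet.

```rust
use std::collections::VecDeque;

/// Fills `cache` with block numbers `next_yield..=tip`.
/// When the cache is at capacity, the oldest entry is evicted first,
/// mimicking LRU eviction under insert-only access.
/// With `stop_when_full`, filling stops as soon as the cache is full,
/// which models the `break` added in this PR.
pub fn fill(
    cache: &mut VecDeque<u64>,
    next_yield: u64,
    tip: u64,
    capacity: usize,
    stop_when_full: bool,
) {
    for number in next_yield..=tip {
        if cache.len() == capacity {
            // Eviction: the block the stream wants to yield next may be lost here.
            cache.pop_front();
        }
        cache.push_back(number);
        if stop_when_full && cache.len() == capacity {
            // Cache is full, should be consumed before filling more blocks.
            break;
        }
    }
}
```

With a capacity of 3, `next_yield` of 1 and a tip of 5, the unguarded fill ends with blocks 3, 4 and 5 cached, so block 1 can no longer be yielded from the cache. The guarded fill stops at blocks 1, 2 and 3 and leaves the rest for a later pass, which is the wait described in the comments above.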

// Cache is full, should be consumed before filling more blocks.
debug!(number, "cache full");
break;
}
}
}
}
}
}

#[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on ability to mine blocks on demand.
Contributor Author
Writing tests without anvil_mine would be more complex, so I hope it's fine given that tests are run with --all-features in CI.

mod tests {
    use std::{future::Future, time::Duration};

    use crate::{ext::AnvilApi, ProviderBuilder};
    use alloy_node_bindings::Anvil;
    use alloy_primitives::U256;
    use alloy_rpc_client::ReqwestClient;

    use super::*;

    fn init_tracing() {
        let _ = tracing_subscriber::fmt::try_init();
    }

    async fn with_timeout<T: Future>(fut: T) -> T::Output {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Operation timed out"),
            out = fut => out,
        }
    }

    #[tokio::test]
    async fn yield_block() {
        init_tracing();

        let anvil = Anvil::new().spawn();

        let client = ReqwestClient::new_http(anvil.endpoint_url());
        let poller: ChainStreamPoller<_, Ethereum> =
            ChainStreamPoller::with_next_yield(client.get_weak(), 1);
        let mut stream = Box::pin(poller.into_stream());

        // We will also use provider to manipulate anvil instance via RPC.
        let provider = ProviderBuilder::new().on_http(anvil.endpoint_url());
        provider.anvil_mine(Some(U256::from(1)), None).await.unwrap();

        let block = with_timeout(stream.next()).await.expect("Block wasn't fetched");
        assert_eq!(block.header.number, Some(1u64));
    }

    #[tokio::test]
    async fn yield_many_blocks() {
        // Make sure that we can process more blocks than fits in the cache.
        const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

        init_tracing();

        let anvil = Anvil::new().spawn();

        let client = ReqwestClient::new_http(anvil.endpoint_url());
        let poller: ChainStreamPoller<_, Ethereum> =
            ChainStreamPoller::with_next_yield(client.get_weak(), 1);
        let stream = Box::pin(poller.into_stream());

        // We will also use provider to manipulate anvil instance via RPC.
        let provider = ProviderBuilder::new().on_http(anvil.endpoint_url());
        provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap();

        let blocks = with_timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
        assert_eq!(blocks.len(), BLOCKS_TO_MINE);
    }
}