Skip to content

Commit

Permalink
fix(provider): Do not overflow LRU cache capacity in ChainStreamPoller (
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc authored and ben186 committed Jul 27, 2024
1 parent baa8ad3 commit 22b635d
Showing 1 changed file with 76 additions and 1 deletion.
77 changes: 76 additions & 1 deletion crates/provider/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ impl<T: Transport + Clone, N: Network> ChainStreamPoller<T, N> {
}

pub(crate) fn new(client: WeakClient<T>) -> Self {
Self::with_next_yield(client, NO_BLOCK_NUMBER)
}

/// Can be used to force the poller to start at a specific block number.
/// Mostly useful for tests.
fn with_next_yield(client: WeakClient<T>, next_yield: BlockNumber) -> Self {
Self {
client: client.clone(),
poll_task: PollerBuilder::new(client, "eth_blockNumber", ()),
next_yield: NO_BLOCK_NUMBER,
next_yield,
known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
_phantom: PhantomData,
}
Expand Down Expand Up @@ -106,8 +112,77 @@ impl<T: Transport + Clone, N: Network> ChainStreamPoller<T, N> {
}
};
self.known_blocks.put(number, block);
if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
// Cache is full, should be consumed before filling more blocks.
debug!(number, "cache full");
break;
}
}
}
}
}
}

#[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on ability to mine blocks on demand.
mod tests {
use std::{future::Future, time::Duration};

use crate::{ext::AnvilApi, ProviderBuilder};
use alloy_node_bindings::Anvil;
use alloy_primitives::U256;
use alloy_rpc_client::ReqwestClient;

use super::*;

fn init_tracing() {
let _ = tracing_subscriber::fmt::try_init();
}

async fn with_timeout<T: Future>(fut: T) -> T::Output {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Operation timed out"),
out = fut => out,
}
}

#[tokio::test]
async fn yield_block() {
init_tracing();

let anvil = Anvil::new().spawn();

let client = ReqwestClient::new_http(anvil.endpoint_url());
let poller: ChainStreamPoller<_, Ethereum> =
ChainStreamPoller::with_next_yield(client.get_weak(), 1);
let mut stream = Box::pin(poller.into_stream());

// We will also use provider to manipulate anvil instance via RPC.
let provider = ProviderBuilder::new().on_http(anvil.endpoint_url());
provider.anvil_mine(Some(U256::from(1)), None).await.unwrap();

let block = with_timeout(stream.next()).await.expect("Block wasn't fetched");
assert_eq!(block.header.number, Some(1u64));
}

#[tokio::test]
async fn yield_many_blocks() {
// Make sure that we can process more blocks than fits in the cache.
const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

init_tracing();

let anvil = Anvil::new().spawn();

let client = ReqwestClient::new_http(anvil.endpoint_url());
let poller: ChainStreamPoller<_, Ethereum> =
ChainStreamPoller::with_next_yield(client.get_weak(), 1);
let stream = Box::pin(poller.into_stream());

// We will also use provider to manipulate anvil instance via RPC.
let provider = ProviderBuilder::new().on_http(anvil.endpoint_url());
provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap();

let blocks = with_timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
assert_eq!(blocks.len(), BLOCKS_TO_MINE);
}
}

0 comments on commit 22b635d

Please sign in to comment.