diff --git a/crates/net/p2p/src/full_block.rs b/crates/net/p2p/src/full_block.rs index fcee4c52b13e..724290ec3f65 100644 --- a/crates/net/p2p/src/full_block.rs +++ b/crates/net/p2p/src/full_block.rs @@ -4,7 +4,6 @@ use crate::{ error::PeerRequestResult, headers::client::{HeadersClient, SingleHeaderRequest}, }; -use futures::Stream; use reth_consensus::{Consensus, ConsensusError}; use reth_eth_wire_types::HeadersDirection; use reth_network_peers::WithPeerId; @@ -634,69 +633,6 @@ where } } -/// A type that buffers the result of a range request so we can return it as a `Stream`. -struct FullBlockRangeStream -where - Client: BodiesClient + HeadersClient, -{ - /// The inner [`FetchFullBlockRangeFuture`] that is polled. - inner: FetchFullBlockRangeFuture, - /// The blocks that have been received so far. - /// - /// If this is `None` then the request is still in progress. If the vec is empty, then all of - /// the response values have been consumed. - blocks: Option>, -} - -impl From> for FullBlockRangeStream -where - Client: BodiesClient + HeadersClient, -{ - fn from(inner: FetchFullBlockRangeFuture) -> Self { - Self { inner, blocks: None } - } -} - -impl Stream for FullBlockRangeStream -where - Client: BodiesClient + HeadersClient + Unpin + 'static, -{ - type Item = SealedBlock; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - - // If all blocks have been consumed, then return `None`. - if let Some(blocks) = &mut this.blocks { - if blocks.is_empty() { - // Stream is finished - return Poll::Ready(None) - } - - // return the next block if it's ready - the vec should be in ascending order since it - // is reversed right after it is received from the future, so we can just pop() the - // elements to return them from the stream in descending order - return Poll::Ready(blocks.pop()) - } - - // poll the inner future if the blocks are not yet ready - let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx)); - - // the blocks are returned in descending order, reverse the list so we can just pop() the - // vec to yield the next block in the stream - blocks.reverse(); - - // pop the first block from the vec as the first stream element and store the rest - let first_result = blocks.pop(); - - // if the inner future is ready, then we can return the blocks - this.blocks = Some(blocks); - - // return the first block - Poll::Ready(first_result) - } -} - /// A request for a range of full blocks. Polling this will poll the inner headers and bodies /// futures until they return responses. It will return either the header or body result, depending /// on which future successfully returned. @@ -742,7 +678,6 @@ enum RangeResponseResult { mod tests { use super::*; use crate::test_utils::TestFullBlockClient; - use futures::StreamExt; use std::ops::Range; #[tokio::test] @@ -808,43 +743,6 @@ mod tests { } } - #[tokio::test] - async fn download_full_block_range_stream() { - let client = TestFullBlockClient::default(); - let (header, body) = insert_headers_into_client(&client, 0..50); - let client = FullBlockClient::test_client(client); - - let future = client.get_full_block_range(header.hash(), 1); - let mut stream = FullBlockRangeStream::from(future); - - // ensure only block in the stream is the one we requested - let received = stream.next().await.expect("response should not be None"); - assert_eq!(received, SealedBlock::new(header.clone(), body.clone())); - - // stream should be done now - assert_eq!(stream.next().await, None); - - // there are 11 total blocks - let future = client.get_full_block_range(header.hash(), 11); - let mut stream = FullBlockRangeStream::from(future); - - // check first header - let received = stream.next().await.expect("response should not be None"); - let mut curr_number = received.number; - assert_eq!(received, SealedBlock::new(header.clone(), body.clone())); - - // check the rest of the headers - for _ in 0..10 { - let received = stream.next().await.expect("response should not be None"); - assert_eq!(received.number, curr_number - 1); - curr_number = received.number; - } - - // ensure stream is done - let received = stream.next().await; - assert!(received.is_none()); - } - #[tokio::test] async fn download_full_block_range_over_soft_limit() { // default soft limit is 20, so we will request 50 blocks