Skip to content

Commit

Permalink
chore: remove unused private stream type (#9357)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jul 7, 2024
1 parent abf3aff commit 2adf2d3
Showing 1 changed file with 0 additions and 102 deletions.
102 changes: 0 additions & 102 deletions crates/net/p2p/src/full_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
error::PeerRequestResult,
headers::client::{HeadersClient, SingleHeaderRequest},
};
use futures::Stream;
use reth_consensus::{Consensus, ConsensusError};
use reth_eth_wire_types::HeadersDirection;
use reth_network_peers::WithPeerId;
Expand Down Expand Up @@ -634,69 +633,6 @@ where
}
}

/// A type that buffers the result of a range request so we can return it as a `Stream`.
struct FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
/// The inner [`FetchFullBlockRangeFuture`] that is polled.
inner: FetchFullBlockRangeFuture<Client>,
/// The blocks that have been received so far.
///
/// If this is `None` then the request is still in progress. If the vec is empty, then all of
/// the response values have been consumed.
blocks: Option<Vec<SealedBlock>>,
}

impl<Client> From<FetchFullBlockRangeFuture<Client>> for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient,
{
fn from(inner: FetchFullBlockRangeFuture<Client>) -> Self {
Self { inner, blocks: None }
}
}

impl<Client> Stream for FullBlockRangeStream<Client>
where
Client: BodiesClient + HeadersClient + Unpin + 'static,
{
type Item = SealedBlock;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// If all blocks have been consumed, then return `None`.
if let Some(blocks) = &mut this.blocks {
if blocks.is_empty() {
// Stream is finished
return Poll::Ready(None)
}

// return the next block if it's ready - the vec should be in ascending order since it
// is reversed right after it is received from the future, so we can just pop() the
// elements to return them from the stream in descending order
return Poll::Ready(blocks.pop())
}

// poll the inner future if the blocks are not yet ready
let mut blocks = ready!(Pin::new(&mut this.inner).poll(cx));

// the blocks are returned in descending order, reverse the list so we can just pop() the
// vec to yield the next block in the stream
blocks.reverse();

// pop the first block from the vec as the first stream element and store the rest
let first_result = blocks.pop();

// if the inner future is ready, then we can return the blocks
this.blocks = Some(blocks);

// return the first block
Poll::Ready(first_result)
}
}

/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
/// futures until they return responses. It will return either the header or body result, depending
/// on which future successfully returned.
Expand Down Expand Up @@ -742,7 +678,6 @@ enum RangeResponseResult {
mod tests {
use super::*;
use crate::test_utils::TestFullBlockClient;
use futures::StreamExt;
use std::ops::Range;

#[tokio::test]
Expand Down Expand Up @@ -808,43 +743,6 @@ mod tests {
}
}

#[tokio::test]
async fn download_full_block_range_stream() {
let client = TestFullBlockClient::default();
let (header, body) = insert_headers_into_client(&client, 0..50);
let client = FullBlockClient::test_client(client);

let future = client.get_full_block_range(header.hash(), 1);
let mut stream = FullBlockRangeStream::from(future);

// ensure only block in the stream is the one we requested
let received = stream.next().await.expect("response should not be None");
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));

// stream should be done now
assert_eq!(stream.next().await, None);

// there are 11 total blocks
let future = client.get_full_block_range(header.hash(), 11);
let mut stream = FullBlockRangeStream::from(future);

// check first header
let received = stream.next().await.expect("response should not be None");
let mut curr_number = received.number;
assert_eq!(received, SealedBlock::new(header.clone(), body.clone()));

// check the rest of the headers
for _ in 0..10 {
let received = stream.next().await.expect("response should not be None");
assert_eq!(received.number, curr_number - 1);
curr_number = received.number;
}

// ensure stream is done
let received = stream.next().await;
assert!(received.is_none());
}

#[tokio::test]
async fn download_full_block_range_over_soft_limit() {
// default soft limit is 20, so we will request 50 blocks
Expand Down

0 comments on commit 2adf2d3

Please sign in to comment.