Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
refactor: use VecDeque and address review
Browse files Browse the repository at this point in the history
  • Loading branch information
meetmangukiya committed May 16, 2022
1 parent 073cf68 commit 78337d1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
19 changes: 8 additions & 11 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use thiserror::Error;
use url::{ParseError, Url};

use futures_util::{lock::Mutex, try_join};
use std::{convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::VecDeque, convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration,
};
use tracing::trace;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -1102,22 +1104,17 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
where
P: PubsubClient,
{
let logs = match filter.block_option {
let loaded_logs = match filter.block_option {
FilterBlockOption::Range { from_block, to_block: _ } => {
if from_block.is_none() {
Ok(vec![])
vec![]
} else {
self.get_logs(filter).await
self.get_logs(filter).await?
}
}
FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await,
FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await?,
};

if logs.is_err() {
return Err(logs.err().unwrap())
}

let loaded_logs = logs.unwrap();
let loaded_logs = VecDeque::from(loaded_logs);

let logs = utils::serialize(&"logs"); // TODO: Make this a static
let filter = utils::serialize(filter);
Expand Down
11 changes: 6 additions & 5 deletions ethers-providers/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use pin_project::{pin_project, pinned_drop};
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use std::{
collections::VecDeque,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
Expand All @@ -31,7 +32,7 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
pub id: U256,

loaded_elements: Vec<R>,
loaded_elements: VecDeque<R>,

provider: &'a Provider<P>,

Expand All @@ -56,15 +57,15 @@ where
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?;
Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: vec![] })
Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
}

/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
}

pub fn set_loaded_elements(&mut self, loaded_elements: Vec<R>) {
pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
self.loaded_elements = loaded_elements;
}
}
Expand All @@ -81,8 +82,8 @@ where

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.loaded_elements.is_empty() {
let next_element = self.get_mut().loaded_elements.remove(0);
return Poll::Ready(Some(next_element))
let next_element = self.get_mut().loaded_elements.pop_front();
return Poll::Ready(next_element)
}

let this = self.project();
Expand Down

0 comments on commit 78337d1

Please sign in to comment.