Skip to content

Commit

Permalink
Fix too many 429 response
Browse files Browse the repository at this point in the history
Fixed #1229

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
  • Loading branch information
NobodyXu committed Aug 2, 2023
1 parent 7c2ddd9 commit 77aef10
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 113 deletions.
25 changes: 0 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/binstalk-downloader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread", "sync", "time", "fs"], default-features = false }
tokio-tar = "0.3.0"
tokio-util = { version = "0.7.8", features = ["io"] }
tower = { version = "0.4.13", features = ["limit", "util"] }
tracing = "0.1.37"
# trust-dns-resolver must be kept in sync with the version reqwest uses
trust-dns-resolver = { version = "0.22.0", optional = true, default-features = false, features = ["dnssec-ring"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/binstalk-downloader/src/gh_api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod request;
pub use request::{GhApiContextError, GhApiError, GhGraphQLErrors};

/// default retry duration if x-ratelimit-reset is not found in response header
const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(5 * 60);
const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(10 * 60);

fn percent_encode_http_url_path(path: &str) -> PercentEncode<'_> {
/// https://url.spec.whatwg.org/#fragment-percent-encode-set
Expand Down
18 changes: 7 additions & 11 deletions crates/binstalk-downloader/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use reqwest::{
Request,
};
use thiserror::Error as ThisError;
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
use tracing::{debug, info};

pub use reqwest::{header, Error as ReqwestError, Method, StatusCode};
Expand Down Expand Up @@ -73,15 +72,18 @@ impl HttpError {
#[derive(Debug)]
struct Inner {
client: reqwest::Client,
service: DelayRequest<RateLimit<reqwest::Client>>,
service: DelayRequest,
}

#[derive(Clone, Debug)]
pub struct Client(Arc<Inner>);

#[cfg_attr(not(feature = "__tls"), allow(unused_variables, unused_mut))]
impl Client {
/// * `per` - must not be 0.
/// * `per` - must not be 0, could be increased if rate-limit
/// happens.
/// This can be no larger than 60s, it will get truncated
/// to 60s if it is larger than that.
/// * `num_request` - maximum number of requests to be processed for
/// each `per` duration.
///
Expand Down Expand Up @@ -122,11 +124,7 @@ impl Client {

Ok(Client(Arc::new(Inner {
client: client.clone(),
service: DelayRequest::new(
ServiceBuilder::new()
.rate_limit(num_request.get(), per)
.service(client),
),
service: DelayRequest::new(num_request, per.min(Duration::from_secs(60)), client),
})))
}

Expand Down Expand Up @@ -159,9 +157,7 @@ impl Client {
url: &Url,
) -> Result<ControlFlow<reqwest::Response, Result<reqwest::Response, ReqwestError>>, ReqwestError>
{
let future = (&self.0.service).ready().await?.call(request);

let response = match future.await {
let response = match self.0.service.call(request).await {
Err(err) if err.is_timeout() || err.is_connect() => {
let duration = RETRY_DURATION_FOR_TIMEOUT;

Expand Down
232 changes: 170 additions & 62 deletions crates/binstalk-downloader/src/remote/delay_request.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
use std::{
collections::HashMap,
future::Future,
iter::Peekable,
pin::Pin,
collections::HashMap, future::Future, iter::Peekable, num::NonZeroU64, ops::ControlFlow,
sync::Mutex,
task::{Context, Poll},
};

use compact_str::{CompactString, ToCompactString};
use reqwest::{Request, Url};
use tokio::{
sync::Mutex as AsyncMutex,
time::{sleep_until, Duration, Instant},
};
use tower::{Service, ServiceExt};
use tokio::time::{sleep_until, Duration, Instant};
use tracing::debug;

pub(super) type RequestResult = Result<reqwest::Response, reqwest::Error>;

trait IterExt: Iterator {
fn dedup(self) -> Dedup<Self>
Expand Down Expand Up @@ -47,15 +42,107 @@ where
}

#[derive(Debug)]
pub(super) struct DelayRequest<S> {
inner: AsyncMutex<S>,
struct Inner {
client: reqwest::Client,
num_request: NonZeroU64,
per: Duration,
until: Instant,
state: State,
}

#[derive(Debug)]
enum State {
Limited,
Ready { rem: NonZeroU64 },
}

impl Inner {
fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self {
Inner {
client,
per,
num_request,
until: Instant::now() + per,
state: State::Ready { rem: num_request },
}
}

fn inc_rate_limit(&mut self) {
if let Some(num_request) = NonZeroU64::new(self.num_request.get() / 2) {
// If self.num_request.get() > 1, then cut it by half
self.num_request = num_request;
if let State::Ready { rem, .. } = &mut self.state {
*rem = num_request.min(*rem)
}
}

let per = self.per;
if per < Duration::from_millis(700) {
self.per = per.mul_f32(1.2);
self.until += self.per - per;
}
}

fn ready(&mut self) -> Readiness {
match self.state {
State::Ready { .. } => Readiness::Ready,
State::Limited => {
if self.until.elapsed().is_zero() {
Readiness::Limited(self.until)
} else {
// rate limit can be reset now and is ready
self.until = Instant::now() + self.per;
self.state = State::Ready {
rem: self.num_request,
};

Readiness::Ready
}
}
}
}

fn call(&mut self, req: Request) -> impl Future<Output = RequestResult> {
match &mut self.state {
State::Ready { rem } => {
let now = Instant::now();

// If the period has elapsed, reset it.
if now >= self.until {
self.until = now + self.per;
*rem = self.num_request;
}

if let Some(new_rem) = NonZeroU64::new(rem.get() - 1) {
*rem = new_rem;
} else {
// The service is disabled until further notice
self.state = State::Limited;
}

// Call the inner future
self.client.execute(req)
}
State::Limited => panic!("service not ready; poll_ready must be called first"),
}
}
}

enum Readiness {
Limited(Instant),
Ready,
}

#[derive(Debug)]
pub(super) struct DelayRequest {
inner: Mutex<Inner>,
hosts_to_delay: Mutex<HashMap<CompactString, Instant>>,
}

impl<S> DelayRequest<S> {
pub(super) fn new(inner: S) -> Self {
impl DelayRequest {
pub(super) fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self {
Self {
inner: AsyncMutex::new(inner),
inner: Mutex::new(Inner::new(num_request, per, client)),
hosts_to_delay: Default::default(),
}
}
Expand All @@ -78,61 +165,82 @@ impl<S> DelayRequest<S> {
});
}

fn wait_until_available(&self, url: &Url) -> impl Future<Output = ()> + Send + 'static {
fn get_delay_until(&self, host: &str) -> Option<Instant> {
let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap();

let deadline = url
.host_str()
.and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host)))
.and_then(|(deadline, host)| {
if deadline.elapsed().is_zero() {
Some(deadline)
} else {
// We have already gone past the deadline,
// so we should remove it instead.
hosts_to_delay.remove(host);
None
}
});

async move {
if let Some(deadline) = deadline {
sleep_until(deadline).await;
hosts_to_delay.get(host).copied().and_then(|until| {
if until.elapsed().is_zero() {
Some(until)
} else {
// We have already gone past the deadline,
// so we should remove it instead.
hosts_to_delay.remove(host);
None
}
}
})
}
}

impl<'this, S> Service<Request> for &'this DelayRequest<S>
where
S: Service<Request> + Send,
S::Future: Send,
{
type Response = S::Response;
type Error = S::Error;
// TODO: Replace this with `type_alias_impl_trait` once it stablises
// https://github.com/rust-lang/rust/issues/63063
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'this>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// Define a new function so that the guard will be dropped ASAP and not
// included in the future.
fn call_inner(
&self,
counter: &mut u32,
req: &mut Option<Request>,
) -> ControlFlow<impl Future<Output = RequestResult>, Instant> {
// Wait until we are ready to send next requests
// (client-side rate-limit throttler).
let mut guard = self.inner.lock().unwrap();

if let Readiness::Limited(until) = guard.ready() {
debug!("client-side rate limit exceeded; sleeping.");
ControlFlow::Continue(until)
} else if let Some(until) = req
.as_ref()
.unwrap()
.url()
.host_str()
.and_then(|host| self.get_delay_until(host))
{
// If the host rate-limit us, then wait until then
// and try again (server-side rate-limit throttler).

fn call(&mut self, req: Request) -> Self::Future {
let this = *self;
// Try increasing client-side rate-limit throttler to prevent
// rate-limit in the future.
guard.inc_rate_limit();

Box::pin(async move {
this.wait_until_available(req.url()).await;
let additional_delay =
Duration::from_millis(200) + Duration::from_millis(100) * 20.min(*counter);

// Reduce critical section:
// - Construct the request before locking
// - Once it is ready, call it and obtain
// the future, then release the lock before
// polling the future, which performs network I/O that could
// take really long.
let future = this.inner.lock().await.ready().await?.call(req);
*counter += 1;

future.await
})
debug!("server-side rate limit exceeded; sleeping.");
ControlFlow::Continue(until + additional_delay)
} else {
ControlFlow::Break(guard.call(req.take().unwrap()))
}
}

pub(super) async fn call(&self, req: Request) -> RequestResult {
// Put all variables in a block so that will be dropped before polling
// the future returned by reqwest.
{
let mut counter = 0;
// Use Option here so that we don't have to move entire `Request`
// twice when calling `self.call_inner` while retain the ability to
// take its value without boxing.
//
// This will be taken when `ControlFlow::Break` is then it will
// break the loop, so it will never call `self.call_inner` with
// a `None`.
let mut req = Some(req);

loop {
match self.call_inner(&mut counter, &mut req) {
ControlFlow::Continue(until) => sleep_until(until).await,
ControlFlow::Break(future) => break future,
}
}
}
.await
}
}
Loading

0 comments on commit 77aef10

Please sign in to comment.