diff --git a/src/backoff.rs b/src/backoff.rs index 11f9113..2e867d7 100644 --- a/src/backoff.rs +++ b/src/backoff.rs @@ -29,6 +29,16 @@ impl Default for BackoffConfig { pub type BackoffError = std::convert::Infallible; pub type BackoffResult = Result; +/// Error (which should increase backoff) or throttle for a specific duration (as asked for by the broker). +#[derive(Debug)] +pub enum ErrorOrThrottle +where + E: std::error::Error + Send, +{ + Error(E), + Throttle(Duration), +} + /// [`Backoff`] can be created from a [`BackoffConfig`] /// /// Consecutive calls to [`Backoff::next`] will return the next backoff interval @@ -99,23 +109,32 @@ impl Backoff { ) -> BackoffResult where F: (Fn() -> F1) + Send + Sync, - F1: std::future::Future> + Send, + F1: std::future::Future>> + Send, E: std::error::Error + Send, { loop { - let e = match do_stuff().await { - ControlFlow::Break(r) => break Ok(r), - ControlFlow::Continue(e) => e, + // split match statement from `tokio::time::sleep`, because otherwise rustc requires `B: Send` + let sleep_time = match do_stuff().await { + ControlFlow::Break(r) => { + break Ok(r); + } + ControlFlow::Continue(ErrorOrThrottle::Error(e)) => { + let backoff = self.next(); + info!( + e=%e, + request_name, + backoff_secs = backoff.as_secs(), + "request encountered non-fatal error - backing off", + ); + backoff + } + ControlFlow::Continue(ErrorOrThrottle::Throttle(throttle)) => { + info!(?throttle, request_name, "broker asked us to throttle",); + throttle + } }; - let backoff = self.next(); - info!( - e=%e, - request_name, - backoff_secs = backoff.as_secs(), - "request encountered non-fatal error - backing off", - ); - tokio::time::sleep(backoff).await; + tokio::time::sleep(sleep_time).await; } } } diff --git a/src/client/controller.rs b/src/client/controller.rs index d358edc..9e8a6c0 100644 --- a/src/client/controller.rs +++ b/src/client/controller.rs @@ -5,7 +5,7 @@ use tokio::sync::Mutex; use tracing::{debug, error, info}; use crate::{ - backoff::{Backoff, BackoffConfig}, + backoff::{Backoff, BackoffConfig, ErrorOrThrottle}, client::{Error, Result}, connection::{ BrokerCache, BrokerConnection, BrokerConnector, MessengerTransport, MetadataLookupMode, @@ -16,6 +16,7 @@ use crate::{ messages::{CreateTopicRequest, CreateTopicsRequest}, primitives::{Int16, Int32, String_}, }, + throttle::maybe_throttle, validation::ExactlyOne, }; @@ -63,23 +64,28 @@ impl ControllerClient { }; maybe_retry(&self.backoff_config, self, "create_topic", || async move { - let broker = self.get().await?; - let response = broker.request(request).await?; + let broker = self.get().await.map_err(ErrorOrThrottle::Error)?; + let response = broker + .request(request) + .await + .map_err(|e| ErrorOrThrottle::Error(e.into()))?; + + maybe_throttle(response.throttle_time_ms)?; let topic = response .topics .exactly_one() - .map_err(Error::exactly_one_topic)?; + .map_err(|e| ErrorOrThrottle::Error(Error::exactly_one_topic(e)))?; match topic.error { None => Ok(()), - Some(protocol_error) => Err(Error::ServerError { + Some(protocol_error) => Err(ErrorOrThrottle::Error(Error::ServerError { protocol_error, error_message: topic.error_message.and_then(|s| s.0), request: RequestContext::Topic(topic.name.0), response: None, is_virtual: false, - }), + })), } }) .await?; @@ -150,15 +156,20 @@ async fn maybe_retry( where B: BrokerCache, R: (Fn() -> F) + Send + Sync, - F: std::future::Future> + Send, + F: std::future::Future>> + Send, { let mut backoff = Backoff::new(backoff_config); backoff .retry_with_backoff(request_name, || async { let error = match f().await { - Ok(v) => return ControlFlow::Break(Ok(v)), - Err(e) => e, + Ok(v) => { + return ControlFlow::Break(Ok(v)); + } + Err(ErrorOrThrottle::Throttle(t)) => { + return ControlFlow::Continue(ErrorOrThrottle::Throttle(t)); + } + Err(ErrorOrThrottle::Error(e)) => e, }; match error { @@ -184,7 +195,7 @@ where return ControlFlow::Break(Err(error)); } } - ControlFlow::Continue(error) + ControlFlow::Continue(ErrorOrThrottle::Error(error)) }) .await .map_err(Error::RetryFailed)? diff --git a/src/client/partition.rs b/src/client/partition.rs index 9d56061..13cf7db 100644 --- a/src/client/partition.rs +++ b/src/client/partition.rs @@ -1,5 +1,5 @@ use crate::{ - backoff::{Backoff, BackoffConfig}, + backoff::{Backoff, BackoffConfig, ErrorOrThrottle}, client::error::{Error, RequestContext, Result}, connection::{ BrokerCache, BrokerConnection, BrokerConnector, MessengerTransport, MetadataLookupMode, @@ -19,6 +19,7 @@ use crate::{ record::{Record as ProtocolRecord, *}, }, record::{Record, RecordAndOffset}, + throttle::maybe_throttle, validation::ExactlyOne, }; use async_trait::async_trait; @@ -151,7 +152,7 @@ impl PartitionClient { &*brokers, "leader_detection", || async move { - scope.get().await?; + scope.get().await.map_err(ErrorOrThrottle::Error)?; Ok(()) }, ) @@ -190,9 +191,14 @@ impl PartitionClient { self, "produce", || async move { - let broker = self.get().await?; - let response = broker.request(&request).await?; + let broker = self.get().await.map_err(ErrorOrThrottle::Error)?; + let response = broker + .request(&request) + .await + .map_err(|e| ErrorOrThrottle::Error(e.into()))?; + maybe_throttle(response.throttle_time_ms)?; process_produce_response(self.partition, &self.topic, n, response) + .map_err(ErrorOrThrottle::Error) }, ) .await @@ -221,8 +227,16 @@ impl PartitionClient { self, "fetch_records", || async move { - let response = self.get().await?.request(&request).await?; + let response = self + .get() + .await + .map_err(ErrorOrThrottle::Error)? + .request(&request) + .await + .map_err(|e| ErrorOrThrottle::Error(e.into()))?; + maybe_throttle(response.throttle_time_ms)?; process_fetch_response(self.partition, &self.topic, response, offset) + .map_err(ErrorOrThrottle::Error) }, ) .await?; @@ -248,8 +262,16 @@ impl PartitionClient { self, "get_offset", || async move { - let response = self.get().await?.request(&request).await?; + let response = self + .get() + .await + .map_err(ErrorOrThrottle::Error)? + .request(&request) + .await + .map_err(|e| ErrorOrThrottle::Error(e.into()))?; + maybe_throttle(response.throttle_time_ms)?; process_list_offsets_response(self.partition, &self.topic, response) + .map_err(ErrorOrThrottle::Error) }, ) .await?; @@ -272,8 +294,16 @@ impl PartitionClient { self, "delete_records", || async move { - let response = self.get().await?.request(&request).await?; + let response = self + .get() + .await + .map_err(ErrorOrThrottle::Error)? + .request(&request) + .await + .map_err(|e| ErrorOrThrottle::Error(e.into()))?; + maybe_throttle(Some(response.throttle_time_ms))?; process_delete_records_response(&self.topic, self.partition, response) + .map_err(ErrorOrThrottle::Error) }, ) .await?; @@ -462,15 +492,20 @@ async fn maybe_retry( where B: BrokerCache, R: (Fn() -> F) + Send + Sync, - F: std::future::Future> + Send, + F: std::future::Future>> + Send, { let mut backoff = Backoff::new(backoff_config); backoff .retry_with_backoff(request_name, || async { let error = match f().await { - Ok(v) => return ControlFlow::Break(Ok(v)), - Err(e) => e, + Ok(v) => { + return ControlFlow::Break(Ok(v)); + } + Err(ErrorOrThrottle::Throttle(throttle)) => { + return ControlFlow::Continue(ErrorOrThrottle::Throttle(throttle)); + } + Err(ErrorOrThrottle::Error(e)) => e, }; let retry = match error { @@ -504,7 +539,7 @@ where }; if retry { - ControlFlow::Continue(error) + ControlFlow::Continue(ErrorOrThrottle::Error(error)) } else { error!( e=%error, diff --git a/src/connection.rs b/src/connection.rs index 13af8f0..56bb2c5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,11 +6,13 @@ use thiserror::Error; use tokio::{io::BufStream, sync::Mutex}; use tracing::{debug, error, info, warn}; +use crate::backoff::ErrorOrThrottle; use crate::connection::topology::{Broker, BrokerTopology}; use crate::connection::transport::Transport; use crate::messenger::{Messenger, RequestError}; use crate::protocol::messages::{MetadataRequest, MetadataRequestTopic, MetadataResponse}; use crate::protocol::primitives::String_; +use crate::throttle::maybe_throttle; use crate::{ backoff::{Backoff, BackoffConfig, BackoffError}, client::metadata_cache::MetadataCache, @@ -418,7 +420,7 @@ where "Failed to connect to any broker, backing off".to_string(), ); let err: Arc = err.into(); - ControlFlow::Continue(err) + ControlFlow::Continue(ErrorOrThrottle::Error(err)) }) .await .map_err(Error::RetryFailed) @@ -449,12 +451,18 @@ where }; match broker.metadata_request(request_params).await { - Ok(response) => ControlFlow::Break(Ok(response)), + Ok(response) => { + if let Err(e) = maybe_throttle(response.throttle_time_ms) { + return ControlFlow::Continue(e); + } + + ControlFlow::Break(Ok(response)) + } Err(e @ RequestError::Poisoned(_) | e @ RequestError::IO(_)) if !matches!(metadata_mode, MetadataLookupMode::SpecificBroker(_)) => { arbitrary_broker_cache.invalidate().await; - ControlFlow::Continue(e) + ControlFlow::Continue(ErrorOrThrottle::Error(e)) } Err(error) => { error!( diff --git a/src/lib.rs b/src/lib.rs index 457df5f..1fb9f37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,8 @@ mod protocol; pub mod record; +mod throttle; + pub mod topic; // re-exports diff --git a/src/messenger.rs b/src/messenger.rs index 3d25191..481ca97 100644 --- a/src/messenger.rs +++ b/src/messenger.rs @@ -21,20 +21,24 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::{debug, warn}; - -use crate::protocol::{ - api_key::ApiKey, - api_version::ApiVersion, - frame::{AsyncMessageRead, AsyncMessageWrite}, - messages::{ - ReadVersionedError, ReadVersionedType, RequestBody, RequestHeader, ResponseHeader, - WriteVersionedError, WriteVersionedType, - }, - primitives::{Int16, Int32, NullableString, TaggedFields}, -}; +use tracing::{debug, info, warn}; + use crate::protocol::{api_version::ApiVersionRange, primitives::CompactString}; use crate::protocol::{messages::ApiVersionsRequest, traits::ReadType}; +use crate::{ + backoff::ErrorOrThrottle, + protocol::{ + api_key::ApiKey, + api_version::ApiVersion, + frame::{AsyncMessageRead, AsyncMessageWrite}, + messages::{ + ReadVersionedError, ReadVersionedType, RequestBody, RequestHeader, ResponseHeader, + WriteVersionedError, WriteVersionedType, + }, + primitives::{Int16, Int32, NullableString, TaggedFields}, + }, + throttle::maybe_throttle, +}; #[derive(Debug)] struct Response { @@ -405,7 +409,7 @@ where /// /// Takes `&self mut` to ensure exclusive access. pub async fn sync_versions(&mut self) -> Result<(), SyncVersionsError> { - for upper_bound in (ApiVersionsRequest::API_VERSION_RANGE.min().0 .0 + 'iter_upper_bound: for upper_bound in (ApiVersionsRequest::API_VERSION_RANGE.min().0 .0 ..=ApiVersionsRequest::API_VERSION_RANGE.max().0 .0) .rev() { @@ -425,77 +429,91 @@ where tagged_fields: Some(TaggedFields::default()), }; - match self - .request_with_version_ranges(body, &version_ranges) - .await - { - Ok(response) => { - if let Some(e) = response.error_code { + 'throttle: loop { + match self + .request_with_version_ranges(&body, &version_ranges) + .await + { + Ok(response) => { + if let Err(ErrorOrThrottle::Throttle(throttle)) = + maybe_throttle::(response.throttle_time_ms) + { + info!( + ?throttle, + request_name = "version sync", + "broker asked us to throttle" + ); + tokio::time::sleep(throttle).await; + continue 'throttle; + } + + if let Some(e) = response.error_code { + debug!( + %e, + version=upper_bound, + "Got error during version sync, cannot use version for ApiVersionRequest", + ); + continue 'iter_upper_bound; + } + + // check range sanity + for api_key in &response.api_keys { + if api_key.min_version.0 > api_key.max_version.0 { + return Err(SyncVersionsError::FlippedVersionRange { + api_key: api_key.api_key, + min: api_key.min_version, + max: api_key.max_version, + }); + } + } + + let ranges = response + .api_keys + .into_iter() + .map(|x| { + ( + x.api_key, + ApiVersionRange::new(x.min_version, x.max_version), + ) + }) + .collect(); + debug!( + versions=%sorted_ranges_repr(&ranges), + "Detected supported broker versions", + ); + self.set_version_ranges(ranges); + return Ok(()); + } + Err(RequestError::NoVersionMatch { .. }) => { + unreachable!("Just set to version range to a non-empty range") + } + Err(RequestError::ReadVersionedError(e)) => { debug!( %e, version=upper_bound, - "Got error during version sync, cannot use version for ApiVersionRequest", + "Cannot read ApiVersionResponse for version", ); - continue; + continue 'iter_upper_bound; } - - // check range sanity - for api_key in &response.api_keys { - if api_key.min_version.0 > api_key.max_version.0 { - return Err(SyncVersionsError::FlippedVersionRange { - api_key: api_key.api_key, - min: api_key.min_version, - max: api_key.max_version, - }); - } + Err(RequestError::ReadError(e)) => { + debug!( + %e, + version=upper_bound, + "Cannot read ApiVersionResponse for version", + ); + continue 'iter_upper_bound; + } + Err(e @ RequestError::TooMuchData { .. }) => { + debug!( + %e, + version=upper_bound, + "Cannot read ApiVersionResponse for version", + ); + continue 'iter_upper_bound; + } + Err(e) => { + return Err(SyncVersionsError::RequestError(e)); } - - let ranges = response - .api_keys - .into_iter() - .map(|x| { - ( - x.api_key, - ApiVersionRange::new(x.min_version, x.max_version), - ) - }) - .collect(); - debug!( - versions=%sorted_ranges_repr(&ranges), - "Detected supported broker versions", - ); - self.set_version_ranges(ranges); - return Ok(()); - } - Err(RequestError::NoVersionMatch { .. }) => { - unreachable!("Just set to version range to a non-empty range") - } - Err(RequestError::ReadVersionedError(e)) => { - debug!( - %e, - version=upper_bound, - "Cannot read ApiVersionResponse for version", - ); - continue; - } - Err(RequestError::ReadError(e)) => { - debug!( - %e, - version=upper_bound, - "Cannot read ApiVersionResponse for version", - ); - continue; - } - Err(e @ RequestError::TooMuchData { .. }) => { - debug!( - %e, - version=upper_bound, - "Cannot read ApiVersionResponse for version", - ); - continue; - } - Err(e) => { - return Err(SyncVersionsError::RequestError(e)); } } } diff --git a/src/throttle.rs b/src/throttle.rs new file mode 100644 index 0000000..dec84b1 --- /dev/null +++ b/src/throttle.rs @@ -0,0 +1,28 @@ +//! Helpers to implement throttling within the Kafka protocol. + +use std::time::Duration; + +use tracing::warn; + +use crate::{backoff::ErrorOrThrottle, protocol::primitives::Int32}; + +pub fn maybe_throttle(throttle_time_ms: Option) -> Result<(), ErrorOrThrottle> +where + E: std::error::Error + Send, +{ + let throttle_time_ms = throttle_time_ms.map(|t| t.0).unwrap_or_default(); + let throttle_time_ms: u64 = match throttle_time_ms.try_into() { + Ok(t) => t, + Err(_) => { + warn!(throttle_time_ms, "Invalid throttle time",); + return Ok(()); + } + }; + + if throttle_time_ms == 0 { + return Ok(()); + } + + let duration = Duration::from_millis(throttle_time_ms); + Err(ErrorOrThrottle::Throttle(duration)) +}