diff --git a/src/client/consumer.rs b/src/client/consumer.rs
index fe8b661e..2cd956bc 100644
--- a/src/client/consumer.rs
+++ b/src/client/consumer.rs
@@ -111,6 +111,7 @@ impl StreamConsumerBuilder {
             min_batch_size: self.min_batch_size,
             max_batch_size: self.max_batch_size,
             next_offset: self.start_offset,
+            terminated: false,
             last_high_watermark: -1,
             buffer: Default::default(),
             fetch_fut: Fuse::terminated(),
@@ -122,6 +123,9 @@ type FetchResult = Result<(Vec<RecordAndOffset>, i64)>;
 
 /// A trait wrapper to allow mocking
 trait FetchClient: std::fmt::Debug + Send + Sync {
+    /// Fetch records.
+    ///
+    /// Arguments are identical to [`PartitionClient::fetch_records`].
     fn fetch_records(
         &self,
         offset: i64,
@@ -142,6 +146,11 @@ impl FetchClient for PartitionClient {
 }
 
 pin_project! {
+    /// Stream consuming data from a start offset.
+    ///
+    /// # Error Handling
+    /// If an error is returned by [`fetch_records`](`FetchClient::fetch_records`), the stream will emit this error
+    /// once and then terminate.
     pub struct StreamConsumer {
         client: Arc<dyn FetchClient>,
 
@@ -153,6 +162,8 @@ pin_project! {
 
         next_offset: i64,
 
+        terminated: bool,
+
         last_high_watermark: i64,
 
         buffer: VecDeque<RecordAndOffset>,
@@ -167,6 +178,9 @@ impl Stream for StreamConsumer {
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
         loop {
+            if *this.terminated {
+                return Poll::Ready(None);
+            }
             if let Some(x) = this.buffer.pop_front() {
                 return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
             }
@@ -203,7 +217,12 @@ impl Stream for StreamConsumer {
                     }
                     continue;
                 }
-                Err(e) => return Poll::Ready(Some(Err(e))),
+                Err(e) => {
+                    *this.terminated = true;
+
+                    // report error once
+                    return Poll::Ready(Some(Err(e)));
+                }
             }
         }
     }
@@ -217,9 +236,10 @@ impl std::fmt::Debug for StreamConsumer {
             .field("max_batch_size", &self.max_batch_size)
             .field("max_wait_ms", &self.max_wait_ms)
             .field("next_offset", &self.next_offset)
+            .field("terminated", &self.terminated)
             .field("last_high_watermark", &self.last_high_watermark)
             .field("buffer", &self.buffer)
-            .finish()
+            .finish_non_exhaustive()
     }
 }
 
@@ -227,11 +247,15 @@ impl std::fmt::Debug for StreamConsumer {
 mod tests {
     use std::time::Duration;
 
+    use assert_matches::assert_matches;
     use futures::{pin_mut, StreamExt};
     use time::OffsetDateTime;
     use tokio::sync::{mpsc, Mutex};
 
-    use crate::record::Record;
+    use crate::{
+        client::error::{Error, ProtocolError},
+        record::Record,
+    };
 
     use super::*;
 
@@ -348,6 +372,36 @@ mod tests {
         }
     }
 
+    #[derive(Debug)]
+    struct MockErrFetch {
+        inner: Arc<Mutex<Option<Error>>>,
+    }
+
+    impl MockErrFetch {
+        fn new(e: Error) -> Self {
+            Self {
+                inner: Arc::new(Mutex::new(Some(e))),
+            }
+        }
+    }
+
+    impl FetchClient for MockErrFetch {
+        fn fetch_records(
+            &self,
+            _offset: i64,
+            _bytes: Range<i32>,
+            _max_wait_ms: i32,
+        ) -> BoxFuture<'_, Result<(Vec<RecordAndOffset>, i64)>> {
+            let inner = Arc::clone(&self.inner);
+            Box::pin(async move {
+                match inner.lock().await.take() {
+                    Some(e) => Err(e),
+                    None => panic!("EOF"),
+                }
+            })
+        }
+    }
+
     #[tokio::test]
     async fn test_consumer() {
         let record = Record {
@@ -471,4 +525,24 @@ mod tests {
             .unwrap()
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn test_consumer_terminate() {
+        let e = Error::ServerError(
+            ProtocolError::OffsetOutOfRange,
+            String::from("offset out of range"),
+        );
+        let consumer = Arc::new(MockErrFetch::new(e));
+
+        let mut stream = StreamConsumerBuilder::new_with_client(consumer, 0).build();
+
+        let error = stream.next().await.expect("stream not empty").unwrap_err();
+        assert_matches!(
+            error,
+            Error::ServerError(ProtocolError::OffsetOutOfRange, _)
+        );
+
+        // stream ends
+        assert!(stream.next().await.is_none());
+    }
 }
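The `terminated` flag gives `StreamConsumer` a clean end-of-stream contract: an error is yielded exactly once, and every subsequent poll returns `None`. A minimal consuming-side sketch of that contract, assuming only the public API exercised in the tests above; the function name and logging are illustrative, not part of this patch:

```rust
use std::sync::Arc;

use futures::StreamExt;
use rskafka::client::{consumer::StreamConsumerBuilder, partition::PartitionClient};

/// Hypothetical consumption loop: after an `Err` the stream is terminated,
/// so the `while let` exits on the next poll without an explicit `break`.
async fn consume_until_error(partition_client: Arc<PartitionClient>) {
    let mut stream = StreamConsumerBuilder::new(partition_client, 0).build();
    while let Some(result) = stream.next().await {
        match result {
            Ok((record_and_offset, high_watermark)) => {
                // Process the record; the high watermark shows how far we lag behind.
                println!("{record_and_offset:?} (high watermark: {high_watermark})");
            }
            // Emitted once; the stream then ends by itself.
            Err(e) => eprintln!("fetch failed: {e}"),
        }
    }
}
```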
diff --git a/src/client/partition.rs b/src/client/partition.rs
index cdcb01c1..6cd2761e 100644
--- a/src/client/partition.rs
+++ b/src/client/partition.rs
@@ -23,7 +23,7 @@ use std::ops::{ControlFlow, Deref, Range};
 use std::sync::Arc;
 use time::OffsetDateTime;
 use tokio::sync::Mutex;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum Compression {
@@ -105,6 +105,11 @@ impl PartitionClient {
     /// Fetch `bytes` bytes of record data starting at sequence number `offset`
     ///
     /// Returns the records, and the current high watermark.
+    ///
+    ///
+    /// # Error Handling
+    /// Fetching records outside the range known to the broker (marked by the low and high watermark) will lead to a
+    /// [`ServerError`](Error::ServerError) with [`OffsetOutOfRange`](ProtocolError::OffsetOutOfRange).
     pub async fn fetch_records(
         &self,
         offset: i64,
@@ -119,6 +124,18 @@ impl PartitionClient {
             })
             .await?;
 
+        // Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions, so we
+        // can implement a simple heuristic.
+        if partition.high_watermark.0 < offset {
+            warn!(
+                "This fetch response looks like Redpanda wanted to report an OffsetOutOfRange but didn't."
+            );
+            return Err(Error::ServerError(
+                ProtocolError::OffsetOutOfRange,
+                String::from("Offset out of range"),
+            ));
+        }
+
         let records = extract_records(partition.records.0)?;
 
         Ok((records, partition.high_watermark.0))
diff --git a/tests/client.rs b/tests/client.rs
index 3cfbf092..89d42c28 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -1,3 +1,4 @@
+use assert_matches::assert_matches;
 use rskafka::{
     client::{
         error::{Error as ClientError, ProtocolError},
@@ -157,6 +158,63 @@ async fn test_produce_empty() {
         .unwrap();
 }
 
+#[tokio::test]
+async fn test_consume_empty() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+    let n_partitions = 2;
+
+    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let controller_client = client.controller_client().await.unwrap();
+    controller_client
+        .create_topic(&topic_name, n_partitions, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
+    let (records, watermark) = partition_client
+        .fetch_records(0, 1..10_000, 1_000)
+        .await
+        .unwrap();
+    assert!(records.is_empty());
+    assert_eq!(watermark, 0);
+}
+
+#[tokio::test]
+async fn test_consume_offset_out_of_range() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+    let n_partitions = 2;
+
+    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let controller_client = client.controller_client().await.unwrap();
+    controller_client
+        .create_topic(&topic_name, n_partitions, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = client.partition_client(&topic_name, 1).await.unwrap();
+    let record = record();
+    let offsets = partition_client
+        .produce(vec![record], Compression::NoCompression)
+        .await
+        .unwrap();
+    let offset = offsets[0];
+
+    let err = partition_client
+        .fetch_records(offset + 2, 1..10_000, 1_000)
+        .await
+        .unwrap_err();
+    assert_matches!(
+        err,
+        ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
+    );
+}
+
 #[tokio::test]
 async fn test_get_high_watermark() {
     maybe_start_logging();
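With `fetch_records` now surfacing `OffsetOutOfRange` consistently (natively from Kafka, via the heuristic above for Redpanda), callers can treat the error as a signal to re-sync rather than as fatal. A rough sketch of one possible recovery, assuming the `get_high_watermark` accessor that `test_get_high_watermark` (whose header appears above) exercises; the resume-at-the-end policy and helper name are illustrations, not part of this patch:

```rust
use rskafka::{
    client::{
        error::{Error, ProtocolError},
        partition::PartitionClient,
    },
    record::RecordAndOffset,
};

/// Illustrative helper: on OffsetOutOfRange, skip ahead to the current end of
/// the partition and retry once. Real code might resume at the low watermark
/// instead, to replay everything the broker still has.
async fn fetch_or_resync(
    client: &PartitionClient,
    offset: i64,
) -> Result<(Vec<RecordAndOffset>, i64), Error> {
    match client.fetch_records(offset, 1..10_000, 1_000).await {
        Err(Error::ServerError(ProtocolError::OffsetOutOfRange, _)) => {
            // Our offset fell outside the broker's watermark range.
            let high = client.get_high_watermark().await?;
            client.fetch_records(high, 1..10_000, 1_000).await
        }
        other => other,
    }
}
```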
diff --git a/tests/consumer.rs b/tests/consumer.rs
index bb1444c9..ff025c11 100644
--- a/tests/consumer.rs
+++ b/tests/consumer.rs
@@ -1,11 +1,13 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use assert_matches::assert_matches;
 use futures::{Stream, StreamExt};
 use tokio::time::timeout;
 
 use rskafka::client::{
     consumer::{StreamConsumer, StreamConsumerBuilder},
+    error::{Error, ProtocolError},
     partition::Compression,
     ClientBuilder,
 };
@@ -35,7 +37,9 @@ async fn test_stream_consumer() {
         .await
         .unwrap();
 
-    let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0).build();
+    let mut stream = StreamConsumerBuilder::new(Arc::clone(&partition_client), 0)
+        .with_max_wait_ms(50)
+        .build();
 
     let assert_ok =
         |r: Result<Option<<StreamConsumer as Stream>::Item>, tokio::time::error::Elapsed>| {
@@ -48,7 +52,7 @@ async fn test_stream_consumer() {
     assert_ok(timeout(Duration::from_millis(100), stream.next()).await);
 
     // No further records
-    timeout(Duration::from_millis(100), stream.next())
+    timeout(Duration::from_millis(200), stream.next())
         .await
         .expect_err("timeout");
 
@@ -71,3 +75,31 @@ async fn test_stream_consumer() {
         .await
         .expect_err("timeout");
 }
+
+#[tokio::test]
+async fn test_stream_consumer_offset_out_of_range() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let controller_client = client.controller_client().await.unwrap();
+
+    let topic = random_topic_name();
+    controller_client
+        .create_topic(&topic, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = Arc::new(client.partition_client(&topic, 0).await.unwrap());
+
+    let mut stream = StreamConsumerBuilder::new(partition_client, 1).build();
+
+    let error = stream.next().await.expect("stream not empty").unwrap_err();
+    assert_matches!(
+        error,
+        Error::ServerError(ProtocolError::OffsetOutOfRange, _)
+    );
+
+    // stream ends
+    assert!(stream.next().await.is_none());
+}
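One note on the timing changes in `test_stream_consumer`: lowering the fetch long-poll to 50 ms while widening the "no further records" window to 200 ms keeps the long-poll comfortably inside the test timeout, presumably to avoid flaky races at the boundary. Outside of tests the same knob trades request volume against idle-stream latency. A tiny sketch using only builder calls that appear in this diff (the helper name is hypothetical, and `partition_client` setup is omitted):

```rust
use std::sync::Arc;

use rskafka::client::{
    consumer::{StreamConsumer, StreamConsumerBuilder},
    partition::PartitionClient,
};

/// Sketch: a consumer that long-polls for at most 50 ms per fetch, so an idle
/// partition returns control quickly at the cost of more fetch requests.
fn build_tuned_consumer(
    partition_client: Arc<PartitionClient>,
    start_offset: i64,
) -> StreamConsumer {
    StreamConsumerBuilder::new(partition_client, start_offset)
        .with_max_wait_ms(50)
        .build()
}
```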