Merge pull request #99 from influxdata/crepererum/fix_half_record_batches

fix: filter out records that we've not requested
kodiakhq[bot] authored Feb 14, 2022
2 parents 24aac8b + cffa616 commit 6c47b5b
Showing 2 changed files with 64 additions and 8 deletions.
src/client/partition.rs: 13 additions & 3 deletions
@@ -137,7 +137,7 @@ impl PartitionClient {
             ));
         }
 
-        let records = extract_records(partition.records.0)?;
+        let records = extract_records(partition.records.0, offset)?;
 
         Ok((records, partition.high_watermark.0))
     }
@@ -535,7 +535,10 @@ fn process_fetch_response(
     Ok(response_partition)
 }
 
-fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndOffset>> {
+fn extract_records(
+    partition_records: Vec<RecordBatch>,
+    request_offset: i64,
+) -> Result<Vec<RecordAndOffset>> {
     let mut records = vec![];
 
     for batch in partition_records {
@@ -547,6 +550,13 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndO
                 records.reserve(protocol_records.len());
 
                 for record in protocol_records {
+                    let offset = batch.base_offset + record.offset_delta as i64;
+                    if offset < request_offset {
+                        // Kafka does not split record batches on the server side, so we need to do this filtering on
+                        // the client side
+                        continue;
+                    }
+
                     let timestamp = OffsetDateTime::from_unix_timestamp_nanos(
                         (batch.first_timestamp + record.timestamp_delta) as i128 * 1_000_000,
                     )
@@ -565,7 +575,7 @@ fn extract_records(partition_records: Vec<RecordBatch>) -> Result<Vec<RecordAndO
                                 .collect(),
                             timestamp,
                         },
-                        offset: batch.base_offset + record.offset_delta as i64,
+                        offset,
                     })
                 }
             }
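The idea behind the change above: Kafka does not split record batches on the server side, so a fetch that starts in the middle of a batch also returns the records that precede the requested offset, and the client has to drop those itself. Below is a minimal standalone sketch of that filtering step; `Rec` and `filter_from_offset` are illustrative stand-ins for this example, not rskafka types.

// Minimal sketch of the client-side filtering added above.
// `Rec` and `filter_from_offset` are illustrative, not part of the rskafka API.

struct Rec {
    base_offset: i64,  // base offset of the batch the record belongs to
    offset_delta: i32, // record's position within that batch
}

/// Keep only the absolute offsets at or after the offset the caller asked for.
fn filter_from_offset(batch: &[Rec], request_offset: i64) -> Vec<i64> {
    batch
        .iter()
        .map(|r| r.base_offset + r.offset_delta as i64)
        .filter(|&offset| offset >= request_offset)
        .collect()
}

fn main() {
    // One batch holding two records at offsets 10 and 11; the client asked for 11.
    let batch = vec![
        Rec { base_offset: 10, offset_delta: 0 },
        Rec { base_offset: 10, offset_delta: 1 },
    ];
    // The broker returns the whole batch, but only offset 11 survives the filter.
    assert_eq!(filter_from_offset(&batch, 11), vec![11]);
}

In the actual change the same check happens inline with a `continue` in the decode loop (the hunk at @@ -547,6 +550,13 @@ above), so skipped records never reach the returned list.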
tests/client.rs: 51 additions & 5 deletions
@@ -338,6 +338,55 @@ async fn test_produce_consume_size_cutoff() {
     assert!(is_kafka ^ is_redpanda);
 }
 
+#[tokio::test]
+async fn test_consume_midbatch() {
+    maybe_start_logging();
+
+    let connection = maybe_skip_kafka_integration!();
+    let topic_name = random_topic_name();
+
+    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let controller_client = client.controller_client().await.unwrap();
+    controller_client
+        .create_topic(&topic_name, 1, 1, 5_000)
+        .await
+        .unwrap();
+
+    let partition_client = client.partition_client(&topic_name, 0).await.unwrap();
+
+    // produce two records into a single batch
+    let record_1 = record();
+    let record_2 = Record {
+        value: Some(b"x".to_vec()),
+        timestamp: now(),
+        ..record_1.clone()
+    };
+
+    let offsets = partition_client
+        .produce(
+            vec![record_1.clone(), record_2.clone()],
+            Compression::NoCompression,
+        )
+        .await
+        .unwrap();
+    let _offset_1 = offsets[0];
+    let offset_2 = offsets[1];
+
+    // when fetching from the middle of the record batch, the server will return both records but we should filter out
+    // the first one on the client side
+    let (records, _watermark) = partition_client
+        .fetch_records(offset_2, 1..10_000, 1_000)
+        .await
+        .unwrap();
+    assert_eq!(
+        records,
+        vec![RecordAndOffset {
+            record: record_2,
+            offset: offset_2
+        },],
+    );
+}
+
 #[tokio::test]
 async fn test_delete_records() {
     maybe_start_logging();
@@ -429,18 +478,15 @@ async fn test_delete_records() {
         ClientError::ServerError(ProtocolError::OffsetOutOfRange, _)
     );
 
-    // fetching untouched records still works, however the middle record batch is NOT half-deleted and still contains record_2
+    // fetching untouched records still works, however the middle record batch is NOT half-deleted and still contains
+    // record_2. `fetch_records` should filter this however.
     let (records, _watermark) = partition_client
         .fetch_records(offset_3, 1..10_000, 1_000)
         .await
         .unwrap();
     assert_eq!(
         records,
         vec![
-            RecordAndOffset {
-                record: record_2,
-                offset: offset_2
-            },
             RecordAndOffset {
                 record: record_3,
                 offset: offset_3
