Commit

Merge pull request #97 from influxdata/crepererum/deletes
feat: implement record deletion
kodiakhq[bot] authored Feb 14, 2022
2 parents 0f9cd4e + 304eedd commit 24aac8b
Showing 7 changed files with 505 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
@@ -256,6 +256,8 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
# Kafka supports DeleteRecords
TEST_DELETE_RECORDS: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "kafka-1:9093"
5 changes: 3 additions & 2 deletions README.md
@@ -127,10 +127,11 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9094 cargo test
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test
```

in another session.
in another session. Note that Apache Kafka supports a different set of features than Redpanda, so we pass
additional environment variables.
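For comparison, a Redpanda-only run (which cannot exercise `DeleteRecords` yet) simply omits that flag; the broker address below is a placeholder for whatever your Redpanda container exposes:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=<redpanda-broker-address> cargo test
```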

### Fuzzing
RSKafka offers fuzz targets for certain protocol parsing steps. To build them, make sure you have [cargo-fuzz] installed.
3 changes: 2 additions & 1 deletion src/client/error.rs
@@ -1,5 +1,6 @@
use thiserror::Error;

pub use crate::messenger::RequestError;
pub use crate::protocol::error::Error as ProtocolError;

#[derive(Error, Debug)]
@@ -8,7 +9,7 @@ pub enum Error {
Connection(#[from] crate::connection::Error),

#[error("Request error: {0}")]
Request(#[from] crate::messenger::RequestError),
Request(#[from] RequestError),

#[error("Invalid response: {0}")]
InvalidResponse(String),
90 changes: 85 additions & 5 deletions src/client/partition.rs
@@ -6,11 +6,12 @@ use crate::{
protocol::{
error::Error as ProtocolError,
messages::{
FetchRequest, FetchRequestPartition, FetchRequestTopic, FetchResponse,
FetchResponsePartition, IsolationLevel, ListOffsetsRequest,
ListOffsetsRequestPartition, ListOffsetsRequestTopic, ListOffsetsResponse,
ListOffsetsResponsePartition, ProduceRequest, ProduceRequestPartitionData,
ProduceRequestTopicData, ProduceResponse, NORMAL_CONSUMER,
DeleteRecordsRequest, DeleteRecordsResponse, DeleteRequestPartition,
DeleteRequestTopic, DeleteResponsePartition, FetchRequest, FetchRequestPartition,
FetchRequestTopic, FetchResponse, FetchResponsePartition, IsolationLevel,
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
ListOffsetsResponse, ListOffsetsResponsePartition, ProduceRequest,
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse, NORMAL_CONSUMER,
},
primitives::*,
record::{Record as ProtocolRecord, *},
@@ -159,6 +160,29 @@ impl PartitionClient {
extract_high_watermark(partition)
}

/// Delete records whose offset is smaller than the given offset.
///
/// # Supported Brokers
/// Currently this is only supported by Apache Kafka but NOT by Redpanda, see
/// <https://github.com/redpanda-data/redpanda/issues/1016>.
pub async fn delete_records(&self, offset: i64, timeout_ms: i32) -> Result<()> {
let request =
&build_delete_records_request(offset, timeout_ms, &self.topic, self.partition);

maybe_retry(
&self.backoff_config,
self,
"delete_records",
|| async move {
let response = self.get().await?.request(&request).await?;
process_delete_records_response(&self.topic, self.partition, response)
},
)
.await?;

Ok(())
}

/// Retrieve the broker ID of the partition leader
async fn get_leader(&self, broker_override: Option<BrokerConnection>) -> Result<i32> {
let metadata = self
@@ -625,3 +649,59 @@ fn extract_high_watermark(partition: ListOffsetsResponsePartition) -> Result<i64
_ => unreachable!(),
}
}

fn build_delete_records_request(
offset: i64,
timeout_ms: i32,
topic: &str,
partition: i32,
) -> DeleteRecordsRequest {
DeleteRecordsRequest {
topics: vec![DeleteRequestTopic {
name: String_(topic.to_string()),
partitions: vec![DeleteRequestPartition {
partition_index: Int32(partition),
offset: Int64(offset),
tagged_fields: None,
}],
tagged_fields: None,
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: None,
}
}

fn process_delete_records_response(
topic: &str,
partition: i32,
response: DeleteRecordsResponse,
) -> Result<DeleteResponsePartition> {
let response_topic = response
.topics
.exactly_one()
.map_err(Error::exactly_one_topic)?;

if response_topic.name.0 != topic {
return Err(Error::InvalidResponse(format!(
"Expected data for topic '{}' but got data for topic '{}'",
topic, response_topic.name.0
)));
}

let response_partition = response_topic
.partitions
.exactly_one()
.map_err(Error::exactly_one_partition)?;

if response_partition.partition_index.0 != partition {
return Err(Error::InvalidResponse(format!(
"Expected data for partition {} but got data for partition {}",
partition, response_partition.partition_index.0
)));
}

match response_partition.error {
Some(err) => Err(Error::ServerError(err, String::new())),
None => Ok(response_partition),
}
}
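For context, here is a minimal sketch of how a caller might use the new API. The `delete_records(offset, timeout_ms)` signature comes from the diff above; the `ClientBuilder`/`partition_client` setup, broker address, topic name, and offset values are illustrative assumptions about the surrounding client API, not part of this commit.

```rust
use rskafka::client::ClientBuilder;

async fn truncate_partition() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a single broker (address is illustrative).
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;

    // Client scoped to one topic/partition (names are illustrative).
    let partition_client = client.partition_client("my_topic".to_owned(), 0).await?;

    // Delete all records whose offset is smaller than 42, giving the broker
    // up to 1s to apply the deletion. Supported by Apache Kafka, but not yet
    // by Redpanda (see the doc comment above).
    partition_client.delete_records(42, 1_000).await?;

    Ok(())
}
```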