Skip to content

Commit

Permalink
Merge pull request #83 from tansu-io/80-delete-topics-api-20-causing-…
Browse files Browse the repository at this point in the history
…panic

implement: delete topics, use some defaults on create topic
  • Loading branch information
shortishly authored Oct 19, 2024
2 parents acd2bfe + 6ddc4ab commit c80b57d
Show file tree
Hide file tree
Showing 7 changed files with 622 additions and 29 deletions.
13 changes: 11 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@ ADVERTISED_LISTENER=localhost:9092

KAFKA_ADVERTISED_LISTENER_URL=tcp://${ADVERTISED_LISTENER}/

# setup for a local minio S3:
# create an access key for minio at:
# http://localhost:9001/access-keys
AWS_ACCESS_KEY_ID="<access key>"
AWS_SECRET_ACCESS_KEY="<secret key>"
AWS_ENDPOINT="http://minio:9000"

# use this endpoint when tansu and minio are both running in docker compose:
# AWS_ENDPOINT="http://minio:9000"

AWS_ENDPOINT="http://localhost:9000"

# enable http requests for a local minio
AWS_ALLOW_HTTP="true"

# minio storage engine
# create a bucket "tansu" at: http://localhost:9001/buckets
STORAGE_ENGINE="minio=s3://tansu/"

# PostgreSQL storage engine
# STORAGE_ENGINE="pg=postgres://postgres:postgres@db"
# STORAGE_ENGINE="pg=postgres://postgres:postgres@localhost"
28 changes: 24 additions & 4 deletions tansu-server/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
pub mod api_versions;
pub mod create_topic;
pub mod delete_records;
pub mod delete_topics;
pub mod describe_cluster;
pub mod describe_configs;
pub mod fetch;
Expand All @@ -35,6 +36,7 @@ use crate::{
use api_versions::ApiVersionsRequest;
use create_topic::CreateTopic;
use delete_records::DeleteRecordsRequest;
use delete_topics::DeleteTopicsRequest;
use describe_cluster::DescribeClusterRequest;
use describe_configs::DescribeConfigsRequest;
use fetch::FetchRequest;
Expand All @@ -61,9 +63,6 @@ pub struct Broker<G, S> {
node_id: i32,
cluster_id: String,
incarnation_id: Uuid,
#[allow(dead_code)]
// context: Raft,
// applicator: Applicator,
listener: Url,
advertised_listener: Url,
#[allow(dead_code)]
Expand Down Expand Up @@ -269,8 +268,13 @@ where
} => {
debug!(?validate_only, ?topics);
CreateTopic::with_storage(self.storage.clone())
.request(topics, validate_only.unwrap_or(false))
.response(topics, validate_only.unwrap_or(false))
.await
.map(Some)
.map(|topics| Body::CreateTopicsResponse {
throttle_time_ms: Some(0),
topics,
})
}

Body::DeleteRecordsRequest { topics, .. } => {
Expand All @@ -281,6 +285,22 @@ where
.await
}

Body::DeleteTopicsRequest {
topics,
topic_names,
timeout_ms,
} => {
debug!(?topics, ?topic_names, ?timeout_ms);

Ok(Body::DeleteTopicsResponse {
throttle_time_ms: Some(0),
responses: DeleteTopicsRequest::with_storage(self.storage.clone())
.response(topics, topic_names)
.await
.map(Some)?,
})
}

Body::DescribeClusterRequest {
include_cluster_authorized_operations,
endpoint_type,
Expand Down
169 changes: 160 additions & 9 deletions tansu-server/src/broker/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

use crate::Result;
use tansu_kafka_sans_io::{
create_topics_request::CreatableTopic, create_topics_response::CreatableTopicResult, Body,
ErrorCode,
create_topics_request::CreatableTopic, create_topics_response::CreatableTopicResult, ErrorCode,
};
use tansu_storage::Storage;
use tracing::debug;
Expand All @@ -36,11 +35,19 @@ where

async fn create_topic(
&self,
topic: CreatableTopic,
mut topic: CreatableTopic,
validate_only: bool,
) -> CreatableTopicResult {
let _ = validate_only;

if topic.num_partitions == -1 {
topic.num_partitions = 1;
}

if topic.replication_factor == -1 {
topic.replication_factor = 3
}

let name = topic.name.clone();
let num_partitions = Some(topic.num_partitions);
let replication_factor = Some(topic.replication_factor);
Expand Down Expand Up @@ -89,11 +96,11 @@ where
}
}

pub async fn request(
pub async fn response(
&self,
creatable: Option<Vec<CreatableTopic>>,
validate_only: bool,
) -> Result<Body> {
) -> Result<Vec<CreatableTopicResult>> {
debug!(?creatable, ?validate_only);

let mut topics =
Expand All @@ -105,9 +112,153 @@ where
}
}

Ok(Body::CreateTopicsResponse {
throttle_time_ms: Some(0),
topics: Some(topics),
})
Ok(topics)
}
}

#[cfg(test)]
mod tests {
use object_store::memory::InMemory;
use tansu_storage::{dynostore::DynoStore, NULL_TOPIC_ID};

use super::*;

#[tokio::test]
async fn create() -> Result<()> {
let cluster = "abc";
let node = 12321;

let storage = DynoStore::new(cluster, node, InMemory::new());

let create_topic = CreateTopic::with_storage(storage);

let name = "pqr";
let num_partitions = 5;
let replication_factor = 3;
let assignments = Some([].into());
let configs = Some([].into());
let validate_only = false;

let r = create_topic
.response(
Some(vec![CreatableTopic {
name: name.into(),
num_partitions,
replication_factor,
assignments,
configs,
}]),
validate_only,
)
.await?;

assert_eq!(1, r.len());
assert_eq!(name, r[0].name.as_str());
assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
assert_eq!(Some(5), r[0].num_partitions);
assert_eq!(Some(3), r[0].replication_factor);
assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);

Ok(())
}

#[tokio::test]
async fn create_with_default() -> Result<()> {
let cluster = "abc";
let node = 12321;

let storage = DynoStore::new(cluster, node, InMemory::new());

let create_topic = CreateTopic::with_storage(storage);

let name = "pqr";
let num_partitions = -1;
let replication_factor = -1;
let assignments = Some([].into());
let configs = Some([].into());
let validate_only = false;

let r = create_topic
.response(
Some(vec![CreatableTopic {
name: name.into(),
num_partitions,
replication_factor,
assignments,
configs,
}]),
validate_only,
)
.await?;

assert_eq!(1, r.len());
assert_eq!(name, r[0].name.as_str());
assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
assert_eq!(Some(1), r[0].num_partitions);
assert_eq!(Some(3), r[0].replication_factor);
assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);

Ok(())
}

#[tokio::test]
async fn duplicate() -> Result<()> {
let cluster = "abc";
let node = 12321;

let storage = DynoStore::new(cluster, node, InMemory::new());

let create_topic = CreateTopic::with_storage(storage);

let name = "pqr";
let num_partitions = 5;
let replication_factor = 3;
let assignments = Some([].into());
let configs = Some([].into());
let validate_only = false;

let r = create_topic
.response(
Some(vec![CreatableTopic {
name: name.into(),
num_partitions,
replication_factor,
assignments: assignments.clone(),
configs: configs.clone(),
}]),
validate_only,
)
.await?;

assert_eq!(1, r.len());
assert_eq!(name, r[0].name.as_str());
assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
assert_eq!(Some(5), r[0].num_partitions);
assert_eq!(Some(3), r[0].replication_factor);
assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);

let r = create_topic
.response(
Some(vec![CreatableTopic {
name: name.into(),
num_partitions,
replication_factor,
assignments,
configs,
}]),
validate_only,
)
.await?;

assert_eq!(1, r.len());
assert_eq!(name, r[0].name.as_str());
assert_eq!(Some(NULL_TOPIC_ID), r[0].topic_id);
assert_eq!(Some(5), r[0].num_partitions);
assert_eq!(Some(3), r[0].replication_factor);
assert_eq!(
ErrorCode::TopicAlreadyExists,
ErrorCode::try_from(r[0].error_code)?
);
Ok(())
}
}
Loading

0 comments on commit c80b57d

Please sign in to comment.