diff --git a/.env b/.env
index 6c31a7c..f170a22 100644
--- a/.env
+++ b/.env
@@ -22,14 +22,23 @@
 ADVERTISED_LISTENER=localhost:9092
 KAFKA_ADVERTISED_LISTENER_URL=tcp://${ADVERTISED_LISTENER}/
 
-# setup for a local minio S3:
+# create an access key for minio at:
+# http://localhost:9001/access-keys
 AWS_ACCESS_KEY_ID=""
 AWS_SECRET_ACCESS_KEY=""
-AWS_ENDPOINT="http://minio:9000"
+
+# use this endpoint when tansu and minio are both running in docker compose:
+# AWS_ENDPOINT="http://minio:9000"
+
+AWS_ENDPOINT="http://localhost:9000"
+
+# enable http requests for a local minio
 AWS_ALLOW_HTTP="true"
 
 # minio storage engine
+# create a bucket "tansu" at: http://localhost:9001/buckets
 STORAGE_ENGINE="minio=s3://tansu/"
 
 # PostgreSQL storage engine
 # STORAGE_ENGINE="pg=postgres://postgres:postgres@db"
+# STORAGE_ENGINE="pg=postgres://postgres:postgres@localhost"
diff --git a/tansu-server/src/broker.rs b/tansu-server/src/broker.rs
index 17a17bc..f5da0a8 100644
--- a/tansu-server/src/broker.rs
+++ b/tansu-server/src/broker.rs
@@ -16,6 +16,7 @@
 pub mod api_versions;
 pub mod create_topic;
 pub mod delete_records;
+pub mod delete_topics;
 pub mod describe_cluster;
 pub mod describe_configs;
 pub mod fetch;
@@ -35,6 +36,7 @@ use crate::{
 use api_versions::ApiVersionsRequest;
 use create_topic::CreateTopic;
 use delete_records::DeleteRecordsRequest;
+use delete_topics::DeleteTopicsRequest;
 use describe_cluster::DescribeClusterRequest;
 use describe_configs::DescribeConfigsRequest;
 use fetch::FetchRequest;
@@ -61,9 +63,6 @@ pub struct Broker {
     node_id: i32,
     cluster_id: String,
     incarnation_id: Uuid,
-    #[allow(dead_code)]
-    // context: Raft,
-    // applicator: Applicator,
     listener: Url,
     advertised_listener: Url,
     #[allow(dead_code)]
@@ -269,8 +268,13 @@
             } => {
                 debug!(?validate_only, ?topics);
                 CreateTopic::with_storage(self.storage.clone())
-                    .request(topics, validate_only.unwrap_or(false))
+                    .response(topics, validate_only.unwrap_or(false))
                     .await
+                    .map(Some)
+                    .map(|topics| Body::CreateTopicsResponse {
+                        throttle_time_ms: Some(0),
+                        topics,
+                    })
             }
             Body::DeleteRecordsRequest {
                 topics, ..
             } => {
@@ -281,6 +285,22 @@ where
                     .await
             }
+            Body::DeleteTopicsRequest {
+                topics,
+                topic_names,
+                timeout_ms,
+            } => {
+                debug!(?topics, ?topic_names, ?timeout_ms);
+
+                Ok(Body::DeleteTopicsResponse {
+                    throttle_time_ms: Some(0),
+                    responses: DeleteTopicsRequest::with_storage(self.storage.clone())
+                        .response(topics, topic_names)
+                        .await
+                        .map(Some)?,
+                })
+            }
+
             Body::DescribeClusterRequest {
                 include_cluster_authorized_operations,
                 endpoint_type,
diff --git a/tansu-server/src/broker/create_topic.rs b/tansu-server/src/broker/create_topic.rs
index f7214d4..fc35e4d 100644
--- a/tansu-server/src/broker/create_topic.rs
+++ b/tansu-server/src/broker/create_topic.rs
@@ -15,8 +15,7 @@
 
 use crate::Result;
 use tansu_kafka_sans_io::{
-    create_topics_request::CreatableTopic, create_topics_response::CreatableTopicResult, Body,
-    ErrorCode,
+    create_topics_request::CreatableTopic, create_topics_response::CreatableTopicResult, ErrorCode,
 };
 use tansu_storage::Storage;
 use tracing::debug;
@@ -36,11 +35,19 @@ where
 
     async fn create_topic(
         &self,
-        topic: CreatableTopic,
+        mut topic: CreatableTopic,
         validate_only: bool,
     ) -> CreatableTopicResult {
         let _ = validate_only;
 
+        if topic.num_partitions == -1 {
+            topic.num_partitions = 1;
+        }
+
+        if topic.replication_factor == -1 {
+            topic.replication_factor = 3;
+        }
+
         let name = topic.name.clone();
         let num_partitions = Some(topic.num_partitions);
         let replication_factor = Some(topic.replication_factor);
@@ -89,11 +96,11 @@ where
         }
     }
 
-    pub async fn request(
+    pub async fn response(
         &self,
         creatable: Option<Vec<CreatableTopic>>,
         validate_only: bool,
-    ) -> Result<Body> {
+    ) -> Result<Vec<CreatableTopicResult>> {
         debug!(?creatable, ?validate_only);
 
         let mut topics =
@@ -105,9 +112,153 @@ where
             }
         }
 
-        Ok(Body::CreateTopicsResponse {
-            throttle_time_ms: Some(0),
-            topics: Some(topics),
-        })
+        Ok(topics)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use object_store::memory::InMemory;
+    use tansu_storage::{dynostore::DynoStore, NULL_TOPIC_ID};
+
+    use super::*;
+
+    #[tokio::test]
+    async fn create() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let create_topic = CreateTopic::with_storage(storage);
+
+        let name = "pqr";
+        let num_partitions = 5;
+        let replication_factor = 3;
+        let assignments = Some([].into());
+        let configs = Some([].into());
+        let validate_only = false;
+
+        let r = create_topic
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments,
+                    configs,
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, r.len());
+        assert_eq!(name, r[0].name.as_str());
+        assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
+        assert_eq!(Some(5), r[0].num_partitions);
+        assert_eq!(Some(3), r[0].replication_factor);
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn create_with_default() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let create_topic = CreateTopic::with_storage(storage);
+
+        let name = "pqr";
+        let num_partitions = -1;
+        let replication_factor = -1;
+        let assignments = Some([].into());
+        let configs = Some([].into());
+        let validate_only = false;
+
+        let r = create_topic
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments,
+                    configs,
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, r.len());
+        assert_eq!(name, r[0].name.as_str());
+        assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
+        assert_eq!(Some(1), r[0].num_partitions);
+        assert_eq!(Some(3), r[0].replication_factor);
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn duplicate() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let create_topic = CreateTopic::with_storage(storage);
+
+        let name = "pqr";
+        let num_partitions = 5;
+        let replication_factor = 3;
+        let assignments = Some([].into());
+        let configs = Some([].into());
+        let validate_only = false;
+
+        let r = create_topic
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments: assignments.clone(),
+                    configs: configs.clone(),
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, r.len());
+        assert_eq!(name, r[0].name.as_str());
+        assert_ne!(Some(NULL_TOPIC_ID), r[0].topic_id);
+        assert_eq!(Some(5), r[0].num_partitions);
+        assert_eq!(Some(3), r[0].replication_factor);
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(r[0].error_code)?);
+
+        let r = create_topic
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments,
+                    configs,
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, r.len());
+        assert_eq!(name, r[0].name.as_str());
+        assert_eq!(Some(NULL_TOPIC_ID), r[0].topic_id);
+        assert_eq!(Some(5), r[0].num_partitions);
+        assert_eq!(Some(3), r[0].replication_factor);
+        assert_eq!(
+            ErrorCode::TopicAlreadyExists,
+            ErrorCode::try_from(r[0].error_code)?
+        );
+
+        Ok(())
+    }
+}
diff --git a/tansu-server/src/broker/delete_topics.rs b/tansu-server/src/broker/delete_topics.rs
new file mode 100644
index 0000000..c57b7ef
--- /dev/null
+++ b/tansu-server/src/broker/delete_topics.rs
@@ -0,0 +1,182 @@
+// Copyright ⓒ 2024 Peter Morgan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::Result;
+use tansu_kafka_sans_io::{
+    delete_topics_request::DeleteTopicState, delete_topics_response::DeletableTopicResult,
+};
+use tansu_storage::Storage;
+use tracing::debug;
+
+#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct DeleteTopicsRequest<S> {
+    storage: S,
+}
+
+impl<S> DeleteTopicsRequest<S>
+where
+    S: Storage,
+{
+    pub fn with_storage(storage: S) -> Self {
+        Self { storage }
+    }
+
+    pub async fn response(
+        &self,
+        topics: Option<Vec<DeleteTopicState>>,
+        topic_names: Option<Vec<String>>,
+    ) -> Result<Vec<DeletableTopicResult>> {
+        debug!(?topics, ?topic_names);
+
+        let mut responses = vec![];
+
+        if let Some(topics) = topics {
+            for topic in topics {
+                let error_code = self.storage.delete_topic(&topic.clone().into()).await?;
+
+                responses.push(DeletableTopicResult {
+                    name: topic.name.clone(),
+                    topic_id: Some(topic.topic_id),
+                    error_code: i16::from(error_code),
+                    error_message: Some(error_code.to_string()),
+                });
+            }
+        }
+
+        if let Some(topic_names) = topic_names {
+            for name in topic_names {
+                let topic_id = name.clone().into();
+                let error_code = self.storage.delete_topic(&topic_id).await?;
+
+                responses.push(DeletableTopicResult {
+                    name: Some(name),
+                    topic_id: None,
+                    error_code: i16::from(error_code),
+                    error_message: Some(error_code.to_string()),
+                });
+            }
+        }
+
+        Ok(responses)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::broker::create_topic::CreateTopic;
+
+    use super::*;
+    use object_store::memory::InMemory;
+    use tansu_kafka_sans_io::{create_topics_request::CreatableTopic, ErrorCode};
+    use tansu_storage::{dynostore::DynoStore, NULL_TOPIC_ID};
+    use uuid::Uuid;
+
+    #[tokio::test]
+    async fn delete_unknown_by_name() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let topic = "pqr";
+
+        assert_eq!(
+            ErrorCode::UnknownTopicOrPartition,
+            storage.delete_topic(&topic.into()).await?
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn delete_unknown_by_uuid() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let topic = Uuid::new_v4();
+
+        assert_eq!(
+            ErrorCode::UnknownTopicOrPartition,
+            storage.delete_topic(&topic.into()).await?
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn create_delete_create_by_name() -> Result<()> {
+        let cluster = "abc";
+        let node = 12321;
+
+        let storage = DynoStore::new(cluster, node, InMemory::new());
+
+        let name = "pqr";
+        let num_partitions = 5;
+        let replication_factor = 3;
+        let assignments = Some([].into());
+        let configs = Some([].into());
+        let validate_only = false;
+
+        let created = CreateTopic::with_storage(storage.clone())
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments: assignments.clone(),
+                    configs: configs.clone(),
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, created.len());
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(created[0].error_code)?);
+
+        let deleted = DeleteTopicsRequest::with_storage(storage.clone())
+            .response(
+                Some(vec![DeleteTopicState {
+                    name: Some(name.into()),
+                    topic_id: NULL_TOPIC_ID,
+                }]),
+                Some(vec![]),
+            )
+            .await?;
+
+        assert_eq!(1, deleted.len());
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(deleted[0].error_code)?);
+        assert_eq!(Some(NULL_TOPIC_ID), deleted[0].topic_id);
+
+        let created = CreateTopic::with_storage(storage.clone())
+            .response(
+                Some(vec![CreatableTopic {
+                    name: name.into(),
+                    num_partitions,
+                    replication_factor,
+                    assignments,
+                    configs,
+                }]),
+                validate_only,
+            )
+            .await?;
+
+        assert_eq!(1, created.len());
+        assert_eq!(ErrorCode::None, ErrorCode::try_from(created[0].error_code)?);
+
+        Ok(())
+    }
+}
diff --git a/tansu-storage/src/dynostore.rs b/tansu-storage/src/dynostore.rs
index 1f4ad39..62e1a65 100644
--- a/tansu-storage/src/dynostore.rs
+++ b/tansu-storage/src/dynostore.rs
@@ -20,6 +20,7 @@ use std::{collections::BTreeMap, fmt::Debug, io::Cursor, str::FromStr, time::Dur
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::stream::TryStreamExt;
 use futures::StreamExt;
 use object_store::{
     path::Path, Attribute, AttributeValue, Attributes, ObjectStore, PutMode, PutOptions,
@@ -368,10 +369,80 @@ impl Storage for DynoStore {
         todo!()
     }
 
-    async fn delete_topic(&self, name: &str) -> Result<u64> {
-        debug!(?name);
+    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
+        debug!(?topic);
 
-        todo!()
+        if let Ok(metadata) = self.topic_metadata(topic).await {
+            let prefix = Path::from(format!(
+                "clusters/{}/topics/{}/",
+                self.cluster, metadata.topic.name,
+            ));
+
+            let locations = self
+                .object_store
+                .list(Some(&prefix))
+                .map_ok(|m| m.location)
+                .boxed();
+
+            _ = self
+                .object_store
+                .delete_stream(locations)
+                .try_collect::<Vec<_>>()
+                .await?;
+
+            let prefix = Path::from(format!("clusters/{}/groups/consumers/", self.cluster));
+
+            let locations = self
+                .object_store
+                .list(Some(&prefix))
+                .filter_map(|m| async {
+                    m.map_or(None, |m| {
+                        debug!(?m.location);
+
+                        m.location.prefix_match(&prefix).and_then(|mut i| {
+                            // skip over the consumer group name
+                            _ = i.next();
+
+                            let sub = Path::from_iter(i);
+                            debug!(?sub);
+
+                            if sub.prefix_matches(&Path::from(format!(
+                                "offsets/{}/partitions/",
+                                metadata.topic.name
+                            ))) {
+                                Some(Ok(m.location.clone()))
+                            } else {
+                                None
+                            }
+                        })
+                    })
+                })
+                .boxed();
+
+            _ = self
+                .object_store
+                .delete_stream(locations)
+                .try_collect::<Vec<_>>()
+                .await?;
+
+            self.object_store
+                .delete(&Path::from(format!(
+                    "clusters/{}/topics/uuids/{}.json",
+                    self.cluster, metadata.id,
+                )))
+                .await?;
+
+            self.object_store
+                .delete(&Path::from(format!(
+                    "clusters/{}/topics/{}.json",
+                    self.cluster, metadata.topic.name,
+                )))
+                .await?;
+
+            Ok(ErrorCode::None)
+        } else {
+            Ok(ErrorCode::UnknownTopicOrPartition)
+        }
     }
 
     async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
diff --git a/tansu-storage/src/lib.rs b/tansu-storage/src/lib.rs
index 0d02935..48a4a57 100644
--- a/tansu-storage/src/lib.rs
+++ b/tansu-storage/src/lib.rs
@@ -39,6 +39,7 @@ use tansu_kafka_sans_io::{
     create_topics_request::CreatableTopic,
     delete_records_request::DeleteRecordsTopic,
     delete_records_response::DeleteRecordsTopicResult,
+    delete_topics_request::DeleteTopicState,
     describe_cluster_response::DescribeClusterBroker,
     describe_configs_response::DescribeConfigsResult,
     fetch_request::FetchTopic,
@@ -57,7 +58,7 @@ pub mod os;
 pub mod pg;
 pub mod segment;
 
-const NULL_TOPIC_ID: [u8; 16] = [0; 16];
+pub const NULL_TOPIC_ID: [u8; 16] = [0; 16];
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -363,6 +364,38 @@ pub enum TopicId {
     Id(Uuid),
 }
 
+impl FromStr for TopicId {
+    type Err = Error;
+
+    fn from_str(s: &str) -> result::Result<Self, Self::Err> {
+        Ok(Self::Name(s.into()))
+    }
+}
+
+impl From<&str> for TopicId {
+    fn from(value: &str) -> Self {
+        Self::Name(value.to_owned())
+    }
+}
+
+impl From<String> for TopicId {
+    fn from(value: String) -> Self {
+        Self::Name(value)
+    }
+}
+
+impl From<Uuid> for TopicId {
+    fn from(value: Uuid) -> Self {
+        Self::Id(value)
+    }
+}
+
+impl From<[u8; 16]> for TopicId {
+    fn from(value: [u8; 16]) -> Self {
+        Self::Id(Uuid::from_bytes(value))
+    }
+}
+
 impl From<&FetchTopic> for TopicId {
     fn from(value: &FetchTopic) -> Self {
         if let Some(ref name) = value.topic {
@@ -387,6 +420,19 @@ impl From<&MetadataRequestTopic> for TopicId {
     }
 }
 
+impl From<DeleteTopicState> for TopicId {
+    fn from(value: DeleteTopicState) -> Self {
+        match value {
+            DeleteTopicState {
+                name: Some(name),
+                topic_id,
+            } if topic_id == NULL_TOPIC_ID => name.into(),
+
+            DeleteTopicState { topic_id, .. } => topic_id.into(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
 pub struct BrokerRegistationRequest {
     pub broker_id: i32,
@@ -525,7 +571,7 @@ pub trait Storage: Clone + Debug + Send + Sync + 'static {
         topics: &[DeleteRecordsTopic],
     ) -> Result<Vec<DeleteRecordsTopicResult>>;
 
-    async fn delete_topic(&self, name: &str) -> Result<u64>;
+    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode>;
 
     async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>>;
 
@@ -620,10 +666,10 @@ impl Storage for StorageContainer {
         }
     }
 
-    async fn delete_topic(&self, name: &str) -> Result<u64> {
+    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
         match self {
-            Self::Postgres(pg) => pg.delete_topic(name).await,
-            Self::DynoStore(dyn_store) => dyn_store.delete_topic(name).await,
+            Self::Postgres(pg) => pg.delete_topic(topic).await,
+            Self::DynoStore(dyn_store) => dyn_store.delete_topic(topic).await,
         }
     }
diff --git a/tansu-storage/src/pg.rs b/tansu-storage/src/pg.rs
index 9d1684c..02285dd 100644
--- a/tansu-storage/src/pg.rs
+++ b/tansu-storage/src/pg.rs
@@ -35,7 +35,7 @@ use tansu_kafka_sans_io::{
     record::{deflated, inflated, Header, Record},
     to_system_time, to_timestamp, ConfigResource, ConfigSource, ConfigType, ErrorCode,
 };
-use tokio_postgres::{error::SqlState, Config, NoTls};
+use tokio_postgres::{error::SqlState, Config, NoTls, Transaction};
 use tracing::{debug, error};
 use uuid::Uuid;
 
@@ -45,6 +45,66 @@ use crate::{
     UpdateError, Version, NULL_TOPIC_ID,
 };
 
+const DELETE_CONSUMER_OFFSETS_FOR_TOPIC: &str = concat!(
+    "delete from consumer_offset",
+    " using",
+    " cluster",
+    ", topic",
+    " where",
+    " consumer_offset.topic = topic.id",
+    " and",
+    " topic.cluster = cluster.id",
+    " and",
+    " cluster.name = $1",
+    " and",
+    " topic.<attribute> = $2"
= $2" +); + +const DELETE_HEADERS_FOR_TOPIC: &str = concat!( + "delete from header", + " using", + " cluster", + ", record", + ", topic", + " where", + " header.record = record.id", + " and", + " record.topic = topic.id", + " and", + " topic.cluster = cluster.id", + " and", + " cluster.name = $1", + " and", + " topic. = $2" +); + +const DELETE_RECORDS_FOR_TOPIC: &str = concat!( + "delete from record", + " using", + " cluster", + ", topic", + " where", + " record.topic = topic.id", + " and", + " topic.cluster = cluster.id", + " and", + " cluster.name = $1", + " and", + " topic. = $2" +); + +const DELETE_TOPIC: &str = concat!( + "delete from topic", + " using", + " cluster", + " where", + " topic.cluster = cluster.id", + " and", + " cluster.name = $1", + " and", + " topic. = $2" +); + #[derive(Clone, Debug)] pub struct Postgres { cluster: String, @@ -128,6 +188,36 @@ impl Postgres { async fn connection(&self) -> Result { self.pool.get().await.map_err(Into::into) } + + async fn delete_for_topic( + &self, + tx: &Transaction<'_>, + sql: &str, + topic: &TopicId, + ) -> Result { + match topic { + TopicId::Id(id) => { + let sql = sql.replace("", "id"); + + let prepared = tx.prepare(&sql).await.inspect_err(|err| error!(?err))?; + + tx.execute(&prepared, &[&self.cluster, &id]) + .await + .inspect_err(|err| error!(?err)) + .map_err(Into::into) + } + TopicId::Name(name) => { + let sql = sql.replace("", "name"); + + let prepared = tx.prepare(&sql).await.inspect_err(|err| error!(?err))?; + + tx.execute(&prepared, &[&self.cluster, &name]) + .await + .inspect_err(|err| error!(?err)) + .map_err(Into::into) + } + } + } } #[async_trait] @@ -291,7 +381,7 @@ impl Storage for Postgres { &self.cluster.as_str(), &topic.name.as_str(), &topic.num_partitions, - &(topic.replication_factor as i32), + &topic.replication_factor, ], ) .await @@ -463,12 +553,36 @@ impl Storage for Postgres { Ok(responses) } - async fn delete_topic(&self, name: &str) -> Result { - let c = self.connection().await?; + async fn delete_topic(&self, topic: &TopicId) -> Result { + let mut c = self.connection().await?; + let tx = c.transaction().await?; + + for (description, sql) in [ + ("consumer offsets", DELETE_CONSUMER_OFFSETS_FOR_TOPIC), + ("headers", DELETE_HEADERS_FOR_TOPIC), + ("records", DELETE_RECORDS_FOR_TOPIC), + ] { + let rows = self.delete_for_topic(&tx, sql, topic).await?; + debug!(?topic, ?rows, ?description); + } - let delete_topic = c.prepare("delete from topic where name=$1 cascade").await?; + let topic_deletion_result = + self.delete_for_topic(&tx, DELETE_TOPIC, topic) + .await + .map(|rows| match rows { + 0 => ErrorCode::UnknownTopicOrPartition, + 1 => ErrorCode::None, + otherwise => { + error!(?otherwise, ?DELETE_TOPIC, ?topic, ?self.cluster); + ErrorCode::None + } + }); - c.execute(&delete_topic, &[&name]).await.map_err(Into::into) + tx.commit() + .await + .inspect_err(|err| error!(?err)) + .map_err(Into::into) + .and(topic_deletion_result) } async fn produce(&self, topition: &'_ Topition, deflated: deflated::Batch) -> Result {