diff --git a/tansu-server/src/main.rs b/tansu-server/src/main.rs
index c0e51bf..47d4f61 100644
--- a/tansu-server/src/main.rs
+++ b/tansu-server/src/main.rs
@@ -16,9 +16,12 @@
 use std::{path::PathBuf, str::FromStr, time::Duration};
 
 use clap::Parser;
-use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
+use object_store::{
+    aws::{AmazonS3Builder, S3ConditionalPut},
+    memory::InMemory,
+};
 use tansu_server::{broker::Broker, coordinator::group::administrator::Controller, Error, Result};
-use tansu_storage::{pg::Postgres, s3::S3, StorageContainer};
+use tansu_storage::{dynostore::DynoStore, pg::Postgres, StorageContainer};
 use tokio::task::JoinSet;
 use tracing::debug;
 use tracing_subscriber::{fmt::format::FmtSpan, prelude::*, EnvFilter};
@@ -125,19 +128,27 @@ async fn main() -> Result<()> {
         "s3" => {
             let bucket_name = args.storage_engine.value.host_str().unwrap_or("tansu");
 
-            let object_store_builder = AmazonS3Builder::from_env()
+            AmazonS3Builder::from_env()
                 .with_bucket_name(bucket_name)
-                .with_conditional_put(S3ConditionalPut::ETagMatch);
-
-            S3::new(
-                args.kafka_cluster_id.as_str(),
-                args.kafka_node_id,
-                object_store_builder,
-            )
-            .map(StorageContainer::S3)
-            .map_err(Into::into)
+                .with_conditional_put(S3ConditionalPut::ETagMatch)
+                .build()
+                .map(|object_store| {
+                    DynoStore::new(
+                        args.kafka_cluster_id.as_str(),
+                        args.kafka_node_id,
+                        object_store,
+                    )
+                })
+                .map(StorageContainer::DynoStore)
+                .map_err(Into::into)
         }
+
+        "memory" => Ok(StorageContainer::DynoStore(DynoStore::new(
+            args.kafka_cluster_id.as_str(),
+            args.kafka_node_id,
+            InMemory::new(),
+        ))),
+
         _unsupported => Err(Error::UnsupportedStorageUrl(args.storage_engine.value)),
     }?;
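Note: both the "s3" engine and the new "memory" engine now converge on DynoStore, which is generic over any object_store::ObjectStore implementation. A minimal sketch of that idea, assuming only the items visible in this diff (the helper name is hypothetical):

    use object_store::memory::InMemory;
    use tansu_storage::{dynostore::DynoStore, StorageContainer};

    // Hypothetical test helper: any ObjectStore backend wraps the same way,
    // which is what lets "s3" and "memory" share one storage implementation.
    fn in_memory_storage(cluster_id: &str, node_id: i32) -> StorageContainer {
        StorageContainer::DynoStore(DynoStore::new(cluster_id, node_id, InMemory::new()))
    }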
diff --git a/tansu-storage/src/lib.rs b/tansu-storage/src/lib.rs
index e08979d..0d02935 100644
--- a/tansu-storage/src/lib.rs
+++ b/tansu-storage/src/lib.rs
@@ -19,7 +19,6 @@
 use dynostore::DynoStore;
 use glob::{GlobError, PatternError};
 use pg::Postgres;
 use regex::Regex;
-use s3::S3;
 use serde::{Deserialize, Serialize};
 use std::{
     array::TryFromSliceError,
@@ -56,7 +55,6 @@
 pub mod dynostore;
 pub mod index;
 pub mod os;
 pub mod pg;
-pub mod s3;
 pub mod segment;
 
 const NULL_TOPIC_ID: [u8; 16] = [0; 16];
@@ -593,7 +591,6 @@ pub enum UpdateError {
 
 #[derive(Clone, Debug)]
 pub enum StorageContainer {
     Postgres(Postgres),
-    S3(S3),
     DynoStore(DynoStore),
 }
 
@@ -602,7 +599,6 @@ impl Storage for StorageContainer {
     async fn register_broker(&self, broker_registration: BrokerRegistationRequest) -> Result<()> {
         match self {
             Self::Postgres(pg) => pg.register_broker(broker_registration).await,
-            Self::S3(s3) => s3.register_broker(broker_registration).await,
             Self::DynoStore(dyn_store) => dyn_store.register_broker(broker_registration).await,
         }
     }
@@ -610,7 +606,6 @@
     async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
         match self {
             Self::Postgres(pg) => pg.create_topic(topic, validate_only).await,
-            Self::S3(s3) => s3.create_topic(topic, validate_only).await,
             Self::DynoStore(dyn_store) => dyn_store.create_topic(topic, validate_only).await,
         }
     }
@@ -621,7 +616,6 @@
     ) -> Result<Vec<DeleteRecordsTopicResult>> {
         match self {
             Self::Postgres(pg) => pg.delete_records(topics).await,
-            Self::S3(s3) => s3.delete_records(topics).await,
             Self::DynoStore(dyn_store) => dyn_store.delete_records(topics).await,
         }
     }
@@ -629,7 +623,6 @@
     async fn delete_topic(&self, name: &str) -> Result<ErrorCode> {
         match self {
             Self::Postgres(pg) => pg.delete_topic(name).await,
-            Self::S3(s3) => s3.delete_topic(name).await,
             Self::DynoStore(dyn_store) => dyn_store.delete_topic(name).await,
         }
     }
@@ -637,7 +630,6 @@
     async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
         match self {
             Self::Postgres(pg) => pg.brokers().await,
-            Self::S3(s3) => s3.brokers().await,
             Self::DynoStore(dyn_store) => dyn_store.brokers().await,
         }
     }
@@ -645,7 +637,6 @@
     async fn produce(&self, topition: &Topition, batch: deflated::Batch) -> Result<i64> {
         match self {
             Self::Postgres(pg) => pg.produce(topition, batch).await,
-            Self::S3(s3) => s3.produce(topition, batch).await,
             Self::DynoStore(dyn_store) => dyn_store.produce(topition, batch).await,
         }
     }
@@ -659,7 +650,6 @@
     ) -> Result<deflated::Batch> {
         match self {
             Self::Postgres(pg) => pg.fetch(topition, offset, min_bytes, max_bytes).await,
-            Self::S3(s3) => s3.fetch(topition, offset, min_bytes, max_bytes).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .fetch(topition, offset, min_bytes, max_bytes)
                     .await
             }
         }
     }
@@ -671,7 +661,6 @@
     async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
         match self {
             Self::Postgres(pg) => pg.offset_stage(topition).await,
-            Self::S3(s3) => s3.offset_stage(topition).await,
             Self::DynoStore(dyn_store) => dyn_store.offset_stage(topition).await,
         }
     }
@@ -682,7 +671,6 @@
     ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
         match self {
             Self::Postgres(pg) => pg.list_offsets(offsets).await,
-            Self::S3(s3) => s3.list_offsets(offsets).await,
             Self::DynoStore(dyn_store) => dyn_store.list_offsets(offsets).await,
         }
     }
@@ -695,7 +683,6 @@
     ) -> Result<Vec<(Topition, ErrorCode)>> {
         match self {
             Self::Postgres(pg) => pg.offset_commit(group_id, retention_time_ms, offsets).await,
-            Self::S3(s3) => s3.offset_commit(group_id, retention_time_ms, offsets).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .offset_commit(group_id, retention_time_ms, offsets)
                     .await
             }
         }
     }
@@ -712,7 +699,6 @@
     ) -> Result<BTreeMap<Topition, i64>> {
         match self {
             Self::Postgres(pg) => pg.offset_fetch(group_id, topics, require_stable).await,
-            Self::S3(s3) => s3.offset_fetch(group_id, topics, require_stable).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .offset_fetch(group_id, topics, require_stable)
                     .await
             }
         }
     }
@@ -724,7 +710,6 @@
     async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
         match self {
             Self::Postgres(pg) => pg.metadata(topics).await,
-            Self::S3(s3) => s3.metadata(topics).await,
             Self::DynoStore(dyn_store) => dyn_store.metadata(topics).await,
         }
     }
@@ -737,7 +722,6 @@
     ) -> Result<DescribeConfigsResult> {
         match self {
             Self::Postgres(pg) => pg.describe_config(name, resource, keys).await,
-            Self::S3(s3) => s3.describe_config(name, resource, keys).await,
             Self::DynoStore(dyn_store) => dyn_store.describe_config(name, resource, keys).await,
         }
     }
@@ -750,7 +734,6 @@
     ) -> Result<Version, UpdateError<GroupDetail>> {
         match self {
             Self::Postgres(pg) => pg.update_group(group_id, detail, version).await,
-            Self::S3(s3) => s3.update_group(group_id, detail, version).await,
             Self::DynoStore(dyn_store) => dyn_store.update_group(group_id, detail, version).await,
         }
     }
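Note: with the S3 arm removed, every Storage method on StorageContainer delegates to one of two backends; callers go through the trait and never see which variant they hold. A hedged usage sketch (the function is illustrative; the trait call is taken from the impl above):

    // Illustrative only: create a topic through the container without
    // caring whether it is backed by Postgres or by DynoStore.
    async fn ensure_topic(storage: &StorageContainer, topic: CreatableTopic) -> Result<Uuid> {
        storage.create_topic(topic, false).await
    }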
diff --git a/tansu-storage/src/s3.rs b/tansu-storage/src/s3.rs
deleted file mode 100644
index f8b3330..0000000
--- a/tansu-storage/src/s3.rs
+++ /dev/null
@@ -1,1106 +0,0 @@
-// Copyright ⓒ 2024 Peter Morgan
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-use std::collections::BTreeSet;
-use std::io::BufReader;
-use std::{collections::BTreeMap, fmt::Debug, io::Cursor, str::FromStr, time::Duration};
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::StreamExt;
-use object_store::PutResult;
-use object_store::{
-    aws::{AmazonS3, AmazonS3Builder},
-    path::Path,
-    Attribute, AttributeValue, Attributes, ObjectStore, PutMode, PutOptions, PutPayload, TagSet,
-    UpdateVersion,
-};
-use rand::{prelude::*, thread_rng};
-use serde::de::DeserializeOwned;
-use serde::{Deserialize, Serialize};
-use tansu_kafka_sans_io::describe_configs_response::DescribeConfigsResourceResult;
-use tansu_kafka_sans_io::{
-    create_topics_request::CreatableTopic,
-    delete_records_request::DeleteRecordsTopic,
-    delete_records_response::DeleteRecordsTopicResult,
-    describe_cluster_response::DescribeClusterBroker,
-    describe_configs_response::DescribeConfigsResult,
-    metadata_response::{MetadataResponseBroker, MetadataResponsePartition, MetadataResponseTopic},
-    record::{deflated, inflated},
-    ConfigResource, Encoder, ErrorCode,
-};
-use tansu_kafka_sans_io::{ConfigSource, ConfigType, Decoder};
-use tracing::{debug, error};
-use uuid::Uuid;
-
-use crate::{
-    BrokerRegistationRequest, Error, GroupDetail, ListOffsetRequest, ListOffsetResponse,
-    MetadataResponse, OffsetCommitRequest, OffsetStage, Result, Storage, TopicId, Topition,
-    UpdateError, Version, NULL_TOPIC_ID,
-};
-
-const APPLICATION_JSON: &str = "application/json";
-
-#[derive(Debug)]
-pub struct S3 {
-    cluster: String,
-    node: i32,
-    builder: AmazonS3Builder,
-    object_store: AmazonS3,
-}
-
-impl S3 {
-    async fn get_watermark(&self, topition: &Topition) -> Result<Watermark> {
-        debug!(?topition);
-        let location = Path::from(format!(
-            "clusters/{}/topics/{}/partitions/{:0>10}/watermark.json",
-            self.cluster, topition.topic, topition.partition,
-        ));
-
-        let get_result = self
-            .object_store
-            .get(&location)
-            .await
-            .inspect(|get_result| debug!(?get_result, ?topition))
-            .inspect_err(|error| error!(?error, ?location))
-            .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))?;
-
-        let encoded = get_result
-            .bytes()
-            .await
-            .inspect_err(|error| error!(?error, ?location))
-            .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))?;
-
-        serde_json::from_slice::<Watermark>(&encoded[..])
-            .inspect(|watermark| debug!(?watermark, ?topition))
-            .inspect_err(|error| error!(?error, ?location))
-            .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))
-    }
-
-    async fn topic_metadata(&self, topic: &TopicId) -> Result<TopicMetadata> {
-        debug!(?topic);
-
-        let location = match topic {
-            TopicId::Name(name) => {
-                Path::from(format!("clusters/{}/topics/{}.json", self.cluster, name))
-            }
-
-            TopicId::Id(id) => Path::from(format!(
-                "clusters/{}/topics/uuids/{}.json",
-                self.cluster, id
-            )),
-        };
-
-        let get_result = self.object_store.get(&location).await?;
-
-        let encoded = get_result.bytes().await?;
-
-        serde_json::from_slice::<TopicMetadata>(&encoded[..])
-            .inspect(|topic_metadata| debug!(?topic_metadata, ?topic))
-            .map_err(Into::into)
-    }
-}
-
-// bucket structure:
-//
-// clusters/{cluster}
-// clusters/{cluster}/brokers/broker-id.json
-// clusters/{cluster}/topics/{name}.json
-// clusters/{cluster}/topics/uuids/{uuid}.json
-// clusters/{cluster}/topics/{name}/partitions.json
-// clusters/{cluster}/topics/{name}/partitions/{partition}/records/{offset}-{timestamp}
-// clusters/{cluster}/topics/{name}/partitions/0/watermark.json
-// clusters/{cluster}/topics/{name}/partitions/0/records/0001
-// clusters/{cluster}/topics/{name}/partitions/1/records/0000
-// clusters/{cluster}/groups/consumers/{name}.json
-// clusters/{cluster}/groups/consumers/{name}/offsets/topic/partition.json
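Note: the {:0>10} and {:0>20} padding used throughout this file keeps partition and offset keys fixed-width, so the store's lexicographic listing order matches numeric order. A self-contained illustration:

    fn main() {
        // Without padding, "10" sorts before "9"; zero-padding to a fixed
        // width makes lexicographic and numeric order agree.
        assert!("10" < "9");
        assert_eq!(format!("{:0>20}", 9_i64), "00000000000000000009");
        assert!(format!("{:0>20}", 9_i64) < format!("{:0>20}", 10_i64));
    }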
-
-#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
-struct TopicMetadata {
-    id: Uuid,
-    topic: CreatableTopic,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
-struct Watermark {
-    low: i64,
-    high: i64,
-    stable: i64,
-}
-
-impl Clone for S3 {
-    fn clone(&self) -> Self {
-        Self {
-            cluster: self.cluster.clone(),
-            node: self.node,
-            object_store: self.builder.clone().build().expect("s3"),
-            builder: self.builder.clone(),
-        }
-    }
-}
-
-impl S3 {
-    pub fn new(cluster: &str, node: i32, builder: AmazonS3Builder) -> Result<Self> {
-        builder
-            .clone()
-            .build()
-            .map(|object_store| Self {
-                cluster: cluster.into(),
-                node,
-                object_store,
-                builder,
-            })
-            .map_err(Into::into)
-    }
-
-    fn encode(&self, deflated: deflated::Batch) -> Result<PutPayload> {
-        let mut encoded = Cursor::new(vec![]);
-        let mut encoder = Encoder::new(&mut encoded);
-        deflated.serialize(&mut encoder)?;
-
-        Ok(PutPayload::from(Bytes::from(encoded.into_inner())))
-    }
-
-    fn decode(&self, encoded: Bytes) -> Result<deflated::Batch> {
-        let mut c = Cursor::new(encoded);
-
-        let mut decoder = Decoder::new(&mut c);
-        deflated::Batch::deserialize(&mut decoder).map_err(Into::into)
-    }
-
-    async fn get<P>(&self, location: &Path) -> Result<(P, Version)>
-    where
-        P: DeserializeOwned,
-    {
-        let get_result = self.object_store.get(location).await?;
-        let meta = get_result.meta.clone();
-
-        let encoded = get_result.bytes().await?;
-        let r = BufReader::new(&encoded[..]);
-
-        let payload = serde_json::from_reader(r)?;
-
-        Ok((payload, meta.into()))
-    }
-
-    async fn put<P>(
-        &self,
-        location: &Path,
-        value: P,
-        attributes: Attributes,
-        update_version: Option<UpdateVersion>,
-    ) -> Result<PutResult, UpdateError<P>>
-    where
-        P: Serialize + DeserializeOwned + Debug,
-    {
-        debug!(?location, ?attributes, ?update_version, ?value);
-
-        let options = PutOptions {
-            mode: update_version.map_or(PutMode::Create, PutMode::Update),
-            tags: TagSet::default(),
-            attributes,
-        };
-
-        let payload = serde_json::to_vec(&value)
-            .map(Bytes::from)
-            .map(PutPayload::from)?;
-
-        match self
-            .object_store
-            .put_opts(location, payload, options)
-            .await
-            .inspect_err(|error| debug!(?location, ?error))
-        {
-            Ok(put_result) => Ok(put_result),
-
-            Err(object_store::Error::Precondition { .. })
-            | Err(object_store::Error::AlreadyExists { .. }) => {
-                let (current, version) = self
-                    .get(location)
-                    .await
-                    .inspect_err(|error| error!(?location, ?error))?;
-
-                Err(UpdateError::Outdated { current, version })
-            }
-
-            Err(otherwise) => Err(otherwise.into()),
-        }
-    }
-}
-
-#[async_trait]
-impl Storage for S3 {
-    async fn register_broker(&self, broker_registration: BrokerRegistationRequest) -> Result<()> {
-        debug!(?broker_registration);
-
-        let payload = serde_json::to_vec(&broker_registration)
-            .map(Bytes::from)
-            .map(PutPayload::from)?;
-
-        let location = Path::from(format!(
-            "clusters/{}/brokers/{}.json",
-            self.cluster, self.node
-        ));
-
-        let mut attributes = Attributes::new();
-        _ = attributes.insert(
-            Attribute::ContentType,
-            AttributeValue::from(APPLICATION_JSON),
-        );
-
-        let options = PutOptions {
-            mode: PutMode::Overwrite,
-            tags: TagSet::default(),
-            attributes,
-        };
-
-        let put_result = self
-            .object_store
-            .put_opts(&location, payload, options)
-            .await?;
-
-        debug!(?location, ?put_result);
-
-        Ok(())
-    }
-
-    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
-        debug!(?topic, ?validate_only);
-
-        let id = Uuid::now_v7();
-
-        let td = TopicMetadata { id, topic };
-
-        let payload = serde_json::to_vec(&td)
-            .map(Bytes::from)
-            .map(PutPayload::from)?;
-
-        let mut attributes = Attributes::new();
-        _ = attributes.insert(
-            Attribute::ContentType,
-            AttributeValue::from(APPLICATION_JSON),
-        );
-
-        let options = PutOptions {
-            mode: PutMode::Create,
-            tags: TagSet::default(),
-            attributes,
-        };
-
-        match (
-            self.object_store
-                .put_opts(
-                    &Path::from(format!(
-                        "clusters/{}/topics/{}.json",
-                        self.cluster, td.topic.name,
-                    )),
-                    payload.clone(),
-                    options.clone(),
-                )
-                .await
-                .inspect(|put_result| debug!(?put_result))
-                .inspect_err(|error| match error {
-                    object_store::Error::AlreadyExists { .. } => {
-                        debug!(?error, ?td, ?validate_only)
-                    }
-
-                    _ => error!(?error, ?td, ?validate_only),
-                }),
-            self.object_store
-                .put_opts(
-                    &Path::from(format!(
-                        "clusters/{}/topics/uuids/{}.json",
-                        self.cluster, id,
-                    )),
-                    payload,
-                    options,
-                )
-                .await
-                .inspect(|put_result| debug!(?put_result))
-                .inspect_err(|error| match error {
-                    object_store::Error::AlreadyExists { .. } => {
-                        debug!(?error, ?td, ?validate_only)
-                    }
-
-                    _ => error!(?error, ?td, ?validate_only),
-                }),
-        ) {
-            (Ok(_), Ok(_)) => {
-                let payload = serde_json::to_vec(&Watermark::default())
-                    .map(Bytes::from)
-                    .map(PutPayload::from)?;
-
-                for partition in 0..td.topic.num_partitions {
-                    let location = Path::from(format!(
-                        "clusters/{}/topics/{}/partitions/{:0>10}/watermark.json",
-                        self.cluster, td.topic.name, partition,
-                    ));
-
-                    let mut attributes = Attributes::new();
-                    _ = attributes.insert(
-                        Attribute::ContentType,
-                        AttributeValue::from(APPLICATION_JSON),
-                    );
-
-                    let options = PutOptions {
-                        mode: PutMode::Create,
-                        tags: TagSet::default(),
-                        attributes,
-                    };
-
-                    match self
-                        .object_store
-                        .put_opts(&location, payload.clone(), options)
-                        .await
-                        .inspect(|put_result| debug!(?location, ?put_result))
-                        .inspect_err(|error| match error {
-                            object_store::Error::AlreadyExists { .. } => {
-                                debug!(?error, ?td, ?validate_only)
-                            }
-
-                            _ => error!(?error, ?td, ?validate_only),
-                        }) {
-                        Ok(_) => continue,
-
-                        Err(object_store::Error::AlreadyExists { .. }) => {
-                            return Err(Error::Api(ErrorCode::TopicAlreadyExists))
-                        }
-
-                        _ => return Err(Error::Api(ErrorCode::UnknownServerError)),
-                    }
-                }
-
-                Ok(id)
-            }
-
-            (Err(object_store::Error::AlreadyExists { .. }), _) => {
-                Err(Error::Api(ErrorCode::TopicAlreadyExists))
-            }
-
-            (_, _) => Err(Error::Api(ErrorCode::UnknownServerError)),
-        }
-    }
-
-    async fn delete_records(
-        &self,
-        topics: &[DeleteRecordsTopic],
-    ) -> Result<Vec<DeleteRecordsTopicResult>> {
-        debug!(?topics);
-        todo!()
-    }
-
-    async fn delete_topic(&self, name: &str) -> Result<ErrorCode> {
-        debug!(?name);
-
-        todo!()
-    }
-
-    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
-        let location = Path::from(format!("clusters/{}/brokers/", self.cluster));
-        debug!(?location);
-
-        let mut brokers = vec![];
-
-        let mut list_stream = self.object_store.list(Some(&location));
-
-        while let Some(meta) = list_stream
-            .next()
-            .await
-            .inspect(|meta| debug!(?meta))
-            .transpose()?
-        {
-            let Ok(get_result) = self
-                .object_store
-                .get(&meta.location)
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Ok(encoded) = get_result
-                .bytes()
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Ok(broker_registration) =
-                serde_json::from_slice::<BrokerRegistationRequest>(&encoded[..])
-                    .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Some(listener) = broker_registration
-                .listeners
-                .iter()
-                .find(|listener| listener.name.as_str() == "broker")
-            else {
-                continue;
-            };
-
-            brokers.push(DescribeClusterBroker {
-                broker_id: broker_registration.broker_id,
-                host: listener.host.clone(),
-                port: listener.port as i32,
-                rack: broker_registration.rack,
-            });
-        }
-
-        Ok(brokers)
-    }
-
-    async fn produce(&self, topition: &Topition, deflated: deflated::Batch) -> Result<i64> {
-        debug!(?topition, ?deflated);
-
-        let location = Path::from(format!(
-            "clusters/{}/topics/{}/partitions/{:0>10}/watermark.json",
-            self.cluster, topition.topic, topition.partition,
-        ));
-
-        loop {
-            let get_result = self
-                .object_store
-                .get(&location)
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-                .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))?;
-
-            let update_version = UpdateVersion {
-                e_tag: get_result.meta.e_tag.clone(),
-                version: get_result.meta.version.clone(),
-            };
-
-            let encoded = get_result
-                .bytes()
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-                .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))?;
-
-            let original = serde_json::from_slice::<Watermark>(&encoded[..])
-                .inspect_err(|error| error!(?error, ?location))
-                .map_err(|_error| Error::Api(ErrorCode::UnknownServerError))?;
-
-            let mut updated = original.clone();
-            updated.high += deflated.record_count as i64;
-
-            let payload = serde_json::to_vec(&updated)
-                .map(Bytes::from)
-                .map(PutPayload::from)?;
-
-            let mut attributes = Attributes::new();
-            _ = attributes.insert(
-                Attribute::ContentType,
-                AttributeValue::from(APPLICATION_JSON),
-            );
-
-            let options = PutOptions {
-                mode: PutMode::Update(update_version),
-                tags: TagSet::default(),
-                attributes,
-            };
-
-            match self
-                .object_store
-                .put_opts(&location, payload, options)
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-                .inspect(|put_result| debug!(?location, ?put_result))
-            {
-                Ok(_) => {
-                    let location = Path::from(format!(
-                        "clusters/{}/topics/{}/partitions/{:0>10}/records/{:0>20}.batch",
-                        self.cluster, topition.topic, topition.partition, original.high,
-                    ));
-
-                    let payload = self.encode(deflated)?;
-                    let attributes = Attributes::new();
-
-                    let options = PutOptions {
-                        mode: PutMode::Create,
-                        tags: TagSet::default(),
-                        attributes,
-                    };
-
-                    _ = self
-                        .object_store
-                        .put_opts(&location, payload, options)
-                        .await
-                        .inspect_err(|error| error!(?error))?;
-
-                    return Ok(original.high);
-                }
-                Err(object_store::Error::Precondition { .. }) => continue,
-                Err(error) => {
-                    return {
-                        error!(?error);
-                        Err(Error::Api(ErrorCode::UnknownServerError))
-                    }
-                }
-            }
-        }
-    }
-
-    async fn fetch(
-        &self,
-        topition: &'_ Topition,
-        offset: i64,
-        min_bytes: u32,
-        max_bytes: u32,
-    ) -> Result<deflated::Batch> {
-        debug!(?topition, ?offset, ?min_bytes, ?max_bytes);
-
-        let location = Path::from(format!(
-            "clusters/{}/topics/{}/partitions/{:0>10}/records/",
-            self.cluster, topition.topic, topition.partition
-        ));
-
-        let mut offsets = BTreeSet::new();
-
-        let mut list_stream = self.object_store.list(Some(&location));
-
-        while let Some(meta) = list_stream
-            .next()
-            .await
-            .inspect(|meta| debug!(?meta))
-            .transpose()
-            .inspect_err(|error| error!(?error, ?topition, ?offset, ?min_bytes, ?max_bytes))
-            .map_err(|_| Error::Api(ErrorCode::UnknownServerError))?
-        {
-            let Some(offset) = meta.location.parts().last() else {
-                continue;
-            };
-
-            let offset = i64::from_str(&offset.as_ref()[0..20])?;
-            _ = offsets.insert(offset);
-        }
-
-        let greater_or_equal = offsets.split_off(&offset);
-
-        let Some(offset) = greater_or_equal.first() else {
-            return inflated::Batch::builder()
-                .build()
-                .and_then(TryInto::try_into)
-                .map_err(Into::into);
-        };
-
-        let location = Path::from(format!(
-            "clusters/{}/topics/{}/partitions/{:0>10}/records/{:0>20}.batch",
-            self.cluster, topition.topic, topition.partition, offset,
-        ));
-
-        let get_result = self
-            .object_store
-            .get(&location)
-            .await
-            .inspect_err(|error| error!(?error, ?topition, ?offset, ?min_bytes, ?max_bytes))
-            .map_err(|_| Error::Api(ErrorCode::UnknownServerError))?;
-
-        get_result
-            .bytes()
-            .await
-            .inspect_err(|error| error!(?error, ?location))
-            .map_err(|_| Error::Api(ErrorCode::UnknownServerError))
-            .and_then(|encoded| self.decode(encoded))
-            .map(|mut deflated| {
-                deflated.base_offset = *offset;
-                deflated
-            })
-    }
-
-    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
-        debug!(?topition);
-
-        self.get_watermark(topition)
-            .await
-            .map(|watermark| OffsetStage {
-                last_stable: watermark.stable,
-                high_watermark: watermark.high,
-                log_start: watermark.low,
-            })
-    }
-
-    async fn list_offsets(
-        &self,
-        offsets: &[(Topition, ListOffsetRequest)],
-    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
-        debug!(?offsets);
-
-        let mut responses = vec![];
-
-        for (topition, offset_request) in offsets {
-            responses.push((
-                topition.to_owned(),
-                match offset_request {
-                    ListOffsetRequest::Earliest => {
-                        self.get_watermark(topition)
-                            .await
-                            .map(|watermark| ListOffsetResponse {
-                                error_code: ErrorCode::None,
-                                timestamp: None,
-                                offset: Some(watermark.low),
-                            })?
-                    }
-                    ListOffsetRequest::Latest => {
-                        self.get_watermark(topition)
-                            .await
-                            .map(|watermark| ListOffsetResponse {
-                                error_code: ErrorCode::None,
-                                timestamp: None,
-                                offset: Some(watermark.high),
-                            })?
-                    }
-                    ListOffsetRequest::Timestamp(..) => todo!(),
-                },
-            ));
-        }
-
-        Ok(responses)
-    }
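Note: fetch above discovers the first batch at or after the requested offset by collecting the listed keys into a BTreeSet and calling split_off, which retains everything greater than or equal to the given key. In isolation (values illustrative):

    use std::collections::BTreeSet;

    fn main() {
        let mut offsets = BTreeSet::from([0_i64, 5, 9, 12]);
        // split_off(&7) leaves {0, 5} in `offsets` and returns {9, 12},
        // so `first()` is the batch that contains or follows offset 7.
        let greater_or_equal = offsets.split_off(&7);
        assert_eq!(greater_or_equal.first(), Some(&9));
    }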
-
-    async fn offset_commit(
-        &self,
-        group_id: &str,
-        retention_time_ms: Option<Duration>,
-        offsets: &[(Topition, OffsetCommitRequest)],
-    ) -> Result<Vec<(Topition, ErrorCode)>> {
-        debug!(?retention_time_ms, ?group_id, ?offsets);
-
-        let mut responses = vec![];
-
-        for (topition, offset_commit) in offsets {
-            let location = Path::from(format!(
-                "clusters/{}/groups/consumers/{}/offsets/{}/partitions/{:0>10}.json",
-                self.cluster, group_id, topition.topic, topition.partition,
-            ));
-
-            let payload = serde_json::to_vec(&offset_commit)
-                .map(Bytes::from)
-                .map(PutPayload::from)?;
-
-            let mut attributes = Attributes::new();
-            _ = attributes.insert(
-                Attribute::ContentType,
-                AttributeValue::from(APPLICATION_JSON),
-            );
-
-            let options = PutOptions {
-                mode: PutMode::Overwrite,
-                tags: TagSet::default(),
-                attributes,
-            };
-
-            let error_code = self
-                .object_store
-                .put_opts(&location, payload, options)
-                .await
-                .map_or(ErrorCode::UnknownServerError, |_| ErrorCode::None);
-
-            responses.push((topition.to_owned(), error_code));
-        }
-
-        Ok(responses)
-    }
-
-    async fn offset_fetch(
-        &self,
-        group_id: Option<&str>,
-        topics: &[Topition],
-        require_stable: Option<bool>,
-    ) -> Result<BTreeMap<Topition, i64>> {
-        debug!(?group_id, ?topics, ?require_stable);
-        let mut responses = BTreeMap::new();
-
-        if let Some(group_id) = group_id {
-            for topition in topics {
-                let location = Path::from(format!(
-                    "clusters/{}/groups/consumers/{}/offsets/{}/partitions/{:0>10}.json",
-                    self.cluster, group_id, topition.topic, topition.partition,
-                ));
-
-                let offset = match self.object_store.get(&location).await {
-                    Ok(get_result) => get_result
-                        .bytes()
-                        .await
-                        .map_err(Error::from)
-                        .and_then(|encoded| {
-                            serde_json::from_slice::<OffsetCommitRequest>(&encoded[..])
-                                .map_err(Error::from)
-                        })
-                        .map(|commit| commit.offset)
-                        .inspect_err(|error| error!(?error, ?group_id, ?topition))
-                        .map_err(|_| Error::Api(ErrorCode::UnknownServerError)),
-
-                    Err(object_store::Error::NotFound { .. }) => Ok(-1),
-
-                    Err(error) => {
-                        error!(?error, ?group_id, ?topition);
-                        Err(Error::Api(ErrorCode::UnknownServerError))
-                    }
-                }?;
-
-                _ = responses.insert(topition.to_owned(), offset);
-            }
-        }
-
-        Ok(responses)
-    }
-
-    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
-        debug!(?topics);
-
-        let location = Path::from(format!("clusters/{}/brokers/", self.cluster));
-
-        let mut brokers = vec![];
-
-        let mut list_stream = self.object_store.list(Some(&location));
-
-        while let Some(meta) = list_stream
-            .next()
-            .await
-            .inspect(|meta| debug!(?meta))
-            .transpose()?
-        {
-            let Ok(payload) = self
-                .object_store
-                .get(&meta.location)
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Ok(encoded) = payload
-                .bytes()
-                .await
-                .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Ok(broker_registration) =
-                serde_json::from_slice::<BrokerRegistationRequest>(&encoded[..])
-                    .inspect_err(|error| error!(?error, ?location))
-            else {
-                continue;
-            };
-
-            let Some(listener) = broker_registration
-                .listeners
-                .iter()
-                .find(|listener| listener.name.as_str() == "broker")
-            else {
-                continue;
-            };
-
-            brokers.push(MetadataResponseBroker {
-                node_id: broker_registration.broker_id,
-                host: listener.host.clone(),
-                port: listener.port as i32,
-                rack: broker_registration.rack,
-            });
-        }
-
-        let responses = match topics {
-            Some(topics) if !topics.is_empty() => {
-                let mut responses = vec![];
-
-                for topic in topics {
-                    let response = match self
-                        .topic_metadata(topic)
-                        .await
-                        .inspect_err(|error| error!(?error, ?location))
-                    {
-                        Ok(topic_metadata) => {
-                            let name = Some(topic_metadata.topic.name.to_owned());
-                            let error_code = ErrorCode::None.into();
-                            let topic_id = Some(topic_metadata.id.into_bytes());
-                            let is_internal = Some(false);
-                            let partitions = topic_metadata.topic.num_partitions;
-                            let replication_factor = topic_metadata.topic.replication_factor;
-
-                            debug!(
-                                ?error_code,
-                                ?topic_id,
-                                ?name,
-                                ?is_internal,
-                                ?partitions,
-                                ?replication_factor
-                            );
-
-                            let mut rng = thread_rng();
-                            let mut broker_ids: Vec<_> =
-                                brokers.iter().map(|broker| broker.node_id).collect();
-                            broker_ids.shuffle(&mut rng);
-
-                            let mut brokers = broker_ids.into_iter().cycle();
-
-                            let partitions = Some(
-                                (0..partitions)
-                                    .map(|partition_index| {
-                                        let leader_id = brokers.next().expect("cycling");
-
-                                        let replica_nodes = Some(
-                                            (0..replication_factor)
-                                                .map(|_replica| brokers.next().expect("cycling"))
-                                                .collect(),
-                                        );
-                                        let isr_nodes = replica_nodes.clone();
-
-                                        MetadataResponsePartition {
-                                            error_code,
-                                            partition_index,
-                                            leader_id,
-                                            leader_epoch: Some(-1),
-                                            replica_nodes,
-                                            isr_nodes,
-                                            offline_replicas: Some([].into()),
-                                        }
-                                    })
-                                    .collect(),
-                            );
-
-                            MetadataResponseTopic {
-                                error_code,
-                                name,
-                                topic_id,
-                                is_internal,
-                                partitions,
-                                topic_authorized_operations: Some(-2147483648),
-                            }
-                        }
-
-                        Err(Error::ObjectStore(object_store::Error::NotFound { .. })) => {
-                            MetadataResponseTopic {
-                                error_code: ErrorCode::UnknownTopicOrPartition.into(),
-                                name: match topic {
-                                    TopicId::Name(name) => Some(name.into()),
-                                    TopicId::Id(_) => Some("".into()),
-                                },
-                                topic_id: Some(match topic {
-                                    TopicId::Name(_) => NULL_TOPIC_ID,
-                                    TopicId::Id(id) => id.into_bytes(),
-                                }),
-                                is_internal: Some(false),
-                                partitions: Some([].into()),
-                                topic_authorized_operations: Some(-2147483648),
-                            }
-                        }
-
-                        Err(_) => MetadataResponseTopic {
-                            error_code: ErrorCode::UnknownServerError.into(),
-                            name: match topic {
-                                TopicId::Name(name) => Some(name.into()),
-                                TopicId::Id(_) => Some("".into()),
-                            },
-                            topic_id: Some(match topic {
-                                TopicId::Name(_) => NULL_TOPIC_ID,
-                                TopicId::Id(id) => id.into_bytes(),
-                            }),
-                            is_internal: Some(false),
-                            partitions: Some([].into()),
-                            topic_authorized_operations: Some(-2147483648),
-                        },
-                    };
-
-                    responses.push(response);
-                }
-
-                responses
-            }
-
-            _ => {
-                let location = Path::from(format!("clusters/{}/topics/", self.cluster));
-                debug!(?location);
-
-                let mut responses = vec![];
-
-                let Ok(list_result) = self
-                    .object_store
-                    .list_with_delimiter(Some(&location))
-                    .await
-                    .inspect_err(|error| error!(?error, ?location))
-                else {
-                    return Ok(MetadataResponse {
-                        cluster: Some(self.cluster.clone()),
-                        controller: Some(self.node),
-                        brokers,
-                        topics: responses,
-                    });
-                };
-
-                debug!(?list_result);
-
-                for meta in list_result.objects {
-                    debug!(?meta);
-
-                    let Ok(payload) = self
-                        .object_store
-                        .get(&meta.location)
-                        .await
-                        .inspect_err(|error| error!(?error, ?location))
-                    else {
-                        continue;
-                    };
-
-                    let Ok(encoded) = payload
-                        .bytes()
-                        .await
-                        .inspect_err(|error| error!(?error, ?location))
-                    else {
-                        continue;
-                    };
-
-                    let Ok(topic_metadata) = serde_json::from_slice::<TopicMetadata>(&encoded[..])
-                        .inspect_err(|error| error!(?error, ?location))
-                    else {
-                        continue;
-                    };
-
-                    let name = Some(topic_metadata.topic.name.to_owned());
-                    let error_code = ErrorCode::None.into();
-                    let topic_id = Some(topic_metadata.id.into_bytes());
-                    let is_internal = Some(false);
-                    let partitions = topic_metadata.topic.num_partitions;
-                    let replication_factor = topic_metadata.topic.replication_factor;
-
-                    debug!(
-                        ?error_code,
-                        ?topic_id,
-                        ?name,
-                        ?is_internal,
-                        ?partitions,
-                        ?replication_factor
-                    );
-
-                    let mut rng = thread_rng();
-                    let mut broker_ids: Vec<_> =
-                        brokers.iter().map(|broker| broker.node_id).collect();
-                    broker_ids.shuffle(&mut rng);
-
-                    let mut brokers = broker_ids.into_iter().cycle();
-
-                    let partitions = Some(
-                        (0..partitions)
-                            .map(|partition_index| {
-                                let leader_id = brokers.next().expect("cycling");
-
-                                let replica_nodes = Some(
-                                    (0..replication_factor)
-                                        .map(|_replica| brokers.next().expect("cycling"))
-                                        .collect(),
-                                );
-                                let isr_nodes = replica_nodes.clone();
-
-                                MetadataResponsePartition {
-                                    error_code,
-                                    partition_index,
-                                    leader_id,
-                                    leader_epoch: Some(-1),
-                                    replica_nodes,
-                                    isr_nodes,
-                                    offline_replicas: Some([].into()),
-                                }
-                            })
-                            .collect(),
-                    );
-
-                    responses.push(MetadataResponseTopic {
-                        error_code,
-                        name,
-                        topic_id,
-                        is_internal,
-                        partitions,
-                        topic_authorized_operations: Some(-2147483648),
-                    });
-                }
-
-                responses
-            }
-        };
-
-        Ok(MetadataResponse {
-            cluster: Some(self.cluster.clone()),
-            controller: Some(self.node),
-            brokers,
-            topics: responses,
-        })
-    }
-
-    async fn describe_config(
-        &self,
-        name: &str,
-        resource: ConfigResource,
-        keys: Option<&[String]>,
-    ) -> Result<DescribeConfigsResult> {
-        debug!(?name, ?resource, ?keys);
-
-        match resource {
-            ConfigResource::Topic => match self.topic_metadata(&TopicId::Name(name.into())).await {
-                Ok(topic_metadata) => Ok(DescribeConfigsResult {
-                    error_code: ErrorCode::None.into(),
-                    error_message: Some("None".into()),
-                    resource_type: i8::from(resource),
-                    resource_name: name.into(),
-                    configs: topic_metadata.topic.configs.map(|configs| {
-                        configs
-                            .iter()
-                            .map(|config| DescribeConfigsResourceResult {
-                                name: config.name.clone(),
-                                value: config.value.clone(),
-                                read_only: false,
-                                is_default: Some(false),
-                                config_source: Some(ConfigSource::DefaultConfig.into()),
-                                is_sensitive: false,
-                                synonyms: Some([].into()),
-                                config_type: Some(ConfigType::String.into()),
-                                documentation: Some("".into()),
-                            })
-                            .collect()
-                    }),
-                }),
-                Err(_) => todo!(),
-            },
-
-            _ => todo!(),
-        }
-    }
-
-    async fn update_group(
-        &self,
-        group_id: &str,
-        detail: GroupDetail,
-        version: Option<Version>,
-    ) -> Result<Version, UpdateError<GroupDetail>> {
-        debug!(?group_id, ?detail, ?version);
-
-        let location = Path::from(format!(
-            "clusters/{}/groups/consumers/{}.json",
-            self.cluster, group_id,
-        ));
-
-        let mut attributes = Attributes::new();
-        _ = attributes.insert(
-            Attribute::ContentType,
-            AttributeValue::from(APPLICATION_JSON),
-        );
-
-        self.put(&location, detail, attributes, version.map(Into::into))
-            .await
-            .map(Into::into)
-    }
-}
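Note: update_group passes version.map(Into::into) through to the conditional put, so None creates the group object and Some(version) updates it only if it is unchanged; on conflict the caller receives UpdateError::Outdated { current, version } and can re-base. A hedged retry sketch over the Storage trait (the function is hypothetical and assumes GroupDetail: Clone):

    // Re-base on the stored value and retry the conditional update until
    // it lands; mirrors how UpdateError::Outdated is meant to be consumed.
    async fn upsert_group<S: Storage>(
        storage: &S,
        group_id: &str,
        mut detail: GroupDetail,
    ) -> Result<Version> {
        let mut version = None;

        loop {
            match storage.update_group(group_id, detail.clone(), version).await {
                Ok(version) => return Ok(version),

                Err(UpdateError::Outdated { current, version: outdated }) => {
                    detail = current;
                    version = Some(outdated);
                }

                Err(_) => return Err(Error::Api(ErrorCode::UnknownServerError)),
            }
        }
    }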