diff --git a/Cargo.lock b/Cargo.lock index b10396cb7b53..0bf52e54affe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,39 +876,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "benchmarks" -version = "0.8.2" -dependencies = [ - "api", - "arrow", - "chrono", - "clap 4.5.4", - "common-base", - "common-telemetry", - "common-wal", - "dotenv", - "futures", - "futures-util", - "humantime", - "humantime-serde", - "indicatif", - "itertools 0.10.5", - "lazy_static", - "log-store", - "mito2", - "num_cpus", - "parquet", - "prometheus", - "rand", - "rskafka", - "serde", - "store-api", - "tokio", - "toml 0.8.13", - "uuid", -] - [[package]] name = "bigdecimal" version = "0.4.3" @@ -4850,19 +4817,6 @@ dependencies = [ "serde", ] -[[package]] -name = "indicatif" -version = "0.17.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" -dependencies = [ - "console", - "instant", - "number_prefix", - "portable-atomic", - "unicode-width", -] - [[package]] name = "indoc" version = "2.0.5" @@ -6467,12 +6421,6 @@ dependencies = [ "libc", ] -[[package]] -name = "number_prefix" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" - [[package]] name = "objc" version = "0.2.7" diff --git a/Cargo.toml b/Cargo.toml index 0cb9d9b93999..9260028e66ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,5 @@ [workspace] members = [ - "benchmarks", "src/api", "src/auth", "src/catalog", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml deleted file mode 100644 index 1ce91acaf786..000000000000 --- a/benchmarks/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "benchmarks" -version.workspace = true -edition.workspace = true -license.workspace = true - -[lints] -workspace = true - -[dependencies] -api.workspace = true -arrow.workspace = true -chrono.workspace = true -clap.workspace = true -common-base.workspace = true -common-telemetry.workspace = true -common-wal.workspace = true -dotenv.workspace = true -futures.workspace = true -futures-util.workspace = true -humantime.workspace = true -humantime-serde.workspace = true -indicatif = "0.17.1" -itertools.workspace = true -lazy_static.workspace = true -log-store.workspace = true -mito2.workspace = true -num_cpus.workspace = true -parquet.workspace = true -prometheus.workspace = true -rand.workspace = true -rskafka.workspace = true -serde.workspace = true -store-api.workspace = true -tokio.workspace = true -toml.workspace = true -uuid.workspace = true diff --git a/benchmarks/README.md b/benchmarks/README.md deleted file mode 100644 index c281af38293e..000000000000 --- a/benchmarks/README.md +++ /dev/null @@ -1,11 +0,0 @@ -Benchmarkers for GreptimeDB --------------------------------- - -## Wal Benchmarker -The wal benchmarker serves to evaluate the performance of GreptimeDB's Write-Ahead Log (WAL) component. It meticulously assesses the read/write performance of the WAL under diverse workloads generated by the benchmarker. - - -### How to use -To compile the benchmarker, navigate to the `greptimedb/benchmarks` directory and execute `cargo build --release`. Subsequently, you'll find the compiled target located at `greptimedb/target/release/wal_bench`. - -The `./wal_bench -h` command reveals numerous arguments that the target accepts. Among these, a notable one is the `cfg-file` argument. 
By utilizing a configuration file in the TOML format, you can bypass the need to repeatedly specify cumbersome arguments. diff --git a/benchmarks/config/wal_bench.example.toml b/benchmarks/config/wal_bench.example.toml deleted file mode 100644 index 72faed0d7410..000000000000 --- a/benchmarks/config/wal_bench.example.toml +++ /dev/null @@ -1,21 +0,0 @@ -# Refers to the documents of `Args` in benchmarks/src/wal.rs`. -wal_provider = "kafka" -bootstrap_brokers = ["localhost:9092"] -num_workers = 10 -num_topics = 32 -num_regions = 1000 -num_scrapes = 1000 -num_rows = 5 -col_types = "ifs" -max_batch_size = "512KB" -linger = "1ms" -backoff_init = "10ms" -backoff_max = "1ms" -backoff_base = 2 -backoff_deadline = "3s" -compression = "zstd" -rng_seed = 42 -skip_read = false -skip_write = false -random_topics = true -report_metrics = false diff --git a/benchmarks/src/bin/wal_bench.rs b/benchmarks/src/bin/wal_bench.rs deleted file mode 100644 index 6caa7b699871..000000000000 --- a/benchmarks/src/bin/wal_bench.rs +++ /dev/null @@ -1,326 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![feature(int_roundings)] - -use std::fs; -use std::sync::Arc; -use std::time::Instant; - -use api::v1::{ColumnDataType, ColumnSchema, SemanticType}; -use benchmarks::metrics; -use benchmarks::wal_bench::{Args, Config, Region, WalProvider}; -use clap::Parser; -use common_telemetry::info; -use common_wal::config::kafka::common::BackoffConfig; -use common_wal::config::kafka::DatanodeKafkaConfig as KafkaConfig; -use common_wal::config::raft_engine::RaftEngineConfig; -use common_wal::options::{KafkaWalOptions, WalOptions}; -use itertools::Itertools; -use log_store::kafka::log_store::KafkaLogStore; -use log_store::raft_engine::log_store::RaftEngineLogStore; -use mito2::wal::Wal; -use prometheus::{Encoder, TextEncoder}; -use rand::distributions::{Alphanumeric, DistString}; -use rand::rngs::SmallRng; -use rand::SeedableRng; -use rskafka::client::partition::Compression; -use rskafka::client::ClientBuilder; -use store_api::logstore::LogStore; -use store_api::storage::RegionId; - -async fn run_benchmarker(cfg: &Config, topics: &[String], wal: Arc>) { - let chunk_size = cfg.num_regions.div_ceil(cfg.num_workers); - let region_chunks = (0..cfg.num_regions) - .map(|id| { - build_region( - id as u64, - topics, - &mut SmallRng::seed_from_u64(cfg.rng_seed), - cfg, - ) - }) - .chunks(chunk_size as usize) - .into_iter() - .map(|chunk| Arc::new(chunk.collect::>())) - .collect::>(); - - let mut write_elapsed = 0; - let mut read_elapsed = 0; - - if !cfg.skip_write { - info!("Benchmarking write ..."); - - let num_scrapes = cfg.num_scrapes; - let timer = Instant::now(); - futures::future::join_all((0..cfg.num_workers).map(|i| { - let wal = wal.clone(); - let regions = region_chunks[i as usize].clone(); - tokio::spawn(async move { - for _ in 0..num_scrapes { - let mut wal_writer = wal.writer(); - regions - .iter() - .for_each(|region| region.add_wal_entry(&mut wal_writer)); - 
wal_writer.write_to_wal().await.unwrap(); - } - }) - })) - .await; - write_elapsed += timer.elapsed().as_millis(); - } - - if !cfg.skip_read { - info!("Benchmarking read ..."); - - let timer = Instant::now(); - futures::future::join_all((0..cfg.num_workers).map(|i| { - let wal = wal.clone(); - let regions = region_chunks[i as usize].clone(); - tokio::spawn(async move { - for region in regions.iter() { - region.replay(&wal).await; - } - }) - })) - .await; - read_elapsed = timer.elapsed().as_millis(); - } - - dump_report(cfg, write_elapsed, read_elapsed); -} - -fn build_region(id: u64, topics: &[String], rng: &mut SmallRng, cfg: &Config) -> Region { - let wal_options = match cfg.wal_provider { - WalProvider::Kafka => { - assert!(!topics.is_empty()); - WalOptions::Kafka(KafkaWalOptions { - topic: topics.get(id as usize % topics.len()).cloned().unwrap(), - }) - } - WalProvider::RaftEngine => WalOptions::RaftEngine, - }; - Region::new( - RegionId::from_u64(id), - build_schema(&parse_col_types(&cfg.col_types), rng), - wal_options, - cfg.num_rows, - cfg.rng_seed, - ) -} - -fn build_schema(col_types: &[ColumnDataType], mut rng: &mut SmallRng) -> Vec { - col_types - .iter() - .map(|col_type| ColumnSchema { - column_name: Alphanumeric.sample_string(&mut rng, 5), - datatype: *col_type as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - }) - .chain(vec![ColumnSchema { - column_name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - semantic_type: SemanticType::Tag as i32, - datatype_extension: None, - }]) - .collect() -} - -fn dump_report(cfg: &Config, write_elapsed: u128, read_elapsed: u128) { - let cost_report = format!( - "write costs: {} ms, read costs: {} ms", - write_elapsed, read_elapsed, - ); - - let total_written_bytes = metrics::METRIC_WAL_WRITE_BYTES_TOTAL.get() as u128; - let write_throughput = if write_elapsed > 0 { - (total_written_bytes * 1000).div_floor(write_elapsed) - } else { - 0 - }; - let total_read_bytes = metrics::METRIC_WAL_READ_BYTES_TOTAL.get() as u128; - let read_throughput = if read_elapsed > 0 { - (total_read_bytes * 1000).div_floor(read_elapsed) - } else { - 0 - }; - - let throughput_report = format!( - "total written bytes: {} bytes, total read bytes: {} bytes, write throuput: {} bytes/s ({} mb/s), read throughput: {} bytes/s ({} mb/s)", - total_written_bytes, - total_read_bytes, - write_throughput, - write_throughput.div_floor(1 << 20), - read_throughput, - read_throughput.div_floor(1 << 20), - ); - - let metrics_report = if cfg.report_metrics { - let mut buffer = Vec::new(); - let encoder = TextEncoder::new(); - let metrics = prometheus::gather(); - encoder.encode(&metrics, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap() - } else { - String::new() - }; - - info!( - r#" -Benchmark config: -{cfg:?} - -Benchmark report: -{cost_report} -{throughput_report} -{metrics_report}"# - ); -} - -async fn create_topics(cfg: &Config) -> Vec { - // Creates topics. 
- let client = ClientBuilder::new(cfg.bootstrap_brokers.clone()) - .build() - .await - .unwrap(); - let ctrl_client = client.controller_client().unwrap(); - let (topics, tasks): (Vec<_>, Vec<_>) = (0..cfg.num_topics) - .map(|i| { - let topic = if cfg.random_topics { - format!( - "greptime_wal_bench_topic_{}_{}", - uuid::Uuid::new_v4().as_u128(), - i - ) - } else { - format!("greptime_wal_bench_topic_{}", i) - }; - let task = ctrl_client.create_topic( - topic.clone(), - 1, - cfg.bootstrap_brokers.len() as i16, - 2000, - ); - (topic, task) - }) - .unzip(); - // Must ignore errors since we allow topics being created more than once. - let _ = futures::future::try_join_all(tasks).await; - - topics -} - -fn parse_compression(comp: &str) -> Compression { - match comp { - "no" => Compression::NoCompression, - "gzip" => Compression::Gzip, - "lz4" => Compression::Lz4, - "snappy" => Compression::Snappy, - "zstd" => Compression::Zstd, - other => unreachable!("Unrecognized compression {other}"), - } -} - -fn parse_col_types(col_types: &str) -> Vec { - let parts = col_types.split('x').collect::>(); - assert!(parts.len() <= 2); - - let pattern = parts[0]; - let repeat = parts - .get(1) - .map(|r| r.parse::().unwrap()) - .unwrap_or(1); - - pattern - .chars() - .map(|c| match c { - 'i' | 'I' => ColumnDataType::Int64, - 'f' | 'F' => ColumnDataType::Float64, - 's' | 'S' => ColumnDataType::String, - other => unreachable!("Cannot parse {other} as a column data type"), - }) - .cycle() - .take(pattern.len() * repeat) - .collect() -} - -fn main() { - // Sets the global logging to INFO and suppress loggings from rskafka other than ERROR and upper ones. - std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error"); - common_telemetry::init_default_ut_logging(); - - let args = Args::parse(); - let cfg = if !args.cfg_file.is_empty() { - toml::from_str(&fs::read_to_string(&args.cfg_file).unwrap()).unwrap() - } else { - Config::from(args) - }; - - // Validates arguments. - if cfg.num_regions < cfg.num_workers { - panic!("num_regions must be greater than or equal to num_workers"); - } - if cfg - .num_workers - .min(cfg.num_topics) - .min(cfg.num_regions) - .min(cfg.num_scrapes) - .min(cfg.max_batch_size.as_bytes() as u32) - .min(cfg.bootstrap_brokers.len() as u32) - == 0 - { - panic!("Invalid arguments"); - } - - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - match cfg.wal_provider { - WalProvider::Kafka => { - let topics = create_topics(&cfg).await; - let kafka_cfg = KafkaConfig { - broker_endpoints: cfg.bootstrap_brokers.clone(), - max_batch_size: cfg.max_batch_size, - linger: cfg.linger, - backoff: BackoffConfig { - init: cfg.backoff_init, - max: cfg.backoff_max, - base: cfg.backoff_base, - deadline: Some(cfg.backoff_deadline), - }, - compression: parse_compression(&cfg.compression), - ..Default::default() - }; - let store = Arc::new(KafkaLogStore::try_new(&kafka_cfg).await.unwrap()); - let wal = Arc::new(Wal::new(store)); - run_benchmarker(&cfg, &topics, wal).await; - } - WalProvider::RaftEngine => { - // The benchmarker assumes the raft engine directory exists. 
- let store = RaftEngineLogStore::try_new( - "/tmp/greptimedb/raft-engine-wal".to_string(), - RaftEngineConfig::default(), - ) - .await - .map(Arc::new) - .unwrap(); - let wal = Arc::new(Wal::new(store)); - run_benchmarker(&cfg, &[], wal).await; - } - } - }); -} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs deleted file mode 100644 index bab08887f765..000000000000 --- a/benchmarks/src/lib.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod metrics; -pub mod wal_bench; diff --git a/benchmarks/src/metrics.rs b/benchmarks/src/metrics.rs deleted file mode 100644 index d65fd1eb9aa0..000000000000 --- a/benchmarks/src/metrics.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use lazy_static::lazy_static; -use prometheus::*; - -/// Logstore label. -pub const LOGSTORE_LABEL: &str = "logstore"; -/// Operation type label. -pub const OPTYPE_LABEL: &str = "optype"; - -lazy_static! { - /// Counters of bytes of each operation on a logstore. - pub static ref METRIC_WAL_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( - "greptime_bench_wal_op_bytes_total", - "wal operation bytes total", - &[OPTYPE_LABEL], - ) - .unwrap(); - /// Counter of bytes of the append_batch operation. - pub static ref METRIC_WAL_WRITE_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( - &["write"], - ); - /// Counter of bytes of the read operation. - pub static ref METRIC_WAL_READ_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( - &["read"], - ); -} diff --git a/benchmarks/src/wal_bench.rs b/benchmarks/src/wal_bench.rs deleted file mode 100644 index 681dacfbb60e..000000000000 --- a/benchmarks/src/wal_bench.rs +++ /dev/null @@ -1,366 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::mem::size_of; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -use api::v1::value::ValueData; -use api::v1::{ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, Value, WalEntry}; -use clap::{Parser, ValueEnum}; -use common_base::readable_size::ReadableSize; -use common_wal::options::WalOptions; -use futures::StreamExt; -use mito2::wal::{Wal, WalWriter}; -use rand::distributions::{Alphanumeric, DistString, Uniform}; -use rand::rngs::SmallRng; -use rand::{Rng, SeedableRng}; -use serde::{Deserialize, Serialize}; -use store_api::logstore::provider::Provider; -use store_api::logstore::LogStore; -use store_api::storage::RegionId; - -use crate::metrics; - -/// The wal provider. -#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum WalProvider { - #[default] - RaftEngine, - Kafka, -} - -#[derive(Parser)] -pub struct Args { - /// The provided configuration file. - /// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`. - #[clap(long, short = 'c')] - pub cfg_file: String, - - /// The wal provider. - #[clap(long, value_enum, default_value_t = WalProvider::default())] - pub wal_provider: WalProvider, - - /// The advertised addresses of the kafka brokers. - /// If there're multiple bootstrap brokers, their addresses should be separated by comma, for e.g. "localhost:9092,localhost:9093". - #[clap(long, short = 'b', default_value = "localhost:9092")] - pub bootstrap_brokers: String, - - /// The number of workers each running in a dedicated thread. - #[clap(long, default_value_t = num_cpus::get() as u32)] - pub num_workers: u32, - - /// The number of kafka topics to be created. - #[clap(long, default_value_t = 32)] - pub num_topics: u32, - - /// The number of regions. - #[clap(long, default_value_t = 1000)] - pub num_regions: u32, - - /// The number of times each region is scraped. - #[clap(long, default_value_t = 1000)] - pub num_scrapes: u32, - - /// The number of rows in each wal entry. - /// Each time a region is scraped, a wal entry containing will be produced. - #[clap(long, default_value_t = 5)] - pub num_rows: u32, - - /// The column types of the schema for each region. - /// Currently, three column types are supported: - /// - i = ColumnDataType::Int64 - /// - f = ColumnDataType::Float64 - /// - s = ColumnDataType::String - /// For e.g., "ifs" will be parsed as three columns: i64, f64, and string. - /// - /// Additionally, a "x" sign can be provided to repeat the column types for a given number of times. - /// For e.g., "iix2" will be parsed as 4 columns: i64, i64, i64, and i64. - /// This feature is useful if you want to specify many columns. - #[clap(long, default_value = "ifs")] - pub col_types: String, - - /// The maximum size of a batch of kafka records. - /// The default value is 1mb. - #[clap(long, default_value = "512KB")] - pub max_batch_size: ReadableSize, - - /// The minimum latency the kafka client issues a batch of kafka records. - /// However, a batch of kafka records would be immediately issued if a record cannot be fit into the batch. - #[clap(long, default_value = "1ms")] - pub linger: String, - - /// The initial backoff delay of the kafka consumer. - #[clap(long, default_value = "10ms")] - pub backoff_init: String, - - /// The maximum backoff delay of the kafka consumer. 
- #[clap(long, default_value = "1s")] - pub backoff_max: String, - - /// The exponential backoff rate of the kafka consumer. The next back off = base * the current backoff. - #[clap(long, default_value_t = 2)] - pub backoff_base: u32, - - /// The deadline of backoff. The backoff ends if the total backoff delay reaches the deadline. - #[clap(long, default_value = "3s")] - pub backoff_deadline: String, - - /// The client-side compression algorithm for kafka records. - #[clap(long, default_value = "zstd")] - pub compression: String, - - /// The seed of random number generators. - #[clap(long, default_value_t = 42)] - pub rng_seed: u64, - - /// Skips the read phase, aka. region replay, if set to true. - #[clap(long, default_value_t = false)] - pub skip_read: bool, - - /// Skips the write phase if set to true. - #[clap(long, default_value_t = false)] - pub skip_write: bool, - - /// Randomly generates topic names if set to true. - /// Useful when you want to run the benchmarker without worrying about the topics created before. - #[clap(long, default_value_t = false)] - pub random_topics: bool, - - /// Logs out the gathered prometheus metrics when the benchmarker ends. - #[clap(long, default_value_t = false)] - pub report_metrics: bool, -} - -/// Benchmarker config. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Config { - pub wal_provider: WalProvider, - pub bootstrap_brokers: Vec, - pub num_workers: u32, - pub num_topics: u32, - pub num_regions: u32, - pub num_scrapes: u32, - pub num_rows: u32, - pub col_types: String, - pub max_batch_size: ReadableSize, - #[serde(with = "humantime_serde")] - pub linger: Duration, - #[serde(with = "humantime_serde")] - pub backoff_init: Duration, - #[serde(with = "humantime_serde")] - pub backoff_max: Duration, - pub backoff_base: u32, - #[serde(with = "humantime_serde")] - pub backoff_deadline: Duration, - pub compression: String, - pub rng_seed: u64, - pub skip_read: bool, - pub skip_write: bool, - pub random_topics: bool, - pub report_metrics: bool, -} - -impl From for Config { - fn from(args: Args) -> Self { - let cfg = Self { - wal_provider: args.wal_provider, - bootstrap_brokers: args - .bootstrap_brokers - .split(',') - .map(ToString::to_string) - .collect::>(), - num_workers: args.num_workers.min(num_cpus::get() as u32), - num_topics: args.num_topics, - num_regions: args.num_regions, - num_scrapes: args.num_scrapes, - num_rows: args.num_rows, - col_types: args.col_types, - max_batch_size: args.max_batch_size, - linger: humantime::parse_duration(&args.linger).unwrap(), - backoff_init: humantime::parse_duration(&args.backoff_init).unwrap(), - backoff_max: humantime::parse_duration(&args.backoff_max).unwrap(), - backoff_base: args.backoff_base, - backoff_deadline: humantime::parse_duration(&args.backoff_deadline).unwrap(), - compression: args.compression, - rng_seed: args.rng_seed, - skip_read: args.skip_read, - skip_write: args.skip_write, - random_topics: args.random_topics, - report_metrics: args.report_metrics, - }; - - cfg - } -} - -/// The region used for wal benchmarker. -pub struct Region { - id: RegionId, - schema: Vec, - provider: Provider, - next_sequence: AtomicU64, - next_entry_id: AtomicU64, - next_timestamp: AtomicI64, - rng: Mutex>, - num_rows: u32, -} - -impl Region { - /// Creates a new region. 
- pub fn new( - id: RegionId, - schema: Vec, - wal_options: WalOptions, - num_rows: u32, - rng_seed: u64, - ) -> Self { - let provider = match wal_options { - WalOptions::RaftEngine => Provider::raft_engine_provider(id.as_u64()), - WalOptions::Kafka(opts) => Provider::kafka_provider(opts.topic), - }; - Self { - id, - schema, - provider, - next_sequence: AtomicU64::new(1), - next_entry_id: AtomicU64::new(1), - next_timestamp: AtomicI64::new(1655276557000), - rng: Mutex::new(Some(SmallRng::seed_from_u64(rng_seed))), - num_rows, - } - } - - /// Scrapes the region and adds the generated entry to wal. - pub fn add_wal_entry(&self, wal_writer: &mut WalWriter) { - let mutation = Mutation { - op_type: OpType::Put as i32, - sequence: self - .next_sequence - .fetch_add(self.num_rows as u64, Ordering::Relaxed), - rows: Some(self.build_rows()), - }; - let entry = WalEntry { - mutations: vec![mutation], - }; - metrics::METRIC_WAL_WRITE_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); - - wal_writer - .add_entry( - self.id, - self.next_entry_id.fetch_add(1, Ordering::Relaxed), - &entry, - &self.provider, - ) - .unwrap(); - } - - /// Replays the region. - pub async fn replay(&self, wal: &Arc>) { - let mut wal_stream = wal.scan(self.id, 0, &self.provider).unwrap(); - while let Some(res) = wal_stream.next().await { - let (_, entry) = res.unwrap(); - metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); - } - } - - /// Computes the estimated size in bytes of the entry. - pub fn entry_estimated_size(entry: &WalEntry) -> usize { - let wrapper_size = size_of::() - + entry.mutations.capacity() * size_of::() - + size_of::(); - - let rows = entry.mutations[0].rows.as_ref().unwrap(); - - let schema_size = rows.schema.capacity() * size_of::() - + rows - .schema - .iter() - .map(|s| s.column_name.capacity()) - .sum::(); - let values_size = (rows.rows.capacity() * size_of::()) - + rows - .rows - .iter() - .map(|r| r.values.capacity() * size_of::()) - .sum::(); - - wrapper_size + schema_size + values_size - } - - fn build_rows(&self) -> Rows { - let cols = self - .schema - .iter() - .map(|col_schema| { - let col_data_type = ColumnDataType::try_from(col_schema.datatype).unwrap(); - self.build_col(&col_data_type, self.num_rows) - }) - .collect::>(); - - let rows = (0..self.num_rows) - .map(|i| { - let values = cols.iter().map(|col| col[i as usize].clone()).collect(); - Row { values } - }) - .collect(); - - Rows { - schema: self.schema.clone(), - rows, - } - } - - fn build_col(&self, col_data_type: &ColumnDataType, num_rows: u32) -> Vec { - let mut rng_guard = self.rng.lock().unwrap(); - let rng = rng_guard.as_mut().unwrap(); - match col_data_type { - ColumnDataType::TimestampMillisecond => (0..num_rows) - .map(|_| { - let ts = self.next_timestamp.fetch_add(1000, Ordering::Relaxed); - Value { - value_data: Some(ValueData::TimestampMillisecondValue(ts)), - } - }) - .collect(), - ColumnDataType::Int64 => (0..num_rows) - .map(|_| { - let v = rng.sample(Uniform::new(0, 10_000)); - Value { - value_data: Some(ValueData::I64Value(v)), - } - }) - .collect(), - ColumnDataType::Float64 => (0..num_rows) - .map(|_| { - let v = rng.sample(Uniform::new(0.0, 5000.0)); - Value { - value_data: Some(ValueData::F64Value(v)), - } - }) - .collect(), - ColumnDataType::String => (0..num_rows) - .map(|_| { - let v = Alphanumeric.sample_string(rng, 10); - Value { - value_data: Some(ValueData::StringValue(v)), - } - }) - .collect(), - _ => unreachable!(), - } - } -} diff --git a/config/config.md 
b/config/config.md
index 97efadc3177d..8a39272827a2 100644
--- a/config/config.md
+++ b/config/config.md
@@ -66,8 +66,7 @@
 | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.max_batch_size` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.linger` | String | `200ms` | The linger duration of a kafka batch producer.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
@@ -342,8 +341,7 @@
 | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.max_batch_size` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
-| `wal.linger` | String | `200ms` | The linger duration of a kafka batch producer.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
 | `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 16ffbbb0a4b9..a5da6be266df 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -132,11 +132,7 @@ broker_endpoints = ["127.0.0.1:9092"]
 ## The max size of a single producer batch.
 ## Warning: Kafka has a default limit of 1MB per message in a topic.
 ## **It's only used when the provider is `kafka`**.
-max_batch_size = "1MB"
-
-## The linger duration of a kafka batch producer.
-## **It's only used when the provider is `kafka`**.
-linger = "200ms"
+max_batch_bytes = "1MB"
 
 ## The consumer wait timeout.
 ## **It's only used when the provider is `kafka`**.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index d6fcc3e8943e..6209f71f1f2a 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -175,11 +175,7 @@ broker_endpoints = ["127.0.0.1:9092"]
 ## The max size of a single producer batch.
 ## Warning: Kafka has a default limit of 1MB per message in a topic.
 ## **It's only used when the provider is `kafka`**.
-max_batch_size = "1MB"
-
-## The linger duration of a kafka batch producer.
-## **It's only used when the provider is `kafka`**.
-linger = "200ms"
+max_batch_bytes = "1MB"
 
 ## The consumer wait timeout.
 ## **It's only used when the provider is `kafka`**.
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 072763782750..3a1b87b86af0 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -101,8 +101,7 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
             StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
                 broker_endpoints: config.broker_endpoints,
                 compression: config.compression,
-                max_batch_size: config.max_batch_size,
-                linger: config.linger,
+                max_batch_bytes: config.max_batch_bytes,
                 consumer_wait_timeout: config.consumer_wait_timeout,
                 backoff: config.backoff,
             }),
@@ -176,7 +175,7 @@ mod tests {
            topic_name_prefix = "greptimedb_wal_topic"
            replication_factor = 1
            create_topic_timeout = "30s"
-           max_batch_size = "1MB"
+           max_batch_bytes = "1MB"
            linger = "200ms"
            consumer_wait_timeout = "100ms"
            backoff_init = "500ms"
@@ -209,8 +208,7 @@ mod tests {
         let expected = DatanodeKafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9092".to_string()],
             compression: Compression::default(),
-            max_batch_size: ReadableSize::mb(1),
-            linger: Duration::from_millis(200),
+            max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
                 init: Duration::from_millis(500),
@@ -232,8 +230,7 @@ mod tests {
             replication_factor: 1,
             create_topic_timeout: Duration::from_secs(30),
             compression: Compression::default(),
-            max_batch_size: ReadableSize::mb(1),
-            linger: Duration::from_millis(200),
+            max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
                 init: Duration::from_millis(500),
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index b15d13dffc2a..b50ac685537d 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -30,11 +30,10 @@ pub struct DatanodeKafkaConfig {
     /// The compression algorithm used to compress kafka records.
     #[serde(skip)]
     pub compression: Compression,
+    /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
-    pub max_batch_size: ReadableSize,
-    /// The linger duration of a kafka batch producer.
-    #[serde(with = "humantime_serde")]
-    pub linger: Duration,
+    #[serde(alias = "max_batch_size")]
+    pub max_batch_bytes: ReadableSize,
     /// The consumer wait timeout.
     #[serde(with = "humantime_serde")]
     pub consumer_wait_timeout: Duration,
@@ -49,8 +48,7 @@ impl Default for DatanodeKafkaConfig {
             broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
             compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
-            max_batch_size: ReadableSize::mb(1),
-            linger: Duration::from_millis(200),
+            max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig::default(),
         }
diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs
index 3da8fa498092..ddee160bf642 100644
--- a/src/common/wal/src/config/kafka/standalone.rs
+++ b/src/common/wal/src/config/kafka/standalone.rs
@@ -43,11 +43,10 @@ pub struct StandaloneKafkaConfig {
     /// The compression algorithm used to compress kafka records.
     #[serde(skip)]
     pub compression: Compression,
+    /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
-    pub max_batch_size: ReadableSize,
-    /// The linger duration of a kafka batch producer.
-    #[serde(with = "humantime_serde")]
-    pub linger: Duration,
+    #[serde(alias = "max_batch_size")]
+    pub max_batch_bytes: ReadableSize,
     /// The consumer wait timeout.
     #[serde(with = "humantime_serde")]
     pub consumer_wait_timeout: Duration,
@@ -70,8 +69,7 @@ impl Default for StandaloneKafkaConfig {
             create_topic_timeout: Duration::from_secs(30),
             compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
- max_batch_size: ReadableSize::mb(1), - linger: Duration::from_millis(200), + max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), } diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index f3c26b50df13..7d324d81ef09 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -44,4 +44,5 @@ common-wal = { workspace = true, features = ["testing"] } itertools.workspace = true rand.workspace = true rand_distr = "0.4" +rskafka = { workspace = true, features = ["unstable-fuzzing"] } uuid.workspace = true diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 280ce6410609..4918bdf3567b 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -21,6 +21,8 @@ use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; +use crate::kafka::producer::ProduceRequest; + #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -129,6 +131,12 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display("Failed to found client"))] + ClientNotFount { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to resolve Kafka broker endpoint."))] ResolveKafkaEndpoint { source: common_wal::error::Error }, @@ -186,6 +194,14 @@ pub enum Error { error: rskafka::client::producer::Error, }, + #[snafu(display("Failed to produce batch records to Kafka"))] + BatchProduce { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))] ConsumeRecord { topic: String, @@ -244,6 +260,40 @@ pub enum Error { last_index: u64, attempt_index: u64, }, + + #[snafu(display("Failed to send produce request"))] + SendProduceRequest { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::mpsc::error::SendError, + }, + + #[snafu(display("Failed to send produce request"))] + WaitProduceResultReceiver { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::oneshot::error::RecvError, + }, + + #[snafu(display( + "The length of meta if exceeded the limit: {}, actual: {}", + limit, + actual + ))] + MetaLengthExceededLimit { + #[snafu(implicit)] + location: Location, + limit: usize, + actual: usize, + }, + + #[snafu(display("No max value"))] + NoMaxValue { + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 415cc53ddbce..a1cb2dc1b18b 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,6 +14,7 @@ pub(crate) mod client_manager; pub mod log_store; +pub(crate) mod producer; pub(crate) mod util; use serde::{Deserialize, Serialize}; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 81feaddb6627..5f686e6ace5e 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -16,62 +16,58 @@ use std::collections::HashMap; use std::sync::Arc; use common_wal::config::kafka::DatanodeKafkaConfig; -use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; -use rskafka::client::producer::aggregator::RecordAggregator; -use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; -use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; +use rskafka::client::partition::{Compression, PartitionClient, 
UnknownTopicHandling}; +use rskafka::client::ClientBuilder; use rskafka::BackoffConfig; use snafu::ResultExt; -use tokio::sync::RwLock; +use store_api::logstore::provider::KafkaProvider; +use tokio::sync::{Mutex, RwLock}; +use super::producer::OrderedBatchProducer; use crate::error::{ BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, }; -use crate::kafka::util::record::MIN_BATCH_SIZE; +use crate::kafka::producer::OrderedBatchProducerRef; // Each topic only has one partition for now. // The `DEFAULT_PARTITION` refers to the index of the partition. const DEFAULT_PARTITION: i32 = 0; +// Max batch size for a `OrderedBatchProducer` to handle requests. +const REQUEST_BATCH_SIZE: usize = 64; + /// Arc wrapper of ClientManager. pub(crate) type ClientManagerRef = Arc; -/// A client through which to contact Kafka cluster. Each client associates with one partition of a topic. -/// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. +/// Topic client. #[derive(Debug, Clone)] pub(crate) struct Client { - /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. - pub(crate) raw_client: Arc, - /// A producer used to buffer log entries for a specific topic before sending them in a batching manner. - pub(crate) producer: Arc>, + client: Arc, + producer: OrderedBatchProducerRef, } impl Client { - /// Creates a Client from the raw client. - pub(crate) fn new(raw_client: Arc, config: &DatanodeKafkaConfig) -> Self { - let record_aggregator = - RecordAggregator::new((config.max_batch_size.as_bytes() as usize).max(MIN_BATCH_SIZE)); - let batch_producer = BatchProducerBuilder::new(raw_client.clone()) - .with_compression(config.compression) - .with_linger(config.linger) - .build(record_aggregator); - - Self { - raw_client, - producer: Arc::new(batch_producer), - } + pub(crate) fn client(&self) -> &Arc { + &self.client + } + + pub(crate) fn producer(&self) -> &OrderedBatchProducerRef { + &self.producer } } /// Manages client construction and accesses. #[derive(Debug)] pub(crate) struct ClientManager { - pub(crate) config: DatanodeKafkaConfig, - /// Top-level client in kafka. All clients are constructed by this client. - client_factory: RsKafkaClient, - /// A pool maintaining a collection of clients. - /// Key: a topic. Value: the associated client of the topic. - client_pool: RwLock>, + client: rskafka::client::Client, + /// Used to initialize a new [Client]. + mutex: Mutex<()>, + instances: RwLock, Client>>, + + producer_channel_size: usize, + producer_request_batch_size: usize, + flush_batch_size: usize, + compression: Compression, } impl ClientManager { @@ -96,48 +92,70 @@ impl ClientManager { })?; Ok(Self { - config: config.clone(), - client_factory: client, - client_pool: RwLock::new(HashMap::new()), + client, + mutex: Mutex::new(()), + instances: RwLock::new(HashMap::new()), + producer_channel_size: REQUEST_BATCH_SIZE * 2, + producer_request_batch_size: REQUEST_BATCH_SIZE, + flush_batch_size: config.max_batch_bytes.as_bytes() as usize, + compression: config.compression, }) } - /// Gets the client associated with the topic. If the client does not exist, a new one will - /// be created and returned. 
- pub(crate) async fn get_or_insert(&self, topic: &String) -> Result { - { - let client_pool = self.client_pool.read().await; - if let Some(client) = client_pool.get(topic) { - return Ok(client.clone()); - } - } + async fn try_insert(&self, provider: &Arc) -> Result { + let _guard = self.mutex.lock().await; - let mut client_pool = self.client_pool.write().await; - match client_pool.get(topic) { - Some(client) => Ok(client.clone()), + let client = self.instances.read().await.get(provider).cloned(); + match client { + Some(client) => Ok(client), None => { - let client = self.try_create_client(topic).await?; - client_pool.insert(topic.clone(), client.clone()); + let client = self.try_create_client(provider).await?; + self.instances + .write() + .await + .insert(provider.clone(), client.clone()); Ok(client) } } } - async fn try_create_client(&self, topic: &String) -> Result { + /// Gets the client associated with the topic. If the client does not exist, a new one will + /// be created and returned. + pub(crate) async fn get_or_insert(&self, provider: &Arc) -> Result { + let client = self.instances.read().await.get(provider).cloned(); + match client { + Some(client) => Ok(client), + None => self.try_insert(provider).await, + } + } + + async fn try_create_client(&self, provider: &Arc) -> Result { // Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error. // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start. // The reconnecting won't stop until succeed or a different error returns. - let raw_client = self - .client_factory - .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + let client = self + .client + .partition_client( + provider.topic.as_str(), + DEFAULT_PARTITION, + UnknownTopicHandling::Retry, + ) .await .context(BuildPartitionClientSnafu { - topic, + topic: &provider.topic, partition: DEFAULT_PARTITION, }) .map(Arc::new)?; - Ok(Client::new(raw_client, &self.config)) + let producer = Arc::new(OrderedBatchProducer::new( + client.clone(), + self.compression, + self.producer_channel_size, + self.producer_request_batch_size, + self.flush_batch_size, + )); + + Ok(Client { client, producer }) } } @@ -147,7 +165,32 @@ mod tests { use tokio::sync::Barrier; use super::*; - use crate::test_util::kafka::create_topics; + + /// Creates `num_topiocs` number of topics each will be decorated by the given decorator. + pub async fn create_topics( + num_topics: usize, + decorator: F, + broker_endpoints: &[String], + ) -> Vec + where + F: Fn(usize) -> String, + { + assert!(!broker_endpoints.is_empty()); + let client = ClientBuilder::new(broker_endpoints.to_vec()) + .build() + .await + .unwrap(); + let ctrl_client = client.controller_client().unwrap(); + let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics) + .map(|i| { + let topic = decorator(i); + let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500); + (topic, task) + }) + .unzip(); + futures::future::try_join_all(tasks).await.unwrap(); + topics + } /// Prepares for a test in that a collection of topics and a client manager are created. async fn prepare( @@ -184,12 +227,16 @@ mod tests { // Gets all clients sequentially. for (_, topic) in region_topic { - manager.get_or_insert(topic).await.unwrap(); + let provider = Arc::new(KafkaProvider::new(topic.to_string())); + manager.get_or_insert(&provider).await.unwrap(); } // Ensures all clients exist. 
- let client_pool = manager.client_pool.read().await; - let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic)); + let client_pool = manager.instances.read().await; + let all_exist = topics.iter().all(|topic| { + let provider = Arc::new(KafkaProvider::new(topic.to_string())); + client_pool.contains_key(&provider) + }); assert!(all_exist); }) }) @@ -215,17 +262,22 @@ mod tests { .map(|topic| { let manager = manager.clone(); let barrier = barrier.clone(); + tokio::spawn(async move { barrier.wait().await; - assert!(manager.get_or_insert(&topic).await.is_ok()); + let provider = Arc::new(KafkaProvider::new(topic)); + assert!(manager.get_or_insert(&provider).await.is_ok()); }) }) .collect::>(); futures::future::try_join_all(tasks).await.unwrap(); // Ensures all clients exist. - let client_pool = manager.client_pool.read().await; - let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic)); + let client_pool = manager.instances.read().await; + let all_exist = topics.iter().all(|topic| { + let provider = Arc::new(KafkaProvider::new(topic.to_string())); + client_pool.contains_key(&provider) + }); assert!(all_exist); }) }) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index ceca6fc30bd7..19518575315e 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use common_telemetry::{debug, warn}; use common_wal::config::kafka::DatanodeKafkaConfig; +use futures::future::try_join_all; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; @@ -30,26 +32,32 @@ use store_api::storage::RegionId; use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; -use crate::kafka::util::offset::Offset; +use crate::kafka::producer::OrderedBatchProducerRef; use crate::kafka::util::record::{ - maybe_emit_entry, remaining_entries, Record, RecordProducer, ESTIMATED_META_SIZE, + convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE, }; use crate::metrics; /// A log store backed by Kafka. #[derive(Debug)] pub struct KafkaLogStore { - config: DatanodeKafkaConfig, - /// Manages kafka clients through which the log store contact the Kafka cluster. + /// The manager of topic clients. client_manager: ClientManagerRef, + /// The max size of a batch. + max_batch_bytes: usize, + /// The consumer wait timeout. + consumer_wait_timeout: Duration, } impl KafkaLogStore { /// Tries to create a Kafka log store. 
pub async fn try_new(config: &DatanodeKafkaConfig) -> Result { + let client_manager = Arc::new(ClientManager::try_new(config).await?); + Ok(Self { - client_manager: Arc::new(ClientManager::try_new(config).await?), - config: config.clone(), + client_manager, + max_batch_bytes: config.max_batch_bytes.as_bytes() as usize, + consumer_wait_timeout: config.consumer_wait_timeout, }) } } @@ -109,8 +117,7 @@ impl LogStore for KafkaLogStore { actual: provider.type_name(), })?; - let max_data_size = - self.client_manager.config.max_batch_size.as_bytes() as usize - ESTIMATED_META_SIZE; + let max_data_size = self.max_batch_bytes - ESTIMATED_META_SIZE; Ok(build_entry( data, entry_id, @@ -120,7 +127,6 @@ impl LogStore for KafkaLogStore { )) } - // TODO(weny): refactor the writing. /// Appends a batch of entries and returns a response containing a map where the key is a region id /// while the value is the id of the last successfully written entry of the region. async fn append_batch(&self, entries: Vec) -> Result { @@ -137,39 +143,55 @@ impl LogStore for KafkaLogStore { return Ok(AppendBatchResponse::default()); } - // Groups entries by region id and pushes them to an associated record producer. - let mut producers = HashMap::with_capacity(entries.len()); + let region_ids = entries + .iter() + .map(|entry| entry.region_id()) + .collect::>(); + let mut region_grouped_records: HashMap)> = + HashMap::with_capacity(region_ids.len()); for entry in entries { - let provider = entry - .provider() - .as_kafka_provider() - .context(error::InvalidProviderSnafu { + let provider = entry.provider().as_kafka_provider().with_context(|| { + error::InvalidProviderSnafu { expected: KafkaProvider::type_name(), actual: entry.provider().type_name(), - })? - .clone(); - producers - .entry(entry.region_id()) - .or_insert_with(|| RecordProducer::new(provider)) - .push(entry); + } + })?; + let region_id = entry.region_id(); + match region_grouped_records.entry(region_id) { + std::collections::hash_map::Entry::Occupied(mut slot) => { + slot.get_mut().1.extend(convert_to_kafka_records(entry)?); + } + std::collections::hash_map::Entry::Vacant(slot) => { + let producer = self + .client_manager + .get_or_insert(provider) + .await? + .producer() + .clone(); + + slot.insert((producer, convert_to_kafka_records(entry)?)); + } + } } - // Produces entries for each region and gets the offset those entries written to. - // The returned offset is then converted into an entry id. - let last_entry_ids = futures::future::try_join_all(producers.into_iter().map( - |(region_id, producer)| async move { - let entry_id = producer - .produce(&self.client_manager) - .await - .map(TryInto::try_into)??; - Ok((region_id, entry_id)) - }, - )) - .await? - .into_iter() - .collect::>(); + let mut region_grouped_result_receivers = Vec::with_capacity(region_ids.len()); + for (region_id, (producer, records)) in region_grouped_records { + // Safety: `KafkaLogStore::entry` will ensure that the + // `Record`'s `approximate_size` must be less or equal to `max_batch_bytes`. 
+ region_grouped_result_receivers.push((region_id, producer.produce(records).await?)) + } - Ok(AppendBatchResponse { last_entry_ids }) + let region_grouped_max_offset = + try_join_all(region_grouped_result_receivers.into_iter().map( + |(region_id, receiver)| async move { + receiver.wait().await.map(|offset| (region_id, offset)) + }, + )) + .await?; + + Ok(AppendBatchResponse { + last_entry_ids: region_grouped_max_offset.into_iter().collect(), + }) } /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids. @@ -192,9 +214,9 @@ impl LogStore for KafkaLogStore { // Gets the client associated with the topic. let client = self .client_manager - .get_or_insert(&provider.topic) + .get_or_insert(provider) .await? - .raw_client + .client() .clone(); // Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic. @@ -209,7 +231,7 @@ impl LogStore for KafkaLogStore { })? - 1; // Reads entries with offsets in the range [start_offset, end_offset]. - let start_offset = Offset::try_from(entry_id)?.0; + let start_offset = entry_id as i64; debug!( "Start reading entries in range [{}, {}] for ns {}", @@ -227,8 +249,8 @@ impl LogStore for KafkaLogStore { } let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset)) - .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) - .with_max_wait_ms(self.config.consumer_wait_timeout.as_millis() as i32) + .with_max_batch_size(self.max_batch_bytes as i32) + .with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32) .build(); debug!( @@ -440,7 +462,7 @@ mod tests { .collect::>(); let config = DatanodeKafkaConfig { broker_endpoints, - max_batch_size: ReadableSize::kb(32), + max_batch_bytes: ReadableSize::kb(32), ..Default::default() }; let logstore = KafkaLogStore::try_new(&config).await.unwrap(); @@ -509,7 +531,7 @@ mod tests { .collect::>(); let config = DatanodeKafkaConfig { broker_endpoints, - max_batch_size: ReadableSize::kb(8), + max_batch_bytes: ReadableSize::kb(8), ..Default::default() }; let logstore = KafkaLogStore::try_new(&config).await.unwrap(); diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs new file mode 100644 index 000000000000..aaa76834cbdb --- /dev/null +++ b/src/log-store/src/kafka/producer.rs @@ -0,0 +1,474 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use common_telemetry::{debug, warn}; +use futures::future::try_join_all; +use rskafka::client::partition::Compression; +use rskafka::client::producer::ProducerClient; +use rskafka::record::Record; +use snafu::{OptionExt, ResultExt}; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::oneshot; + +use crate::error::{self, NoMaxValueSnafu, Result}; + +pub struct ProduceRequest { + batch: Vec, + sender: oneshot::Sender, +} + +#[derive(Default)] +struct ProduceResultReceiver { + receivers: Vec>>>, +} + +impl ProduceResultReceiver { + fn add_receiver(&mut self, receiver: oneshot::Receiver>>) { + self.receivers.push(receiver) + } + + async fn wait(self) -> Result { + Ok(try_join_all(self.receivers) + .await + .into_iter() + .flatten() + .collect::>>()? + .into_iter() + .flatten() + .max() + .context(NoMaxValueSnafu)? as u64) + } +} + +struct BackgroundProducerWorker { + /// The [`ProducerClient`]. + client: Arc, + // The compression configuration. + compression: Compression, + // The running flag. + running: Arc, + /// Receiver of [ProduceRequest]. + receiver: Receiver, + /// Max batch size for a worker to handle requests. + request_batch_size: usize, + /// Max bytes size for a single flush. + max_batch_bytes: usize, + /// The [PendingRequest]s. + pending_requests: Vec, +} + +struct PendingRequest { + batch: Vec, + size: usize, + sender: oneshot::Sender>>, +} + +/// ## Panic +/// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`. +fn handle_produce_requests( + requests: &mut Vec, + max_batch_bytes: usize, +) -> Vec { + let mut records_buffer = vec![]; + let mut batch_size = 0; + let mut pending_requests = Vec::with_capacity(requests.len()); + + for ProduceRequest { batch, sender } in requests.drain(..) { + let mut receiver = ProduceResultReceiver::default(); + for record in batch { + assert!(record.approximate_size() <= max_batch_bytes); + // Yields the `PendingRequest` if buffer is full. + if batch_size + record.approximate_size() > max_batch_bytes { + let (tx, rx) = oneshot::channel(); + pending_requests.push(PendingRequest { + batch: std::mem::take(&mut records_buffer), + size: batch_size, + sender: tx, + }); + batch_size = 0; + receiver.add_receiver(rx); + } + + batch_size += record.approximate_size(); + records_buffer.push(record); + } + // The remaining records. + if batch_size > 0 { + // Yields `PendingRequest` + let (tx, rx) = oneshot::channel(); + pending_requests.push(PendingRequest { + batch: std::mem::take(&mut records_buffer), + size: batch_size, + sender: tx, + }); + batch_size = 0; + receiver.add_receiver(rx); + } + + let _ = sender.send(receiver); + } + pending_requests +} + +async fn do_flush( + client: &Arc, + PendingRequest { + batch, + sender, + size: _size, + }: PendingRequest, + compression: Compression, +) { + let result = client + .produce(batch, compression) + .await + .context(error::BatchProduceSnafu); + + if let Err(err) = sender.send(result) { + warn!(err; "BatchFlushState Receiver is dropped"); + } +} + +impl BackgroundProducerWorker { + async fn run(&mut self) { + let mut buffer = Vec::with_capacity(self.request_batch_size); + while self.running.load(Ordering::Relaxed) { + // Processes pending requests first. + if !self.pending_requests.is_empty() { + // TODO(weny): Considering merge `PendingRequest`s. + for req in self.pending_requests.drain(..) 
{ + do_flush(&self.client, req, self.compression).await + } + } + match self.receiver.recv().await { + Some(req) => { + buffer.clear(); + buffer.push(req); + for _ in 1..self.request_batch_size { + match self.receiver.try_recv() { + Ok(req) => buffer.push(req), + Err(_) => break, + } + } + self.pending_requests = + handle_produce_requests(&mut buffer, self.max_batch_bytes); + } + None => { + debug!("The sender is dropped, BackgroundProducerWorker exited"); + // Exits the loop if the `sender` is dropped. + break; + } + } + } + } +} + +pub type OrderedBatchProducerRef = Arc; + +/// [`OrderedBatchProducer`] attempts to aggregate multiple produce requests together +#[derive(Debug)] +pub(crate) struct OrderedBatchProducer { + sender: Sender, + /// Used to control the [`BackgroundProducerWorker`]. + running: Arc, +} + +impl Drop for OrderedBatchProducer { + fn drop(&mut self) { + self.running.store(false, Ordering::Relaxed); + } +} + +/// Receives the committed offsets when data has been committed to Kafka +/// or an unrecoverable error has been encountered. +pub(crate) struct ProduceResultHandle { + receiver: oneshot::Receiver, +} + +impl ProduceResultHandle { + /// Waits for the data has been committed to Kafka. + /// Returns the **max** committed offsets. + pub(crate) async fn wait(self) -> Result { + self.receiver + .await + .context(error::WaitProduceResultReceiverSnafu)? + .wait() + .await + } +} + +impl OrderedBatchProducer { + /// Constructs a new [`OrderedBatchProducer`]. + pub(crate) fn new( + client: Arc, + compression: Compression, + channel_size: usize, + request_batch_size: usize, + max_batch_bytes: usize, + ) -> Self { + let (tx, rx) = mpsc::channel(channel_size); + let running = Arc::new(AtomicBool::new(true)); + let mut worker = BackgroundProducerWorker { + client, + compression, + running: running.clone(), + receiver: rx, + request_batch_size, + max_batch_bytes, + pending_requests: vec![], + }; + tokio::spawn(async move { worker.run().await }); + Self { + sender: tx, + running, + } + } + + /// Writes `data` to the [`OrderedBatchProducer`]. + /// + /// Returns the [ProduceResultHandle], which will receive a result when data has been committed to Kafka + /// or an unrecoverable error has been encountered. + /// + /// ## Panic + /// Panic if any [Record]'s `approximate_size` > `max_batch_bytes`. 
+ pub(crate) async fn produce(&self, batch: Vec) -> Result { + let receiver = { + let (tx, rx) = oneshot::channel(); + self.sender + .send(ProduceRequest { batch, sender: tx }) + .await + .context(error::SendProduceRequestSnafu)?; + rx + }; + + Ok(ProduceResultHandle { receiver }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use chrono::{TimeZone, Utc}; + use common_base::readable_size::ReadableSize; + use common_telemetry::debug; + use futures::future::BoxFuture; + use futures::stream::FuturesUnordered; + use futures::{FutureExt, StreamExt}; + use rskafka::client::error::{Error as ClientError, RequestContext}; + use rskafka::client::partition::Compression; + use rskafka::client::producer::ProducerClient; + use rskafka::protocol::error::Error as ProtocolError; + use rskafka::record::Record; + + use crate::kafka::producer::OrderedBatchProducer; + + #[derive(Debug)] + struct MockClient { + error: Option, + panic: Option, + delay: Duration, + batch_sizes: Mutex>, + } + + impl ProducerClient for MockClient { + fn produce( + &self, + records: Vec, + _compression: Compression, + ) -> BoxFuture<'_, Result, ClientError>> { + Box::pin(async move { + tokio::time::sleep(self.delay).await; + + if let Some(e) = self.error { + return Err(ClientError::ServerError { + protocol_error: e, + error_message: None, + request: RequestContext::Partition("foo".into(), 1), + response: None, + is_virtual: false, + }); + } + + if let Some(p) = self.panic.as_ref() { + panic!("{}", p); + } + + let mut batch_sizes = self.batch_sizes.lock().unwrap(); + let offset_base = batch_sizes.iter().sum::(); + let offsets = (0..records.len()) + .map(|x| (x + offset_base) as i64) + .collect(); + batch_sizes.push(records.len()); + debug!("Return offsets: {offsets:?}"); + Ok(offsets) + }) + } + } + + fn record() -> Record { + Record { + key: Some(vec![0; 4]), + value: Some(vec![0; 6]), + headers: Default::default(), + timestamp: Utc.timestamp_millis_opt(320).unwrap(), + } + } + + #[tokio::test] + async fn test_producer() { + common_telemetry::init_default_ut_logging(); + let record = record(); + let delay = Duration::from_secs(0); + let client = Arc::new(MockClient { + error: None, + panic: None, + delay, + batch_sizes: Default::default(), + }); + + let producer = OrderedBatchProducer::new( + client.clone(), + Compression::NoCompression, + 128, + 64, + ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + ); + + // Produces 3 records + let handle = producer + .produce(vec![record.clone(), record.clone(), record.clone()]) + .await + .unwrap(); + assert_eq!(handle.wait().await.unwrap(), 2); + assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1]); + + // Produces 2 records + let handle = producer + .produce(vec![record.clone(), record.clone()]) + .await + .unwrap(); + assert_eq!(handle.wait().await.unwrap(), 4); + assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2]); + + // Produces 1 records + let handle = producer.produce(vec![record.clone()]).await.unwrap(); + assert_eq!(handle.wait().await.unwrap(), 5); + assert_eq!(client.batch_sizes.lock().unwrap().as_slice(), &[2, 1, 2, 1]); + } + + #[tokio::test] + async fn test_producer_client_error() { + let record = record(); + let client = Arc::new(MockClient { + error: Some(ProtocolError::NetworkException), + panic: None, + delay: Duration::from_millis(1), + batch_sizes: Default::default(), + }); + + let producer = OrderedBatchProducer::new( + client.clone(), + Compression::NoCompression, 
+ 128, + 64, + ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + ); + + let mut futures = FuturesUnordered::new(); + futures.push( + producer + .produce(vec![record.clone(), record.clone(), record.clone()]) + .await + .unwrap() + .wait(), + ); + futures.push( + producer + .produce(vec![record.clone(), record.clone()]) + .await + .unwrap() + .wait(), + ); + futures.push(producer.produce(vec![record.clone()]).await.unwrap().wait()); + + futures.next().await.unwrap().unwrap_err(); + futures.next().await.unwrap().unwrap_err(); + futures.next().await.unwrap().unwrap_err(); + } + + #[tokio::test] + async fn test_producer_cancel() { + let record = record(); + let client = Arc::new(MockClient { + error: None, + panic: None, + delay: Duration::from_millis(1), + batch_sizes: Default::default(), + }); + + let producer = OrderedBatchProducer::new( + client.clone(), + Compression::NoCompression, + 128, + 64, + ReadableSize((record.approximate_size() * 2) as u64).as_bytes() as usize, + ); + + let a = producer + .produce(vec![record.clone(), record.clone(), record.clone()]) + .await + .unwrap() + .wait() + .fuse(); + + let b = producer.produce(vec![record]).await.unwrap().wait().fuse(); + + let mut b = Box::pin(b); + + { + // Cancel a when it exits this block + let mut a = Box::pin(a); + + // Select biased to encourage `a` to be the one with the linger that + // expires first and performs the produce operation + futures::select_biased! { + _ = &mut a => panic!("a should not have flushed"), + _ = &mut b => panic!("b should not have flushed"), + _ = tokio::time::sleep(Duration::from_millis(1)).fuse() => {}, + } + } + + // But `b` should still complete successfully + tokio::time::timeout(Duration::from_secs(1), b) + .await + .unwrap() + .unwrap(); + + assert_eq!( + client + .batch_sizes + .lock() + .unwrap() + .as_slice() + .iter() + .sum::(), + 4 + ); + } +} diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs index 61059b16451f..52d575cbce40 100644 --- a/src/log-store/src/kafka/util.rs +++ b/src/log-store/src/kafka/util.rs @@ -12,5 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod offset; pub mod record; diff --git a/src/log-store/src/kafka/util/offset.rs b/src/log-store/src/kafka/util/offset.rs deleted file mode 100644 index 8c1c66b9f9f5..000000000000 --- a/src/log-store/src/kafka/util/offset.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::error::{CastSnafu, Result}; -use crate::kafka::EntryId; - -/// A wrapper of kafka offset. 
-pub(crate) struct Offset(pub i64); - -impl TryFrom for EntryId { - type Error = crate::error::Error; - - fn try_from(offset: Offset) -> Result { - EntryId::try_from(offset.0).map_err(|_| CastSnafu.build()) - } -} - -impl TryFrom for Offset { - type Error = crate::error::Error; - - fn try_from(entry_id: EntryId) -> Result { - i64::try_from(entry_id) - .map(Offset) - .map_err(|_| CastSnafu.build()) - } -} diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index fa6f77171645..db43c05a447f 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -23,24 +23,18 @@ use store_api::logstore::provider::{KafkaProvider, Provider}; use store_api::storage::RegionId; use crate::error::{ - DecodeJsonSnafu, EmptyEntriesSnafu, EncodeJsonSnafu, GetClientSnafu, IllegalSequenceSnafu, - MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result, + DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu, + MissingKeySnafu, MissingValueSnafu, Result, }; -use crate::kafka::client_manager::ClientManagerRef; -use crate::kafka::util::offset::Offset; use crate::kafka::{EntryId, NamespaceImpl}; -use crate::metrics; /// The current version of Record. pub(crate) const VERSION: u32 = 0; /// The estimated size in bytes of a serialized RecordMeta. -/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_size - ESTIMATED_META_SIZE. +/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_byte - ESTIMATED_META_SIZE. pub(crate) const ESTIMATED_META_SIZE: usize = 256; -/// The minimum batch size -pub(crate) const MIN_BATCH_SIZE: usize = 4 * 1024; - /// The type of a record. /// /// - If the entry is able to fit into a Kafka record, it's converted into a Full record. @@ -96,6 +90,13 @@ impl TryFrom for KafkaRecord { fn try_from(record: Record) -> Result { let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?; + ensure!( + key.len() < ESTIMATED_META_SIZE, + MetaLengthExceededLimitSnafu { + limit: ESTIMATED_META_SIZE, + actual: key.len() + } + ); Ok(KafkaRecord { key: Some(key), value: Some(record.data), @@ -117,77 +118,9 @@ impl TryFrom for Record { } } -/// Produces a record to a kafka topic. -pub(crate) struct RecordProducer { - /// The provide of the entries. - provider: Arc, - /// Entries are buffered before being built into a record. - entries: Vec, -} - -impl RecordProducer { - /// Creates a new producer for producing entries with the given namespace. - pub(crate) fn new(provider: Arc) -> Self { - Self { - provider, - entries: Vec::new(), - } - } - - /// Pushes an entry into the entry buffer. - pub(crate) fn push(&mut self, entry: Entry) { - self.entries.push(entry); - } - - /// Produces the buffered entries to Kafka sever. Those entries may span several Kafka records. - /// Returns the offset of the last successfully produced record. - // TODO(niebayes): maybe requires more fine-grained metrics to measure stages of writing to kafka. - pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { - ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); - - // Gets the producer in which a record buffer is maintained. - let producer = client_manager - .get_or_insert(&self.provider.topic) - .await - .map_err(|e| { - GetClientSnafu { - topic: &self.provider.topic, - error: e.to_string(), - } - .build() - })? - .producer; - - // Stores the offset of the last successfully produced record. 
- let mut last_offset = None; - for entry in self.entries { - for record in convert_to_records(entry) { - let kafka_record = KafkaRecord::try_from(record)?; - - metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc(); - metrics::METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL - .inc_by(kafka_record.approximate_size() as u64); - - // Records of a certain region cannot be produced in parallel since their order must be static. - let offset = producer - .produce(kafka_record.clone()) - .await - .map(Offset) - .with_context(|_| ProduceRecordSnafu { - topic: &self.provider.topic, - size: kafka_record.approximate_size(), - })?; - last_offset = Some(offset); - } - } - // Safety: there must be at least one record produced when the entries are guaranteed not empty. - Ok(last_offset.unwrap()) - } -} - -fn convert_to_records(entry: Entry) -> Vec { +pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result> { match entry { - Entry::Naive(entry) => vec![Record { + Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record { meta: RecordMeta { version: VERSION, tp: RecordType::Full, @@ -200,7 +133,7 @@ fn convert_to_records(entry: Entry) -> Vec { }, }, data: entry.data, - }], + })?]), Entry::MultiplePart(entry) => { let mut entries = Vec::with_capacity(entry.parts.len()); @@ -210,7 +143,7 @@ fn convert_to_records(entry: Entry) -> Vec { MultiplePartHeader::Middle(i) => RecordType::Middle(i), MultiplePartHeader::Last => RecordType::Last, }; - entries.push(Record { + entries.push(KafkaRecord::try_from(Record { meta: RecordMeta { version: VERSION, tp, @@ -222,9 +155,9 @@ fn convert_to_records(entry: Entry) -> Vec { }, }, data: part, - }) + })?) } - entries + Ok(entries) } } } @@ -511,4 +444,20 @@ mod tests { let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err(); assert_matches!(err, error::Error::IllegalSequence { .. }); } + + #[test] + fn test_meta_size() { + let meta = RecordMeta { + version: VERSION, + tp: RecordType::Middle(usize::MAX), + entry_id: u64::MAX, + ns: NamespaceImpl { + region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(), + topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()), + }, + }; + let serialized = serde_json::to_vec(&meta).unwrap(); + // The len of serialized data is 202. + assert!(serialized.len() < ESTIMATED_META_SIZE); + } } diff --git a/src/log-store/src/test_util.rs b/src/log-store/src/test_util.rs index ce5ba3eb854f..973d6d3f9720 100644 --- a/src/log-store/src/test_util.rs +++ b/src/log-store/src/test_util.rs @@ -12,6 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(test)] -pub mod kafka; pub mod log_store_util; diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs deleted file mode 100644 index d56d9b9405b2..000000000000 --- a/src/log-store/src/test_util/kafka.rs +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
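For orientation, the byte-budget batching done by `handle_produce_requests` in the new producer.rs above can be reduced to the sketch below; records are represented only by their approximate sizes, and the per-request bookkeeping (one oneshot receiver per sealed batch) is left out.

fn pack_into_batches(record_sizes: &[usize], max_batch_bytes: usize) -> Vec<Vec<usize>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0;

    for &size in record_sizes {
        // Mirrors the assertion in the real code: a single record must fit the limit.
        assert!(size <= max_batch_bytes);
        // Seal the current batch if adding this record would overflow the budget.
        if current_bytes + size > max_batch_bytes && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(size);
        current_bytes += size;
    }
    // Flush the remainder, as the real code does at the end of each request.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    // A budget of two records' worth of bytes splits three equal records into [2, 1],
    // which is the batch pattern `test_producer` above asserts against MockClient.
    let batches = pack_into_batches(&[14, 14, 14], 28);
    assert_eq!(batches, vec![vec![14, 14], vec![14]]);
}

In the real worker, sealing a batch also hands its oneshot receiver to the request's `ProduceResultReceiver`, which is how `wait()` can later report the maximum offset across all batches of a single request.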
- -use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering}; -use std::sync::Mutex; - -use rand::distributions::Alphanumeric; -use rand::rngs::ThreadRng; -use rand::{thread_rng, Rng}; -use rskafka::client::ClientBuilder; -use store_api::logstore::EntryId; - -use crate::kafka::{EntryImpl, NamespaceImpl}; - -/// Creates `num_topiocs` number of topics each will be decorated by the given decorator. -pub async fn create_topics( - num_topics: usize, - decorator: F, - broker_endpoints: &[String], -) -> Vec -where - F: Fn(usize) -> String, -{ - assert!(!broker_endpoints.is_empty()); - let client = ClientBuilder::new(broker_endpoints.to_vec()) - .build() - .await - .unwrap(); - let ctrl_client = client.controller_client().unwrap(); - let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics) - .map(|i| { - let topic = decorator(i); - let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500); - (topic, task) - }) - .unzip(); - futures::future::try_join_all(tasks).await.unwrap(); - topics -} - -/// Creates a new Kafka namespace with the given topic and region id. -pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl { - NamespaceImpl { - topic: topic.to_string(), - region_id, - } -} - -/// A builder for building entries for a namespace. -pub struct EntryBuilder { - /// The namespace of the entries. - ns: NamespaceImpl, - /// The next entry id to allocate. It starts from 0 by default. - next_entry_id: AtomicEntryId, - /// A generator for supporting random data generation. - /// Wrapped with Mutex> to provide interior mutability. - rng: Mutex>, -} - -impl EntryBuilder { - /// Creates an EntryBuilder for the given namespace. - pub fn new(ns: NamespaceImpl) -> Self { - Self { - ns, - next_entry_id: AtomicEntryId::new(0), - rng: Mutex::new(Some(thread_rng())), - } - } - - /// Sets the next entry id to the given entry id. - pub fn next_entry_id(self, entry_id: EntryId) -> Self { - Self { - next_entry_id: AtomicEntryId::new(entry_id), - ..self - } - } - - /// Skips the next `step` entry ids and returns the next entry id after the stepping. - pub fn skip(&mut self, step: EntryId) -> EntryId { - let old = self.next_entry_id.fetch_add(step, Ordering::Relaxed); - old + step - } - - /// Builds an entry with the given data. - pub fn with_data>(&self, data: D) -> EntryImpl { - EntryImpl { - data: data.as_ref().to_vec(), - id: self.alloc_entry_id(), - ns: self.ns.clone(), - } - } - - /// Builds an entry with random data. - pub fn with_random_data(&self) -> EntryImpl { - self.with_data(self.make_random_data()) - } - - fn alloc_entry_id(&self) -> EntryId { - self.next_entry_id.fetch_add(1, Ordering::Relaxed) - } - - fn make_random_data(&self) -> Vec { - let mut guard = self.rng.lock().unwrap(); - let rng = guard.as_mut().unwrap(); - (0..42).map(|_| rng.sample(Alphanumeric)).collect() - } -} - -/// Builds a batch of entries each with random data. -pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec { - (0..batch_size) - .map(|_| builder.with_random_data()) - .collect() -} diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index e714088d89ec..dacdf5088227 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -13,7 +13,6 @@ // limitations under the License. 
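The `ensure!` guard added to `KafkaRecord::try_from` in the record.rs hunk above, together with the new `test_meta_size`, pins the JSON-encoded record metadata under the 256-byte `ESTIMATED_META_SIZE` budget that the entry-splitting logic reserves. A sketch of that check, with `MetaSketch` as a simplified assumption standing in for the real `RecordMeta`:

use serde::Serialize;

/// Mirrors ESTIMATED_META_SIZE: bytes reserved for a record's serialized metadata.
const ESTIMATED_META_SIZE: usize = 256;

#[derive(Serialize)]
struct MetaSketch {
    version: u32,
    entry_id: u64,
    region_id: u64,
    topic: String,
}

fn encode_meta(meta: &MetaSketch) -> Result<Vec<u8>, String> {
    let key = serde_json::to_vec(meta).map_err(|e| e.to_string())?;
    // Entries are split so that data plus the reserved meta budget stay within
    // max_batch_bytes; an oversized meta would silently eat into the data budget,
    // hence a hard limit rather than a best-effort estimate.
    if key.len() >= ESTIMATED_META_SIZE {
        return Err(format!(
            "meta is {} bytes, exceeding the reserved {} bytes",
            key.len(),
            ESTIMATED_META_SIZE
        ));
    }
    Ok(key)
}

fn main() {
    // Worst-case numeric fields and a long topic name still fit comfortably,
    // the same property the new `test_meta_size` checks for RecordMeta.
    let meta = MetaSketch {
        version: 0,
        entry_id: u64::MAX,
        region_id: u64::MAX,
        topic: "greptime_kafka_cluster/1024/2048/a-reasonably-long-topic-name".to_string(),
    };
    assert!(encode_meta(&meta).is_ok());
}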
use std::path::Path; -use std::time::Duration; use common_base::readable_size::ReadableSize; use common_wal::config::kafka::DatanodeKafkaConfig; @@ -36,7 +35,6 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng pub async fn create_kafka_log_store(broker_endpoints: Vec) -> KafkaLogStore { KafkaLogStore::try_new(&DatanodeKafkaConfig { broker_endpoints, - linger: Duration::from_millis(1), ..Default::default() }) .await diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs index f893a47df54f..16f907f3b439 100644 --- a/src/store-api/src/logstore/provider.rs +++ b/src/store-api/src/logstore/provider.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::storage::RegionId; // The Provider of kafka log store -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct KafkaProvider { pub topic: String, } diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index adceb0243c26..2cef9287aeb0 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -113,7 +113,6 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec