diff --git a/ci/workflows/main.yml b/ci/workflows/main.yml index e4e10c95d73b..92906e25df9c 100644 --- a/ci/workflows/main.yml +++ b/ci/workflows/main.yml @@ -104,7 +104,7 @@ steps: files: "*-junit.xml" format: "junit" - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 10 retry: *auto-retry - label: "end-to-end test (release mode)" diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 77c4cc2d0fb4..f8f35e787cd1 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::fs; use clap::ValueEnum; +use risingwave_pb::meta::SystemParams; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -112,6 +113,9 @@ pub struct RwConfig { #[serde(default)] pub storage: StorageConfig, + #[serde(default)] + pub system: SystemConfig, + #[serde(flatten)] pub unrecognized: HashMap, } @@ -451,6 +455,67 @@ impl Default for DeveloperConfig { } } +/// The section `[system]` in `risingwave.toml`. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SystemConfig { + /// The interval of periodic barrier. + #[serde(default = "default::system::barrier_interval_ms")] + pub barrier_interval_ms: u32, + + /// There will be a checkpoint for every n barriers + #[serde(default = "default::system::checkpoint_frequency")] + pub checkpoint_frequency: u64, + + /// Target size of the Sstable. + #[serde(default = "default::system::sstable_size_mb")] + pub sstable_size_mb: u32, + + /// Size of each block in bytes in SST. + #[serde(default = "default::system::block_size_kb")] + pub block_size_kb: u32, + + /// False positive probability of bloom filter. + #[serde(default = "default::system::bloom_false_positive")] + pub bloom_false_positive: f64, + + #[serde(default = "default::system::state_store")] + pub state_store: String, + + /// Remote directory for storing data and metadata objects. + #[serde(default = "default::system::data_directory")] + pub data_directory: String, + + /// Remote storage url for storing snapshots. + #[serde(default = "default::system::backup_storage_url")] + pub backup_storage_url: String, + + /// Remote directory for storing snapshots. + #[serde(default = "default::system::backup_storage_directory")] + pub backup_storage_directory: String, +} + +impl Default for SystemConfig { + fn default() -> Self { + toml::from_str("").unwrap() + } +} + +impl SystemConfig { + pub fn into_init_system_params(self) -> SystemParams { + SystemParams { + barrier_interval_ms: Some(self.barrier_interval_ms), + checkpoint_frequency: Some(self.checkpoint_frequency), + sstable_size_mb: Some(self.sstable_size_mb), + block_size_kb: Some(self.block_size_kb), + bloom_false_positive: Some(self.bloom_false_positive), + state_store: Some(self.state_store), + data_directory: Some(self.data_directory), + backup_storage_url: Some(self.backup_storage_url), + backup_storage_directory: Some(self.backup_storage_directory), + } + } +} + mod default { pub mod meta { use crate::config::MetaBackend; @@ -662,4 +727,44 @@ mod default { 1024 } } + + pub mod system { + use crate::system_param; + + pub fn barrier_interval_ms() -> u32 { + system_param::default::barrier_interval_ms() + } + + pub fn checkpoint_frequency() -> u64 { + system_param::default::checkpoint_frequency() + } + + pub fn sstable_size_mb() -> u32 { + system_param::default::sstable_size_mb() + } + + pub fn block_size_kb() -> u32 { + system_param::default::block_size_kb() + } + + pub fn bloom_false_positive() -> f64 { + system_param::default::bloom_false_positive() + } + + pub fn state_store() -> String { + system_param::default::state_store() + } + + pub fn data_directory() -> String { + system_param::default::data_directory() + } + + pub fn backup_storage_url() -> String { + system_param::default::backup_storage_url() + } + + pub fn backup_storage_directory() -> String { + system_param::default::backup_storage_directory() + } + } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index a0759c9a1d7d..712391fa6d81 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -38,7 +38,7 @@ macro_rules! for_all_undeprecated_params { { sstable_size_mb, u32, 256_u32 }, { block_size_kb, u32, 64_u32 }, { bloom_false_positive, f64, 0.001_f64 }, - { state_store, String, "hummock+memory".to_string() }, + { state_store, String, "".to_string() }, { data_directory, String, "hummock_001".to_string() }, { backup_storage_url, String, "memory".to_string() }, { backup_storage_directory, String, "backup".to_string() }, diff --git a/src/config/ci-compaction-test-meta.toml b/src/config/ci-compaction-test-meta.toml index d8248d283f61..e75ce33af3e6 100644 --- a/src/config/ci-compaction-test-meta.toml +++ b/src/config/ci-compaction-test-meta.toml @@ -21,3 +21,9 @@ total_buffer_capacity_mb = 128 cache_file_fallocate_unit_mb = 512 cache_meta_fallocate_unit_mb = 16 cache_file_max_write_size_mb = 4 + +[system] +sstable_size_mb = 256 +block_size_kb = 1024 +bloom_false_positive = 0.001 +data_directory = "hummock_001" \ No newline at end of file diff --git a/src/config/ci-compaction-test.toml b/src/config/ci-compaction-test.toml index 797ec68a7678..1ab4f6b1286b 100644 --- a/src/config/ci-compaction-test.toml +++ b/src/config/ci-compaction-test.toml @@ -19,3 +19,9 @@ total_buffer_capacity_mb = 128 cache_file_fallocate_unit_mb = 512 cache_meta_fallocate_unit_mb = 16 cache_file_max_write_size_mb = 4 + +[system] +sstable_size_mb = 256 +block_size_kb = 1024 +bloom_false_positive = 0.001 +data_directory = "hummock_001" \ No newline at end of file diff --git a/src/config/ci-iceberg-test.toml b/src/config/ci-iceberg-test.toml index a7a5db0a603f..4321dbc93d78 100644 --- a/src/config/ci-iceberg-test.toml +++ b/src/config/ci-iceberg-test.toml @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true max_heartbeat_interval_secs = 600 [streaming] -barrier_interval_ms = 5000 in_flight_barrier_nums = 10 + +[system] +barrier_interval_ms = 5000 checkpoint_frequency = 1 \ No newline at end of file diff --git a/src/config/ci-meta-backup-test.toml b/src/config/ci-meta-backup-test.toml index 732b948e476d..9aa5e2ec5a9a 100644 --- a/src/config/ci-meta-backup-test.toml +++ b/src/config/ci-meta-backup-test.toml @@ -2,3 +2,7 @@ min_sst_retention_time_sec = 0 collect_gc_watermark_spin_interval_sec = 1 vacuum_interval_sec = 10 + +[system] +backup_storage_url = "minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001" +backup_storage_directory = "backup" \ No newline at end of file diff --git a/src/config/ci-recovery.toml b/src/config/ci-recovery.toml index b2fecb8d0e35..3df87a304464 100644 --- a/src/config/ci-recovery.toml +++ b/src/config/ci-recovery.toml @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true max_heartbeat_interval_secs = 600 [streaming] -barrier_interval_ms = 250 in_flight_barrier_nums = 10 -checkpoint_frequency = 5 + +[system] +barrier_interval_ms = 250 +checkpoint_frequency = 5 \ No newline at end of file diff --git a/src/config/ci.toml b/src/config/ci.toml index 3d93081ac2d3..719d7644e30a 100644 --- a/src/config/ci.toml +++ b/src/config/ci.toml @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true max_heartbeat_interval_secs = 600 [streaming] -barrier_interval_ms = 250 in_flight_barrier_nums = 10 -checkpoint_frequency = 5 + +[system] +barrier_interval_ms = 250 +checkpoint_frequency = 5 \ No newline at end of file diff --git a/src/config/example.toml b/src/config/example.toml index 04c5d8f7a79a..b8e25ff93276 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -37,3 +37,9 @@ stream_enable_executor_row_count = false stream_connector_message_buffer_size = 16 unsafe_stream_extreme_cache_size = 1024 stream_chunk_size = 1024 + +[system] +sstable_size_mb = 256 +block_size_kb = 1024 +bloom_false_positive = 0.001 +data_directory = "hummock_001" \ No newline at end of file diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 2c91ace5ca79..e56f4592f264 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -51,10 +51,8 @@ use std::time::Duration; use clap::Parser; pub use error::{MetaError, MetaResult}; -use risingwave_common::system_param::default; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_proc_macro::OverrideConfig; -use risingwave_pb::meta::SystemParams; use crate::manager::MetaOpts; use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend}; @@ -107,42 +105,6 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] prometheus_endpoint: Option, - /// State store url. - #[clap(long, env = "RW_STATE_STORE")] - state_store: Option, - - /// The interval of periodic barrier. - #[clap(long, env = "RW_BARRIER_INTERVAL_MS", default_value_t = default::barrier_interval_ms())] - barrier_interval_ms: u32, - - /// There will be a checkpoint for every n barriers - #[clap(long, env = "RW_CHECKPOINT_FREQUENCY", default_value_t = default::checkpoint_frequency())] - pub checkpoint_frequency: u64, - - /// Target size of the Sstable. - #[clap(long, env = "RW_SSTABLE_SIZE_MB", default_value_t = default::sstable_size_mb())] - sstable_size_mb: u32, - - /// Size of each block in bytes in SST. - #[clap(long, env = "RW_BLOCK_SIZE_KB", default_value_t = default::block_size_kb())] - block_size_kb: u32, - - /// False positive probability of bloom filter. - #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE", default_value_t = default::bloom_false_positive())] - bloom_false_positive: f64, - - /// Remote directory for storing data and metadata objects. - #[clap(long, env = "RW_DATA_DIRECTORY", default_value_t = default::data_directory())] - data_directory: String, - - /// Remote storage url for storing snapshots. - #[clap(long, env = "RW_BACKUP_STORAGE_URL", default_value_t = default::backup_storage_url())] - backup_storage_url: String, - - /// Remote directory for storing snapshots. - #[clap(long, env = "RW_STORAGE_DIRECTORY", default_value_t = default::backup_storage_directory())] - backup_storage_directory: String, - /// Endpoint of the connector node, there will be a sidecar connector node /// colocated with Meta node in the cloud environment #[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")] @@ -164,6 +126,46 @@ pub struct OverrideConfigOpts { #[clap(long, env = "RW_BACKEND", value_enum)] #[override_opts(path = meta.backend)] backend: Option, + + /// The interval of periodic barrier. + #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] + #[override_opts(path = system.barrier_interval_ms)] + barrier_interval_ms: Option, + + /// Target size of the Sstable. + #[clap(long, env = "RW_SSTABLE_SIZE_MB")] + #[override_opts(path = system.sstable_size_mb)] + sstable_size_mb: Option, + + /// Size of each block in bytes in SST. + #[clap(long, env = "RW_BLOCK_SIZE_KB")] + #[override_opts(path = system.block_size_kb)] + block_size_kb: Option, + + /// False positive probability of bloom filter. + #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] + #[override_opts(path = system.bloom_false_positive)] + bloom_false_positive: Option, + + /// State store url + #[clap(long, env = "RW_STATE_STORE")] + #[override_opts(path = system.state_store)] + state_store: Option, + + /// Remote directory for storing data and metadata objects. + #[clap(long, env = "RW_DATA_DIRECTORY")] + #[override_opts(path = system.data_directory)] + data_directory: Option, + + /// Remote storage url for storing snapshots. + #[clap(long, env = "RW_BACKUP_STORAGE_URL")] + #[override_opts(path = system.backup_storage_url)] + backup_storage_url: Option, + + /// Remote directory for storing snapshots. + #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] + #[override_opts(path = system.backup_storage_directory)] + backup_storage_directory: Option, } use std::future::Future; @@ -244,17 +246,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .meta .periodic_ttl_reclaim_compaction_interval_sec, }, - SystemParams { - barrier_interval_ms: Some(opts.barrier_interval_ms), - checkpoint_frequency: Some(opts.checkpoint_frequency), - sstable_size_mb: Some(opts.sstable_size_mb), - block_size_kb: Some(opts.block_size_kb), - bloom_false_positive: Some(opts.bloom_false_positive), - state_store: Some(opts.state_store.unwrap_or_default()), - data_directory: Some(opts.data_directory), - backup_storage_url: Some(opts.backup_storage_url), - backup_storage_directory: Some(opts.backup_storage_directory), - }, + config.system.into_init_system_params(), ) .await .unwrap(); diff --git a/src/storage/backup/integration_tests/common.sh b/src/storage/backup/integration_tests/common.sh index 8b4048776d11..163f4736f3ab 100644 --- a/src/storage/backup/integration_tests/common.sh +++ b/src/storage/backup/integration_tests/common.sh @@ -19,9 +19,6 @@ function clean_etcd_data() { function start_cluster() { cargo make d ci-meta-backup-test 1>/dev/null 2>&1 - execute_sql_and_expect \ - "alter system set backup_storage_url to \"minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001\";" \ - "ALTER_SYSTEM" sleep 5 } diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index dba58cff8ff0..31da6b876edd 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -138,13 +138,16 @@ pub async fn start_meta_node(listen_addr: String, config_path: String) { "--config-path", &config_path, ]); - // We set a large checkpoint frequency to prevent the embedded meta node - // to commit new epochs to avoid bumping the hummock version during version log replay. - assert_eq!(CHECKPOINT_FREQ_FOR_REPLAY, meta_opts.checkpoint_frequency); let config = load_config( &meta_opts.config_path, Some(meta_opts.override_opts.clone()), ); + // We set a large checkpoint frequency to prevent the embedded meta node + // to commit new epochs to avoid bumping the hummock version during version log replay. + assert_eq!( + CHECKPOINT_FREQ_FOR_REPLAY, + config.system.checkpoint_frequency + ); assert!( config.meta.enable_compaction_deterministic, "enable_compaction_deterministic should be set" diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 5224f389e1b6..d0a905feb046 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -29,6 +29,14 @@ use sqllogictest::AsyncDB; use crate::client::RisingWave; +/// Embed the config file and create a temporary file at runtime. +static CONFIG_PATH: LazyLock = LazyLock::new(|| { + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all(include_bytes!("risingwave.toml")) + .expect("failed to write config file"); + file.into_temp_path() +}); + /// RisingWave cluster configuration. #[derive(Debug, Clone)] pub struct Configuration { @@ -46,9 +54,6 @@ pub struct Configuration { /// The number of meta nodes. pub meta_nodes: usize, - /// Extra CLI arguments for meta nodes. - pub meta_node_extra_args: Vec<&'static str>, - /// The number of compactor nodes. pub compactor_nodes: usize, @@ -68,16 +73,10 @@ impl Configuration { /// Returns the config for scale test. pub fn for_scale() -> Self { Configuration { - config_path: "".to_string(), + config_path: CONFIG_PATH.as_os_str().to_string_lossy().into(), frontend_nodes: 2, compute_nodes: 3, meta_nodes: 1, - meta_node_extra_args: vec![ - "--barrier-interval-ms", - "250", - "--checkpoint-frequency", - "4", - ], compactor_nodes: 2, compute_node_cores: 2, etcd_timeout_rate: 0.0, @@ -196,25 +195,19 @@ impl Cluster { // meta node for i in 1..=conf.meta_nodes { - let opts = risingwave_meta::MetaNodeOpts::parse_from( - [ - vec![ - "meta-node", - "--config-path", - &conf.config_path, - "--listen-addr", - "0.0.0.0:5690", - "--advertise-addr", - &format!("meta-{i}:5690"), - "--backend", - "etcd", - "--etcd-endpoints", - "etcd:2388", - ], - conf.meta_node_extra_args.clone(), - ] - .concat(), - ); + let opts = risingwave_meta::MetaNodeOpts::parse_from([ + "meta-node", + "--config-path", + &conf.config_path, + "--listen-addr", + "0.0.0.0:5690", + "--advertise-addr", + &format!("meta-{i}:5690"), + "--backend", + "etcd", + "--etcd-endpoints", + "etcd:2388", + ]); handle .create_node() .name(format!("meta-{i}")) diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 92607c205b8f..219db2ae4844 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -159,7 +159,6 @@ async fn main() { compactor_nodes: args.compactor_nodes, compute_node_cores: args.compute_node_cores, meta_nodes: args.meta_nodes, - meta_node_extra_args: vec![], etcd_timeout_rate: args.etcd_timeout_rate, etcd_data_path: args.etcd_data, }; diff --git a/src/tests/simulation/src/risingwave.toml b/src/tests/simulation/src/risingwave.toml new file mode 100644 index 000000000000..72e6f7bcf228 --- /dev/null +++ b/src/tests/simulation/src/risingwave.toml @@ -0,0 +1,7 @@ +# The configuration for scaling simulation test. +# +# Note: this file is embedded in the binary and cannot be changed without recompiling. + +[system] +barrier_interval_ms = 250 +checkpoint_frequency = 4 \ No newline at end of file