Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(config): bring back system params to config file #8623

Merged
merged 7 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (release mode)"
Expand Down
105 changes: 105 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::fs;

use clap::ValueEnum;
use risingwave_pb::meta::SystemParams;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand Down Expand Up @@ -112,6 +113,9 @@ pub struct RwConfig {
#[serde(default)]
pub storage: StorageConfig,

#[serde(default)]
pub system: SystemConfig,

#[serde(flatten)]
pub unrecognized: HashMap<String, Value>,
}
Expand Down Expand Up @@ -451,6 +455,67 @@ impl Default for DeveloperConfig {
}
}

/// The section `[system]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SystemConfig {
/// The interval of periodic barrier.
#[serde(default = "default::system::barrier_interval_ms")]
pub barrier_interval_ms: u32,

/// There will be a checkpoint for every n barriers
#[serde(default = "default::system::checkpoint_frequency")]
pub checkpoint_frequency: u64,

/// Target size of the Sstable.
#[serde(default = "default::system::sstable_size_mb")]
pub sstable_size_mb: u32,

/// Size of each block in bytes in SST.
#[serde(default = "default::system::block_size_kb")]
pub block_size_kb: u32,

/// False positive probability of bloom filter.
#[serde(default = "default::system::bloom_false_positive")]
pub bloom_false_positive: f64,

#[serde(default = "default::system::state_store")]
pub state_store: String,

/// Remote directory for storing data and metadata objects.
#[serde(default = "default::system::data_directory")]
pub data_directory: String,

/// Remote storage url for storing snapshots.
#[serde(default = "default::system::backup_storage_url")]
pub backup_storage_url: String,

/// Remote directory for storing snapshots.
#[serde(default = "default::system::backup_storage_directory")]
pub backup_storage_directory: String,
}

impl Default for SystemConfig {
fn default() -> Self {
toml::from_str("").unwrap()
}
}

impl SystemConfig {
pub fn into_init_system_params(self) -> SystemParams {
SystemParams {
barrier_interval_ms: Some(self.barrier_interval_ms),
checkpoint_frequency: Some(self.checkpoint_frequency),
sstable_size_mb: Some(self.sstable_size_mb),
block_size_kb: Some(self.block_size_kb),
bloom_false_positive: Some(self.bloom_false_positive),
state_store: Some(self.state_store),
data_directory: Some(self.data_directory),
backup_storage_url: Some(self.backup_storage_url),
backup_storage_directory: Some(self.backup_storage_directory),
}
}
}

mod default {
pub mod meta {
use crate::config::MetaBackend;
Expand Down Expand Up @@ -662,4 +727,44 @@ mod default {
1024
}
}

pub mod system {
use crate::system_param;

pub fn barrier_interval_ms() -> u32 {
system_param::default::barrier_interval_ms()
}

pub fn checkpoint_frequency() -> u64 {
system_param::default::checkpoint_frequency()
}

pub fn sstable_size_mb() -> u32 {
system_param::default::sstable_size_mb()
}

pub fn block_size_kb() -> u32 {
system_param::default::block_size_kb()
}

pub fn bloom_false_positive() -> f64 {
system_param::default::bloom_false_positive()
}

pub fn state_store() -> String {
system_param::default::state_store()
}

pub fn data_directory() -> String {
system_param::default::data_directory()
}

pub fn backup_storage_url() -> String {
system_param::default::backup_storage_url()
}

pub fn backup_storage_directory() -> String {
system_param::default::backup_storage_directory()
}
}
}
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ macro_rules! for_all_undeprecated_params {
{ sstable_size_mb, u32, 256_u32 },
{ block_size_kb, u32, 64_u32 },
{ bloom_false_positive, f64, 0.001_f64 },
{ state_store, String, "hummock+memory".to_string() },
{ state_store, String, "".to_string() },
{ data_directory, String, "hummock_001".to_string() },
{ backup_storage_url, String, "memory".to_string() },
{ backup_storage_directory, String, "backup".to_string() },
Expand Down
6 changes: 6 additions & 0 deletions src/config/ci-compaction-test-meta.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,9 @@ total_buffer_capacity_mb = 128
cache_file_fallocate_unit_mb = 512
cache_meta_fallocate_unit_mb = 16
cache_file_max_write_size_mb = 4

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
6 changes: 6 additions & 0 deletions src/config/ci-compaction-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ total_buffer_capacity_mb = 128
cache_file_fallocate_unit_mb = 512
cache_meta_fallocate_unit_mb = 16
cache_file_max_write_size_mb = 4

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
4 changes: 3 additions & 1 deletion src/config/ci-iceberg-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 5000
in_flight_barrier_nums = 10

[system]
barrier_interval_ms = 5000
checkpoint_frequency = 1
4 changes: 4 additions & 0 deletions src/config/ci-meta-backup-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
min_sst_retention_time_sec = 0
collect_gc_watermark_spin_interval_sec = 1
vacuum_interval_sec = 10

[system]
backup_storage_url = "minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001"
backup_storage_directory = "backup"
6 changes: 4 additions & 2 deletions src/config/ci-recovery.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 250
in_flight_barrier_nums = 10
checkpoint_frequency = 5

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
6 changes: 4 additions & 2 deletions src/config/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

[streaming]
barrier_interval_ms = 250
in_flight_barrier_nums = 10
checkpoint_frequency = 5

[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
6 changes: 6 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
unsafe_stream_extreme_cache_size = 1024
stream_chunk_size = 1024

[system]
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.001
data_directory = "hummock_001"
90 changes: 41 additions & 49 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ use std::time::Duration;

use clap::Parser;
pub use error::{MetaError, MetaResult};
use risingwave_common::system_param::default;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;

use crate::manager::MetaOpts;
use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend};
Expand Down Expand Up @@ -107,42 +105,6 @@ pub struct MetaNodeOpts {
#[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
prometheus_endpoint: Option<String>,

/// State store url.
#[clap(long, env = "RW_STATE_STORE")]
state_store: Option<String>,

/// The interval of periodic barrier.
#[clap(long, env = "RW_BARRIER_INTERVAL_MS", default_value_t = default::barrier_interval_ms())]
barrier_interval_ms: u32,

/// There will be a checkpoint for every n barriers
#[clap(long, env = "RW_CHECKPOINT_FREQUENCY", default_value_t = default::checkpoint_frequency())]
pub checkpoint_frequency: u64,

/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB", default_value_t = default::sstable_size_mb())]
sstable_size_mb: u32,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB", default_value_t = default::block_size_kb())]
block_size_kb: u32,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE", default_value_t = default::bloom_false_positive())]
bloom_false_positive: f64,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY", default_value_t = default::data_directory())]
data_directory: String,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL", default_value_t = default::backup_storage_url())]
backup_storage_url: String,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_STORAGE_DIRECTORY", default_value_t = default::backup_storage_directory())]
backup_storage_directory: String,

/// Endpoint of the connector node, there will be a sidecar connector node
/// colocated with Meta node in the cloud environment
#[clap(long, env = "RW_CONNECTOR_RPC_ENDPOINT")]
Expand All @@ -164,6 +126,46 @@ pub struct OverrideConfigOpts {
#[clap(long, env = "RW_BACKEND", value_enum)]
#[override_opts(path = meta.backend)]
backend: Option<MetaBackend>,

/// The interval of periodic barrier.
#[clap(long, env = "RW_BARRIER_INTERVAL_MS")]
#[override_opts(path = system.barrier_interval_ms)]
barrier_interval_ms: Option<u32>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess some of these are not necessary to be overridden by CLI args. 🤔


/// Target size of the Sstable.
#[clap(long, env = "RW_SSTABLE_SIZE_MB")]
#[override_opts(path = system.sstable_size_mb)]
sstable_size_mb: Option<u32>,

/// Size of each block in bytes in SST.
#[clap(long, env = "RW_BLOCK_SIZE_KB")]
#[override_opts(path = system.block_size_kb)]
block_size_kb: Option<u32>,

/// False positive probability of bloom filter.
#[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")]
#[override_opts(path = system.bloom_false_positive)]
bloom_false_positive: Option<f64>,

/// State store url
#[clap(long, env = "RW_STATE_STORE")]
#[override_opts(path = system.state_store)]
state_store: Option<String>,

/// Remote directory for storing data and metadata objects.
#[clap(long, env = "RW_DATA_DIRECTORY")]
#[override_opts(path = system.data_directory)]
data_directory: Option<String>,

/// Remote storage url for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_URL")]
#[override_opts(path = system.backup_storage_url)]
backup_storage_url: Option<String>,

/// Remote directory for storing snapshots.
#[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")]
#[override_opts(path = system.backup_storage_directory)]
backup_storage_directory: Option<String>,
}

use std::future::Future;
Expand Down Expand Up @@ -244,17 +246,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.periodic_ttl_reclaim_compaction_interval_sec,
},
SystemParams {
barrier_interval_ms: Some(opts.barrier_interval_ms),
checkpoint_frequency: Some(opts.checkpoint_frequency),
sstable_size_mb: Some(opts.sstable_size_mb),
block_size_kb: Some(opts.block_size_kb),
bloom_false_positive: Some(opts.bloom_false_positive),
state_store: Some(opts.state_store.unwrap_or_default()),
data_directory: Some(opts.data_directory),
backup_storage_url: Some(opts.backup_storage_url),
backup_storage_directory: Some(opts.backup_storage_directory),
},
config.system.into_init_system_params(),
)
.await
.unwrap();
Expand Down
3 changes: 0 additions & 3 deletions src/storage/backup/integration_tests/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ function clean_etcd_data() {

function start_cluster() {
cargo make d ci-meta-backup-test 1>/dev/null 2>&1
execute_sql_and_expect \
"alter system set backup_storage_url to \"minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001\";" \
"ALTER_SYSTEM"
sleep 5
}

Expand Down
9 changes: 6 additions & 3 deletions src/tests/compaction_test/src/compaction_test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ pub async fn start_meta_node(listen_addr: String, config_path: String) {
"--config-path",
&config_path,
]);
// We set a large checkpoint frequency to prevent the embedded meta node
// to commit new epochs to avoid bumping the hummock version during version log replay.
assert_eq!(CHECKPOINT_FREQ_FOR_REPLAY, meta_opts.checkpoint_frequency);
let config = load_config(
&meta_opts.config_path,
Some(meta_opts.override_opts.clone()),
);
// We set a large checkpoint frequency to prevent the embedded meta node
// to commit new epochs to avoid bumping the hummock version during version log replay.
assert_eq!(
CHECKPOINT_FREQ_FOR_REPLAY,
config.system.checkpoint_frequency
);
assert!(
config.meta.enable_compaction_deterministic,
"enable_compaction_deterministic should be set"
Expand Down
Loading