From 470287c495a0638e361449bc5287832a21b98846 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Fri, 27 Jan 2023 17:53:11 +0000 Subject: [PATCH] define clap attributes on config for MetaConfig --- Cargo.lock | 38 +++++++ src/cmd/src/bin/meta_node.rs | 2 +- src/cmd_all/src/bin/risingwave.rs | 2 +- src/cmd_all/src/playground.rs | 2 +- src/common/Cargo.toml | 1 + src/common/src/config.rs | 48 +++++++- src/meta/src/lib.rs | 103 +----------------- .../src/compaction_test_runner.rs | 2 +- 8 files changed, 91 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b8f0aae4e830..0f9b6d90cfa6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5716,6 +5716,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serfig", "smallvec", "spin 0.9.4", "static_assertions", @@ -6900,6 +6901,28 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-bridge" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f68e451f87e0f0a51c565c907017f033ecce23a1aef9eb3faee0dfd6e578d84" +dependencies = [ + "anyhow", + "indexmap", + "serde", +] + +[[package]] +name = "serde-env" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68119a0846249fd6f4b38561b4b4727dbc4fd9fea074f1253bca7d50440ce58" +dependencies = [ + "anyhow", + "log", + "serde", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -7005,6 +7028,21 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serfig" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e56ca578e3bed3843488231c20a76f4dc227a713942f1a74b23a499879e887fc" +dependencies = [ + "anyhow", + "indexmap", + "log", + "serde", + "serde-bridge", + "serde-env", + "toml", +] + [[package]] name = "serial_test" version = "0.9.0" diff --git a/src/cmd/src/bin/meta_node.rs b/src/cmd/src/bin/meta_node.rs index 8c71639bfe31e..8b3aa44c98548 100644 --- a/src/cmd/src/bin/meta_node.rs +++ b/src/cmd/src/bin/meta_node.rs @@ -22,7 +22,7 @@ enable_jemalloc_on_linux!(); fn main() { use clap::StructOpt; - let opts = risingwave_meta::MetaNodeOpts::parse(); + let opts = risingwave_common::config::MetaConfig::parse(); risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new_default()); diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 6fd7627abd89d..387278d853eb3 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -56,7 +56,7 @@ fn main() -> Result<()> { Box::new(move |args: Vec| { eprintln!("launching meta node"); - let opts = risingwave_meta::MetaNodeOpts::parse_from(args); + let opts = risingwave_common::config::MetaConfig::parse_from(args); risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new_default()); diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index d0dec81f2ba36..cc4d6bd42558e 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -152,7 +152,7 @@ pub async fn playground() -> Result<()> { RisingWaveService::Meta(mut opts) => { opts.insert(0, "meta-node".into()); tracing::info!("starting meta-node thread with cli args: {:?}", opts); - let opts = risingwave_meta::MetaNodeOpts::parse_from(opts); + let opts = risingwave_common::config::MetaConfig::parse_from(opts); tracing::info!("opts: {:#?}", opts); let _meta_handle = tokio::spawn(async move { diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 5d6248398f87f..ca9f9801e64cf 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -51,6 +51,7 @@ ryu = "1.0" serde = { version = "1", features = ["derive"] } serde_derive = "1" serde_json = "1" +serfig = "0.0.3" smallvec = "1" spin = "0.9" static_assertions = "1" diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 6589224a950df..1edcf1e0eb6f9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -19,8 +19,10 @@ use std::fs; -use clap::ArgEnum; +use clap::{ArgEnum, Parser}; use serde::{Deserialize, Serialize}; +use serfig::collectors::from_self; +use serfig::Builder; /// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed /// streams on the same connection. @@ -101,94 +103,123 @@ pub enum MetaBackend { /// The section `[meta]` in `risingwave.toml`. This section only applies to the meta node. /// A subset of the configs can be overwritten by CLI arguments. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Parser)] #[serde(deny_unknown_fields)] pub struct MetaConfig { - // Below configs are CLI configurable. + #[clap(long, default_value_t = default::meta::listen_addr())] #[serde(default = "default::meta::listen_addr")] pub listen_addr: String, /// The endpoint for this meta node, which also serves as its unique identifier in cluster /// membership and leader election. + #[clap(long)] pub meta_endpoint: Option, + #[clap(long)] pub dashboard_host: Option, + #[clap(long)] pub prometheus_host: Option, + #[clap(long, arg_enum, default_value_t = default::meta::backend())] #[serde(default = "default::meta::backend")] pub backend: MetaBackend, + #[clap(long, default_value_t = default::meta::etcd_endpoints())] #[serde(default = "default::meta::etcd_endpoints")] pub etcd_endpoints: String, /// Whether to enable authentication with etcd. By default disabled. + #[clap(long, default_value_t)] #[serde(default)] pub etcd_auth: bool, /// Username of etcd, required when --etcd-auth is enabled. + #[clap(long, default_value_t = default::meta::etcd_username())] #[serde(default = "default::meta::etcd_username")] pub etcd_username: String, /// Password of etcd, required when --etcd-auth is enabled. // TODO: it may be unsafe to put password in a file + #[clap(long, default_value_t = default::meta::etcd_password())] #[serde(default = "default::meta::etcd_password")] pub etcd_password: String, + #[clap(long)] pub dashboard_ui_path: Option, /// For dashboard service to fetch cluster info. + #[clap(long)] pub prometheus_endpoint: Option, /// Endpoint of the connector node, there will be a sidecar connector node - /// colocated with Meta node in the cloud environment + /// colocated with Meta node in the cloud environment. + #[clap(long)] pub connector_rpc_endpoint: Option, // Below configs are NOT CLI configurable. /// Threshold used by worker node to filter out new SSTs when scanning object store, during /// full SST GC. + #[clap(skip = default::meta::min_sst_retention_time_sec())] #[serde(default = "default::meta::min_sst_retention_time_sec")] pub min_sst_retention_time_sec: u64, /// The spin interval when collecting global GC watermark in hummock + #[clap(skip = default::meta::collect_gc_watermark_spin_interval_sec())] #[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")] pub collect_gc_watermark_spin_interval_sec: u64, /// Schedule compaction for all compaction groups with this interval. + #[clap(skip = default::meta::periodic_compaction_interval_sec())] #[serde(default = "default::meta::periodic_compaction_interval_sec")] pub periodic_compaction_interval_sec: u64, /// Interval of GC metadata in meta store and stale SSTs in object store. + #[clap(skip = default::meta::vacuum_interval_sec())] #[serde(default = "default::meta::vacuum_interval_sec")] pub vacuum_interval_sec: u64, /// Maximum allowed heartbeat interval in seconds. + #[clap(skip = default::meta::max_heartbeat_interval_sec())] #[serde(default = "default::meta::max_heartbeat_interval_sec")] pub max_heartbeat_interval_secs: u32, /// Whether to enable fail-on-recovery. Should only be used in e2e tests. + #[clap(skip)] #[serde(default)] pub disable_recovery: bool, + #[clap(skip = default::meta::meta_leader_lease_secs())] #[serde(default = "default::meta::meta_leader_lease_secs")] pub meta_leader_lease_secs: u64, /// After specified seconds of idle (no mview or flush), the process will be exited. /// It is mainly useful for playgrounds. + #[clap(skip)] pub dangerous_max_idle_secs: Option, /// Whether to enable deterministic compaction scheduling, which /// will disable all auto scheduling of compaction tasks. /// Should only be used in e2e tests. + #[clap(skip)] #[serde(default)] pub enable_compaction_deterministic: bool, /// Enable sanity check when SSTs are committed. + #[clap(skip)] #[serde(default)] pub enable_committed_sst_sanity_check: bool, + #[clap(skip = default::meta::node_num_monitor_interval_sec())] #[serde(default = "default::meta::node_num_monitor_interval_sec")] pub node_num_monitor_interval_sec: u64, + + /// The path of `risingwave.toml` configuration file. + /// + /// If empty, default configuration values will be used. + #[clap(long, default_value = "")] + #[serde(skip)] + pub config_path: String, } impl Default for MetaConfig { @@ -197,6 +228,15 @@ impl Default for MetaConfig { } } +impl OverwriteConfig for MetaConfig { + fn overwrite(self, config: &mut RwConfig) { + config.meta = Builder::default() + .collect(from_self(self)) + .build_with(config.meta.clone()) + .unwrap() + } +} + #[derive(Copy, Clone, Debug, ArgEnum, Serialize, Deserialize)] pub enum AsyncStackTraceOption { Off, diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index a300fd2d1e67b..8381f4538b66d 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -46,113 +46,18 @@ mod rpc; pub mod storage; mod stream; +use std::future::Future; +use std::pin::Pin; use std::time::Duration; -use clap::Parser; pub use error::{MetaError, MetaResult}; +use risingwave_common::config::{load_config, MetaBackend, MetaConfig}; use crate::manager::MetaOpts; use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend}; -/// CLI arguments received by meta node. Overwrites fields in -/// [`risingwave_common::config::MetaConfig`]. -#[derive(Debug, Clone, Parser)] -pub struct MetaNodeOpts { - #[clap(long)] - listen_addr: Option, - - #[clap(long)] - meta_endpoint: Option, - - #[clap(long)] - dashboard_host: Option, - - #[clap(long)] - prometheus_host: Option, - - #[clap(long, arg_enum)] - backend: Option, - - #[clap(long)] - etcd_endpoints: Option, - - #[clap(long)] - etcd_auth: Option, - - /// Default value is read from the 'ETCD_USERNAME' environment variable. - #[clap(long, env = "ETCD_USERNAME")] - etcd_username: Option, - - /// Default value is read from the 'ETCD_PASSWORD' environment variable. - #[clap(long, env = "ETCD_PASSWORD")] - etcd_password: Option, - - #[clap(long)] - dashboard_ui_path: Option, - - #[clap(long)] - prometheus_endpoint: Option, - - /// Default value is read from the 'META_CONNECTOR_RPC_ENDPOINT' environment variable. - #[clap(long, env = "META_CONNECTOR_RPC_ENDPOINT")] - pub connector_rpc_endpoint: Option, - - /// The path of `risingwave.toml` configuration file. - /// - /// If empty, default configuration values will be used. - #[clap(long, default_value = "")] - pub config_path: String, -} - -impl OverwriteConfig for MetaNodeOpts { - fn overwrite(self, config: &mut RwConfig) { - let mut c = &mut config.meta; - if let Some(v) = self.listen_addr { - c.listen_addr = v; - } - if self.meta_endpoint.is_some() { - c.meta_endpoint = self.meta_endpoint; - } - if self.dashboard_host.is_some() { - c.dashboard_host = self.dashboard_host; - } - if self.prometheus_host.is_some() { - c.prometheus_host = self.prometheus_host; - } - if let Some(v) = self.backend { - c.backend = v; - } - if let Some(v) = self.etcd_endpoints { - c.etcd_endpoints = v; - } - if let Some(v) = self.etcd_auth { - c.etcd_auth = v; - } - if let Some(v) = self.etcd_username { - c.etcd_username = v; - } - if let Some(v) = self.etcd_password { - c.etcd_password = v; - } - if self.dashboard_ui_path.is_some() { - c.dashboard_ui_path = self.dashboard_ui_path; - } - if self.prometheus_endpoint.is_some() { - c.prometheus_endpoint = self.prometheus_endpoint; - } - if self.connector_rpc_endpoint.is_some() { - c.connector_rpc_endpoint = self.connector_rpc_endpoint; - } - } -} - -use std::future::Future; -use std::pin::Pin; - -use risingwave_common::config::{load_config, MetaBackend, OverwriteConfig, RwConfig}; - /// Start meta node -pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { +pub fn start(opts: MetaConfig) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 8100c7988a8fe..b0a3fcaa9d53a 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -124,7 +124,7 @@ pub async fn compaction_test_main( } pub async fn start_meta_node(listen_addr: String, config_path: String) { - let meta_opts = risingwave_meta::MetaNodeOpts::parse_from([ + let meta_opts = risingwave_common::config::MetaConfig::parse_from([ "meta-node", "--listen-addr", &listen_addr,