Skip to content

Commit

Permalink
define clap attributes on config for MetaConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR committed Jan 27, 2023
1 parent 8d6f5f0 commit 470287c
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 107 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/cmd/src/bin/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enable_jemalloc_on_linux!();
fn main() {
use clap::StructOpt;

let opts = risingwave_meta::MetaNodeOpts::parse();
let opts = risingwave_common::config::MetaConfig::parse();

risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new_default());

Expand Down
2 changes: 1 addition & 1 deletion src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn main() -> Result<()> {
Box::new(move |args: Vec<String>| {
eprintln!("launching meta node");

let opts = risingwave_meta::MetaNodeOpts::parse_from(args);
let opts = risingwave_common::config::MetaConfig::parse_from(args);

risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new_default());

Expand Down
2 changes: 1 addition & 1 deletion src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub async fn playground() -> Result<()> {
RisingWaveService::Meta(mut opts) => {
opts.insert(0, "meta-node".into());
tracing::info!("starting meta-node thread with cli args: {:?}", opts);
let opts = risingwave_meta::MetaNodeOpts::parse_from(opts);
let opts = risingwave_common::config::MetaConfig::parse_from(opts);

tracing::info!("opts: {:#?}", opts);
let _meta_handle = tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ryu = "1.0"
serde = { version = "1", features = ["derive"] }
serde_derive = "1"
serde_json = "1"
serfig = "0.0.3"
smallvec = "1"
spin = "0.9"
static_assertions = "1"
Expand Down
48 changes: 44 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
use std::fs;

use clap::ArgEnum;
use clap::{ArgEnum, Parser};
use serde::{Deserialize, Serialize};
use serfig::collectors::from_self;
use serfig::Builder;

/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
/// streams on the same connection.
Expand Down Expand Up @@ -101,94 +103,123 @@ pub enum MetaBackend {

/// The section `[meta]` in `risingwave.toml`. This section only applies to the meta node.
/// A subset of the configs can be overwritten by CLI arguments.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Parser)]
#[serde(deny_unknown_fields)]
pub struct MetaConfig {
// Below configs are CLI configurable.
#[clap(long, default_value_t = default::meta::listen_addr())]
#[serde(default = "default::meta::listen_addr")]
pub listen_addr: String,

/// The endpoint for this meta node, which also serves as its unique identifier in cluster
/// membership and leader election.
#[clap(long)]
pub meta_endpoint: Option<String>,

#[clap(long)]
pub dashboard_host: Option<String>,

#[clap(long)]
pub prometheus_host: Option<String>,

#[clap(long, arg_enum, default_value_t = default::meta::backend())]
#[serde(default = "default::meta::backend")]
pub backend: MetaBackend,

#[clap(long, default_value_t = default::meta::etcd_endpoints())]
#[serde(default = "default::meta::etcd_endpoints")]
pub etcd_endpoints: String,

/// Whether to enable authentication with etcd. By default disabled.
#[clap(long, default_value_t)]
#[serde(default)]
pub etcd_auth: bool,

/// Username of etcd, required when --etcd-auth is enabled.
#[clap(long, default_value_t = default::meta::etcd_username())]
#[serde(default = "default::meta::etcd_username")]
pub etcd_username: String,

/// Password of etcd, required when --etcd-auth is enabled.
// TODO: it may be unsafe to put password in a file
#[clap(long, default_value_t = default::meta::etcd_password())]
#[serde(default = "default::meta::etcd_password")]
pub etcd_password: String,

#[clap(long)]
pub dashboard_ui_path: Option<String>,

/// For dashboard service to fetch cluster info.
#[clap(long)]
pub prometheus_endpoint: Option<String>,

/// Endpoint of the connector node, there will be a sidecar connector node
/// colocated with Meta node in the cloud environment
/// colocated with Meta node in the cloud environment.
#[clap(long)]
pub connector_rpc_endpoint: Option<String>,

// Below configs are NOT CLI configurable.
/// Threshold used by worker node to filter out new SSTs when scanning object store, during
/// full SST GC.
#[clap(skip = default::meta::min_sst_retention_time_sec())]
#[serde(default = "default::meta::min_sst_retention_time_sec")]
pub min_sst_retention_time_sec: u64,

/// The spin interval when collecting global GC watermark in hummock
#[clap(skip = default::meta::collect_gc_watermark_spin_interval_sec())]
#[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")]
pub collect_gc_watermark_spin_interval_sec: u64,

/// Schedule compaction for all compaction groups with this interval.
#[clap(skip = default::meta::periodic_compaction_interval_sec())]
#[serde(default = "default::meta::periodic_compaction_interval_sec")]
pub periodic_compaction_interval_sec: u64,

/// Interval of GC metadata in meta store and stale SSTs in object store.
#[clap(skip = default::meta::vacuum_interval_sec())]
#[serde(default = "default::meta::vacuum_interval_sec")]
pub vacuum_interval_sec: u64,

/// Maximum allowed heartbeat interval in seconds.
#[clap(skip = default::meta::max_heartbeat_interval_sec())]
#[serde(default = "default::meta::max_heartbeat_interval_sec")]
pub max_heartbeat_interval_secs: u32,

/// Whether to enable fail-on-recovery. Should only be used in e2e tests.
#[clap(skip)]
#[serde(default)]
pub disable_recovery: bool,

#[clap(skip = default::meta::meta_leader_lease_secs())]
#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

/// After specified seconds of idle (no mview or flush), the process will be exited.
/// It is mainly useful for playgrounds.
#[clap(skip)]
pub dangerous_max_idle_secs: Option<u64>,

/// Whether to enable deterministic compaction scheduling, which
/// will disable all auto scheduling of compaction tasks.
/// Should only be used in e2e tests.
#[clap(skip)]
#[serde(default)]
pub enable_compaction_deterministic: bool,

/// Enable sanity check when SSTs are committed.
#[clap(skip)]
#[serde(default)]
pub enable_committed_sst_sanity_check: bool,

#[clap(skip = default::meta::node_num_monitor_interval_sec())]
#[serde(default = "default::meta::node_num_monitor_interval_sec")]
pub node_num_monitor_interval_sec: u64,

/// The path of `risingwave.toml` configuration file.
///
/// If empty, default configuration values will be used.
#[clap(long, default_value = "")]
#[serde(skip)]
pub config_path: String,
}

impl Default for MetaConfig {
Expand All @@ -197,6 +228,15 @@ impl Default for MetaConfig {
}
}

impl OverwriteConfig for MetaConfig {
fn overwrite(self, config: &mut RwConfig) {
config.meta = Builder::default()
.collect(from_self(self))
.build_with(config.meta.clone())
.unwrap()
}
}

#[derive(Copy, Clone, Debug, ArgEnum, Serialize, Deserialize)]
pub enum AsyncStackTraceOption {
Off,
Expand Down
103 changes: 4 additions & 99 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,113 +46,18 @@ mod rpc;
pub mod storage;
mod stream;

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use clap::Parser;
pub use error::{MetaError, MetaResult};
use risingwave_common::config::{load_config, MetaBackend, MetaConfig};

use crate::manager::MetaOpts;
use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend};

/// CLI arguments received by meta node. Overwrites fields in
/// [`risingwave_common::config::MetaConfig`].
#[derive(Debug, Clone, Parser)]
pub struct MetaNodeOpts {
#[clap(long)]
listen_addr: Option<String>,

#[clap(long)]
meta_endpoint: Option<String>,

#[clap(long)]
dashboard_host: Option<String>,

#[clap(long)]
prometheus_host: Option<String>,

#[clap(long, arg_enum)]
backend: Option<MetaBackend>,

#[clap(long)]
etcd_endpoints: Option<String>,

#[clap(long)]
etcd_auth: Option<bool>,

/// Default value is read from the 'ETCD_USERNAME' environment variable.
#[clap(long, env = "ETCD_USERNAME")]
etcd_username: Option<String>,

/// Default value is read from the 'ETCD_PASSWORD' environment variable.
#[clap(long, env = "ETCD_PASSWORD")]
etcd_password: Option<String>,

#[clap(long)]
dashboard_ui_path: Option<String>,

#[clap(long)]
prometheus_endpoint: Option<String>,

/// Default value is read from the 'META_CONNECTOR_RPC_ENDPOINT' environment variable.
#[clap(long, env = "META_CONNECTOR_RPC_ENDPOINT")]
pub connector_rpc_endpoint: Option<String>,

/// The path of `risingwave.toml` configuration file.
///
/// If empty, default configuration values will be used.
#[clap(long, default_value = "")]
pub config_path: String,
}

impl OverwriteConfig for MetaNodeOpts {
fn overwrite(self, config: &mut RwConfig) {
let mut c = &mut config.meta;
if let Some(v) = self.listen_addr {
c.listen_addr = v;
}
if self.meta_endpoint.is_some() {
c.meta_endpoint = self.meta_endpoint;
}
if self.dashboard_host.is_some() {
c.dashboard_host = self.dashboard_host;
}
if self.prometheus_host.is_some() {
c.prometheus_host = self.prometheus_host;
}
if let Some(v) = self.backend {
c.backend = v;
}
if let Some(v) = self.etcd_endpoints {
c.etcd_endpoints = v;
}
if let Some(v) = self.etcd_auth {
c.etcd_auth = v;
}
if let Some(v) = self.etcd_username {
c.etcd_username = v;
}
if let Some(v) = self.etcd_password {
c.etcd_password = v;
}
if self.dashboard_ui_path.is_some() {
c.dashboard_ui_path = self.dashboard_ui_path;
}
if self.prometheus_endpoint.is_some() {
c.prometheus_endpoint = self.prometheus_endpoint;
}
if self.connector_rpc_endpoint.is_some() {
c.connector_rpc_endpoint = self.connector_rpc_endpoint;
}
}
}

use std::future::Future;
use std::pin::Pin;

use risingwave_common::config::{load_config, MetaBackend, OverwriteConfig, RwConfig};

/// Start meta node
pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
pub fn start(opts: MetaConfig) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/compaction_test/src/compaction_test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub async fn compaction_test_main(
}

pub async fn start_meta_node(listen_addr: String, config_path: String) {
let meta_opts = risingwave_meta::MetaNodeOpts::parse_from([
let meta_opts = risingwave_common::config::MetaConfig::parse_from([
"meta-node",
"--listen-addr",
&listen_addr,
Expand Down

0 comments on commit 470287c

Please sign in to comment.