Skip to content

Commit

Permalink
deprecate(config): deprecate state store url on worker nodes (risingw…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR authored Mar 22, 2023
1 parent 9e774bb commit 86ffe99
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 185 deletions.
6 changes: 2 additions & 4 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ services:
- "0.0.0.0:1260"
- "--metrics-level"
- "1"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--meta-address"
- "http://meta-node-0:5690"
- "--config-path"
Expand Down Expand Up @@ -52,8 +50,6 @@ services:
- "0.0.0.0:1222"
- "--metrics-level"
- "1"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--meta-address"
- "http://meta-node-0:5690"
- "--connector-rpc-endpoint"
Expand Down Expand Up @@ -203,6 +199,8 @@ services:
- "etcd-0:2388"
- "--connector-rpc-endpoint"
- "connector-node:50051"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--config-path"
- /risingwave.toml
expose:
Expand Down
38 changes: 20 additions & 18 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -489,18 +489,16 @@ profile:
- use: etcd
unsafe-no-fsync: true
- use: meta-node
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5687
exporter-port: 1222
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-in-memory-kv-state-backend: true
- use: frontend
port: 4565
exporter-port: 2222
Expand Down Expand Up @@ -770,18 +768,9 @@ template:
# Jaeger used by this compute node
provide-jaeger: "jaeger*"

# Sanity check: should use shared storage if there're multiple compute nodes
provide-compute-node: "compute-node*"

# Sanity check: should start at lease one compactor if using shared object store
provide-compactor: "compactor*"

# If `user-managed` is true, this service will be started by user with the above config
user-managed: false

# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

Expand Down Expand Up @@ -826,6 +815,24 @@ template:
# Prometheus nodes used by dashboard service
provide-prometheus: "prometheus*"

# Sanity check: should use shared storage if there're multiple compute nodes
provide-compute-node: "compute-node*"

# Sanity check: should start at lease one compactor if using shared object store
provide-compactor: "compactor*"

# Minio instances used by the cluster
provide-minio: "minio*"

# OpenDAL storage backend used by the cluster
provide-opendal: "opendal*"

# AWS s3 bucket used by the cluster
provide-aws-s3: "aws-s3*"

# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

prometheus:
# Advertise address of Prometheus
address: "127.0.0.1"
Expand Down Expand Up @@ -910,14 +917,9 @@ template:
# Id of this instance
id: compactor-${port}

# Minio instances used by this compute node
# Minio instances used by this compactor
provide-minio: "minio*"

# OpenDAL storage backend used by this compute node
provide-opendal: "opendal*"
# AWS s3 bucket used by this compute node
provide-aws-s3: "aws-s3*"

# Meta-nodes used by this compute node
provide-meta-node: "meta-node*"

Expand Down
12 changes: 2 additions & 10 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_pb::meta::PbSystemParams;
use tracing::warn;

use super::system_params_to_kv;

Expand Down Expand Up @@ -53,15 +52,8 @@ impl SystemParamsReader {
self.prost.bloom_false_positive.unwrap()
}

// TODO(zhidong): Only read from system params in v0.1.18.
pub fn state_store(&self, from_local: String) -> String {
let from_prost = self.prost.state_store.as_ref().unwrap();
if from_prost.is_empty() {
warn!("--state-store is not specified on meta node, reading from CLI instead");
from_local
} else {
from_prost.clone()
}
pub fn state_store(&self) -> &str {
self.prost.state_store.as_ref().unwrap()
}

pub fn data_directory(&self) -> &str {
Expand Down
16 changes: 0 additions & 16 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
pub connector_rpc_sink_payload_format: Option<String>,

/// One of:
/// 1. `hummock+{object_store}` where `object_store`
/// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`,
/// `memory` or `memory-shared`.
/// 2. `in-memory`
/// 3. `sled://{path}`
#[clap(long, env = "RW_STATE_STORE")]
pub state_store: Option<String>,

/// The path of `risingwave.toml` configuration file.
///
/// If empty, default configuration values will be used.
Expand Down Expand Up @@ -173,7 +164,6 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
// slow compile in release mode.
Box::pin(async move {
tracing::info!("options: {:?}", opts);
warn_future_deprecate_options(&opts);
validate_opts(&opts);

let listen_addr = opts.listen_addr.parse().unwrap();
Expand Down Expand Up @@ -206,9 +196,3 @@ fn default_total_memory_bytes() -> usize {
fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

fn warn_future_deprecate_options(opts: &ComputeNodeOpts) {
if opts.state_store.is_some() {
tracing::warn!("`--state-store` will not be accepted by compute node in the next release. Please consider moving this argument to the meta node.");
}
}
12 changes: 3 additions & 9 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,10 @@ pub async fn compute_node_serve(
.unwrap();
let storage_opts = Arc::new(StorageOpts::from((&config, &system_params)));

let state_store_url = {
let from_local = opts
.state_store
.clone()
.unwrap_or_else(|| "hummock+memory".to_string());
system_params.state_store(from_local)
};
let state_store_url = system_params.state_store();

let embedded_compactor_enabled =
embedded_compactor_enabled(&state_store_url, config.storage.disable_remote_compactor);
embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);
let storage_memory_bytes =
total_storage_memory_limit_bytes(&config.storage, embedded_compactor_enabled);
let compute_memory_bytes =
Expand Down Expand Up @@ -152,7 +146,7 @@ pub async fn compute_node_serve(
let mut join_handle_vec = vec![];

let state_store = StateStoreImpl::new(
&state_store_url,
state_store_url,
storage_opts.clone(),
hummock_meta_client.clone(),
state_store_metrics.clone(),
Expand Down
18 changes: 7 additions & 11 deletions src/risedevtool/src/compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,7 @@ fn health_check_port(port: u16) -> HealthCheck {
impl Compose for ComputeNodeConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("compute-node");
ComputeNodeService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
)?;
ComputeNodeService::apply_command_args(&mut command, self)?;
if self.enable_tiered_cache {
command.arg("--file-cache-dir").arg("/filecache");
}
Expand Down Expand Up @@ -201,7 +197,11 @@ impl Compose for ComputeNodeConfig {
impl Compose for MetaNodeConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("meta-node");
MetaNodeService::apply_command_args(&mut command, self)?;
MetaNodeService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
)?;

if let Some(c) = &config.rw_config_path {
let target = Path::new(&config.config_directory).join("risingwave.toml");
Expand Down Expand Up @@ -264,11 +264,7 @@ impl Compose for FrontendConfig {
impl Compose for CompactorConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("compactor-node");
CompactorService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
)?;
CompactorService::apply_command_args(&mut command, self)?;

if let Some(c) = &config.rw_config_path {
let target = Path::new(&config.config_directory).join("risingwave.toml");
Expand Down
12 changes: 8 additions & 4 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ pub struct ComputeNodeConfig {
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub provide_aws_s3: Option<Vec<AwsS3Config>>,
pub provide_jaeger: Option<Vec<JaegerConfig>>,
pub provide_compactor: Option<Vec<CompactorConfig>>,
pub user_managed: bool,
pub enable_in_memory_kv_state_backend: bool,
pub connector_rpc_endpoint: String,

pub total_memory_bytes: usize,
Expand Down Expand Up @@ -67,6 +65,14 @@ pub struct MetaNodeConfig {
pub connector_rpc_endpoint: String,
pub provide_etcd_backend: Option<Vec<EtcdConfig>>,
pub provide_prometheus: Option<Vec<PrometheusConfig>>,

pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
pub provide_compactor: Option<Vec<CompactorConfig>>,

pub provide_aws_s3: Option<Vec<AwsS3Config>>,
pub provide_minio: Option<Vec<MinioConfig>>,
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub enable_in_memory_kv_state_backend: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -103,8 +109,6 @@ pub struct CompactorConfig {
pub exporter_port: u16,

pub provide_minio: Option<Vec<MinioConfig>>,
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub provide_aws_s3: Option<Vec<AwsS3Config>>,

pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
pub user_managed: bool,
Expand Down
35 changes: 4 additions & 31 deletions src/risedevtool/src/task/compactor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use std::io::Write;
use std::path::Path;
use std::process::Command;

use anyhow::{anyhow, Result};
use anyhow::Result;

use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::{
add_meta_node, add_storage_backend, CompactorConfig, ExecuteContext, HummockInMemoryStrategy,
Task,
};
use crate::{add_meta_node, CompactorConfig, ExecuteContext, Task};

pub struct CompactorService {
config: CompactorConfig,
Expand All @@ -45,19 +42,7 @@ impl CompactorService {
}

/// Apply command args according to config
pub fn apply_command_args(
cmd: &mut Command,
config: &CompactorConfig,
hummock_in_memory_strategy: HummockInMemoryStrategy,
) -> Result<()> {
if matches!(
hummock_in_memory_strategy,
HummockInMemoryStrategy::Isolated
) {
return Err(anyhow!(
"compactor cannot use in-memory hummock if remote object store is not provided"
));
}
pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
cmd.arg("--listen-addr")
.arg(format!("{}:{}", config.listen_address, config.port))
.arg("--prometheus-listener-addr")
Expand All @@ -78,18 +63,6 @@ impl CompactorService {
.arg(format!("{}", compaction_worker_threads_number));
}

let provide_minio = config.provide_minio.as_ref().unwrap();
let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
let provide_opendal = config.provide_opendal.as_ref().unwrap();
add_storage_backend(
&config.id,
provide_opendal,
provide_minio,
provide_aws_s3,
hummock_in_memory_strategy,
cmd,
)?;

let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
add_meta_node(provide_meta_node, cmd)?;

Expand Down Expand Up @@ -124,7 +97,7 @@ impl Task for CompactorService {

cmd.arg("--config-path")
.arg(Path::new(&prefix_config).join("risingwave.toml"));
Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Disallowed)?;
Self::apply_command_args(&mut cmd, &self.config)?;

if !self.config.user_managed {
ctx.run_command(ctx.tmux_run(cmd)?)?;
Expand Down
Loading

0 comments on commit 86ffe99

Please sign in to comment.