Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate(config): deprecate state store url on worker nodes #8704

Merged
merged 3 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ services:
- "0.0.0.0:1260"
- "--metrics-level"
- "1"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--meta-address"
- "http://meta-node-0:5690"
- "--config-path"
Expand Down Expand Up @@ -52,8 +50,6 @@ services:
- "0.0.0.0:1222"
- "--metrics-level"
- "1"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--meta-address"
- "http://meta-node-0:5690"
- "--connector-rpc-endpoint"
Expand Down Expand Up @@ -203,6 +199,8 @@ services:
- "etcd-0:2388"
- "--connector-rpc-endpoint"
- "connector-node:50051"
- "--state-store"
- "hummock+minio://hummockadmin:hummockadmin@minio-0:9301/hummock001"
- "--config-path"
- /risingwave.toml
expose:
Expand Down
38 changes: 20 additions & 18 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -489,18 +489,16 @@ profile:
- use: etcd
unsafe-no-fsync: true
- use: meta-node
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5687
exporter-port: 1222
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-in-memory-kv-state-backend: true
- use: frontend
port: 4565
exporter-port: 2222
Expand Down Expand Up @@ -770,18 +768,9 @@ template:
# Jaeger used by this compute node
provide-jaeger: "jaeger*"

# Sanity check: should use shared storage if there're multiple compute nodes
provide-compute-node: "compute-node*"

# Sanity check: should start at lease one compactor if using shared object store
provide-compactor: "compactor*"

# If `user-managed` is true, this service will be started by user with the above config
user-managed: false

# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

# Total available memory for the compute node in bytes
total-memory-bytes: 8589934592

Expand Down Expand Up @@ -826,6 +815,24 @@ template:
# Prometheus nodes used by dashboard service
provide-prometheus: "prometheus*"

# Sanity check: should use shared storage if there're multiple compute nodes
provide-compute-node: "compute-node*"

# Sanity check: should start at lease one compactor if using shared object store
provide-compactor: "compactor*"

# Minio instances used by the cluster
provide-minio: "minio*"

# OpenDAL storage backend used by the cluster
provide-opendal: "opendal*"

# AWS s3 bucket used by the cluster
provide-aws-s3: "aws-s3*"

# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

prometheus:
# Advertise address of Prometheus
address: "127.0.0.1"
Expand Down Expand Up @@ -910,14 +917,9 @@ template:
# Id of this instance
id: compactor-${port}

# Minio instances used by this compute node
# Minio instances used by this compactor
provide-minio: "minio*"

# OpenDAL storage backend used by this compute node
provide-opendal: "opendal*"
# AWS s3 bucket used by this compute node
provide-aws-s3: "aws-s3*"

# Meta-nodes used by this compute node
provide-meta-node: "meta-node*"

Expand Down
12 changes: 2 additions & 10 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_pb::meta::PbSystemParams;
use tracing::warn;

use super::system_params_to_kv;

Expand Down Expand Up @@ -53,15 +52,8 @@ impl SystemParamsReader {
self.prost.bloom_false_positive.unwrap()
}

// TODO(zhidong): Only read from system params in v0.1.18.
pub fn state_store(&self, from_local: String) -> String {
let from_prost = self.prost.state_store.as_ref().unwrap();
if from_prost.is_empty() {
warn!("--state-store is not specified on meta node, reading from CLI instead");
from_local
} else {
from_prost.clone()
}
pub fn state_store(&self) -> &str {
self.prost.state_store.as_ref().unwrap()
}

pub fn data_directory(&self) -> &str {
Expand Down
16 changes: 0 additions & 16 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT")]
pub connector_rpc_sink_payload_format: Option<String>,

/// One of:
/// 1. `hummock+{object_store}` where `object_store`
/// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`,
/// `memory` or `memory-shared`.
/// 2. `in-memory`
/// 3. `sled://{path}`
#[clap(long, env = "RW_STATE_STORE")]
pub state_store: Option<String>,

/// The path of `risingwave.toml` configuration file.
///
/// If empty, default configuration values will be used.
Expand Down Expand Up @@ -173,7 +164,6 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
// slow compile in release mode.
Box::pin(async move {
tracing::info!("options: {:?}", opts);
warn_future_deprecate_options(&opts);
validate_opts(&opts);

let listen_addr = opts.listen_addr.parse().unwrap();
Expand Down Expand Up @@ -206,9 +196,3 @@ fn default_total_memory_bytes() -> usize {
fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

fn warn_future_deprecate_options(opts: &ComputeNodeOpts) {
if opts.state_store.is_some() {
tracing::warn!("`--state-store` will not be accepted by compute node in the next release. Please consider moving this argument to the meta node.");
}
}
12 changes: 3 additions & 9 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,10 @@ pub async fn compute_node_serve(
.unwrap();
let storage_opts = Arc::new(StorageOpts::from((&config, &system_params)));

let state_store_url = {
let from_local = opts
.state_store
.clone()
.unwrap_or_else(|| "hummock+memory".to_string());
system_params.state_store(from_local)
};
let state_store_url = system_params.state_store();

let embedded_compactor_enabled =
embedded_compactor_enabled(&state_store_url, config.storage.disable_remote_compactor);
embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);
let storage_memory_bytes =
total_storage_memory_limit_bytes(&config.storage, embedded_compactor_enabled);
let compute_memory_bytes =
Expand Down Expand Up @@ -152,7 +146,7 @@ pub async fn compute_node_serve(
let mut join_handle_vec = vec![];

let state_store = StateStoreImpl::new(
&state_store_url,
state_store_url,
storage_opts.clone(),
hummock_meta_client.clone(),
state_store_metrics.clone(),
Expand Down
18 changes: 7 additions & 11 deletions src/risedevtool/src/compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,7 @@ fn health_check_port(port: u16) -> HealthCheck {
impl Compose for ComputeNodeConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("compute-node");
ComputeNodeService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
)?;
ComputeNodeService::apply_command_args(&mut command, self)?;
if self.enable_tiered_cache {
command.arg("--file-cache-dir").arg("/filecache");
}
Expand Down Expand Up @@ -201,7 +197,11 @@ impl Compose for ComputeNodeConfig {
impl Compose for MetaNodeConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("meta-node");
MetaNodeService::apply_command_args(&mut command, self)?;
MetaNodeService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why disallow in memory store. #8714

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why check the storage type for meta node? I thought meta use etcd as persistent store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why check the storage type for meta node? I thought meta use etcd as persistent store?

We check the state store type instead of meta store.

)?;

if let Some(c) = &config.rw_config_path {
let target = Path::new(&config.config_directory).join("risingwave.toml");
Expand Down Expand Up @@ -264,11 +264,7 @@ impl Compose for FrontendConfig {
impl Compose for CompactorConfig {
fn compose(&self, config: &ComposeConfig) -> Result<ComposeService> {
let mut command = Command::new("compactor-node");
CompactorService::apply_command_args(
&mut command,
self,
HummockInMemoryStrategy::Disallowed,
)?;
CompactorService::apply_command_args(&mut command, self)?;

if let Some(c) = &config.rw_config_path {
let target = Path::new(&config.config_directory).join("risingwave.toml");
Expand Down
12 changes: 8 additions & 4 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ pub struct ComputeNodeConfig {
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub provide_aws_s3: Option<Vec<AwsS3Config>>,
pub provide_jaeger: Option<Vec<JaegerConfig>>,
pub provide_compactor: Option<Vec<CompactorConfig>>,
pub user_managed: bool,
pub enable_in_memory_kv_state_backend: bool,
pub connector_rpc_endpoint: String,

pub total_memory_bytes: usize,
Expand Down Expand Up @@ -67,6 +65,14 @@ pub struct MetaNodeConfig {
pub connector_rpc_endpoint: String,
pub provide_etcd_backend: Option<Vec<EtcdConfig>>,
pub provide_prometheus: Option<Vec<PrometheusConfig>>,

pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
pub provide_compactor: Option<Vec<CompactorConfig>>,

pub provide_aws_s3: Option<Vec<AwsS3Config>>,
pub provide_minio: Option<Vec<MinioConfig>>,
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub enable_in_memory_kv_state_backend: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -103,8 +109,6 @@ pub struct CompactorConfig {
pub exporter_port: u16,

pub provide_minio: Option<Vec<MinioConfig>>,
pub provide_opendal: Option<Vec<OpendalConfig>>,
pub provide_aws_s3: Option<Vec<AwsS3Config>>,

pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
pub user_managed: bool,
Expand Down
35 changes: 4 additions & 31 deletions src/risedevtool/src/task/compactor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ use std::io::Write;
use std::path::Path;
use std::process::Command;

use anyhow::{anyhow, Result};
use anyhow::Result;

use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
use crate::{
add_meta_node, add_storage_backend, CompactorConfig, ExecuteContext, HummockInMemoryStrategy,
Task,
};
use crate::{add_meta_node, CompactorConfig, ExecuteContext, Task};

pub struct CompactorService {
config: CompactorConfig,
Expand All @@ -45,19 +42,7 @@ impl CompactorService {
}

/// Apply command args according to config
pub fn apply_command_args(
cmd: &mut Command,
config: &CompactorConfig,
hummock_in_memory_strategy: HummockInMemoryStrategy,
) -> Result<()> {
if matches!(
hummock_in_memory_strategy,
HummockInMemoryStrategy::Isolated
) {
return Err(anyhow!(
"compactor cannot use in-memory hummock if remote object store is not provided"
));
}
pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
cmd.arg("--listen-addr")
.arg(format!("{}:{}", config.listen_address, config.port))
.arg("--prometheus-listener-addr")
Expand All @@ -78,18 +63,6 @@ impl CompactorService {
.arg(format!("{}", compaction_worker_threads_number));
}

let provide_minio = config.provide_minio.as_ref().unwrap();
let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
let provide_opendal = config.provide_opendal.as_ref().unwrap();
add_storage_backend(
&config.id,
provide_opendal,
provide_minio,
provide_aws_s3,
hummock_in_memory_strategy,
cmd,
)?;

let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
add_meta_node(provide_meta_node, cmd)?;

Expand Down Expand Up @@ -124,7 +97,7 @@ impl Task for CompactorService {

cmd.arg("--config-path")
.arg(Path::new(&prefix_config).join("risingwave.toml"));
Self::apply_command_args(&mut cmd, &self.config, HummockInMemoryStrategy::Disallowed)?;
Self::apply_command_args(&mut cmd, &self.config)?;

if !self.config.user_managed {
ctx.run_command(ctx.tmux_run(cmd)?)?;
Expand Down
Loading