Skip to content

Commit

Permalink
feat(wal): increase recovery parallelism (GreptimeTeam#4689)
Browse files Browse the repository at this point in the history
* Refactor RaftEngineLogStore to use references for config

 - Updated `RaftEngineLogStore::try_new` to accept a reference to `RaftEngineConfig` instead of taking ownership.
 - Replaced direct usage of `config` with individual fields (`sync_write`, `sync_period`, `read_batch_size`).
 - Adjusted test cases to pass references to `RaftEngineConfig`.

* Add parallelism configuration for WAL recovery

 - Introduced `recovery_parallelism` setting in `datanode.example.toml` and `standalone.example.toml` for configuring parallelism during WAL recovery.
 - Updated `Cargo.lock` and `Cargo.toml` to include `num_cpus` dependency.
 - Modified `RaftEngineConfig` to include `recovery_parallelism` with a default value set to the number of CP

* feat/wal-recovery-parallelism:
 Add `wal.recovery_parallelism` configuration option

 - Introduced `wal.recovery_parallelism` to config.md for specifying parallelism during WAL recovery.
 - Updated `RaftEngineLogStore` to include `recovery_threads` from the new configuration.

* fix: ut
  • Loading branch information
v0y4g3r authored and CookiePieWw committed Sep 17, 2024
1 parent e960dee commit 70f64cb
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
Expand Down Expand Up @@ -381,6 +382,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
Expand Down
3 changes: 3 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"

## Parallelism during WAL recovery.
recovery_parallelism = 2

## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
Expand Down
3 changes: 3 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"

## Parallelism during WAL recovery.
recovery_parallelism = 2

## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ fn test_load_datanode_example_config() {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
Expand Down Expand Up @@ -207,6 +208,7 @@ fn test_load_standalone_example_config() {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
region_engine: vec![
Expand Down
1 change: 1 addition & 0 deletions src/common/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-macro.workspace = true
common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
rskafka.workspace = true
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls-native-certs = "0.7"
Expand Down
3 changes: 3 additions & 0 deletions src/common/wal/src/config/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct RaftEngineConfig {
/// Duration for fsyncing log files.
#[serde(with = "humantime_serde")]
pub sync_period: Option<Duration>,
/// Parallelism during log recovery.
pub recovery_parallelism: usize,
}

impl Default for RaftEngineConfig {
Expand All @@ -55,6 +57,7 @@ impl Default for RaftEngineConfig {
enable_log_recycle: true,
prefill_log_files: false,
sync_period: None,
recovery_parallelism: num_cpus::get(),
}
}
}
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ impl DatanodeBuilder {
"Creating raft-engine logstore with config: {:?} and storage path: {}",
config, &wal_dir
);
let logstore = RaftEngineLogStore::try_new(wal_dir, config.clone())
let logstore = RaftEngineLogStore::try_new(wal_dir, config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)?;
Expand Down
34 changes: 21 additions & 13 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{hash_map, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_stream::stream;
use common_runtime::{RepeatedTask, TaskFunction};
Expand All @@ -40,7 +41,9 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
const NAMESPACE_PREFIX: &str = "$sys/";

pub struct RaftEngineLogStore {
config: RaftEngineConfig,
sync_write: bool,
sync_period: Option<Duration>,
read_batch_size: usize,
engine: Arc<Engine>,
gc_task: RepeatedTask<Error>,
last_sync_time: AtomicI64,
Expand Down Expand Up @@ -76,7 +79,7 @@ impl TaskFunction<Error> for PurgeExpiredFilesFunction {
}

impl RaftEngineLogStore {
pub async fn try_new(dir: String, config: RaftEngineConfig) -> Result<Self> {
pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result<Self> {
let raft_engine_config = Config {
dir,
purge_threshold: ReadableSize(config.purge_threshold.0),
Expand All @@ -85,6 +88,7 @@ impl RaftEngineLogStore {
target_file_size: ReadableSize(config.file_size.0),
enable_log_recycle: config.enable_log_recycle,
prefill_for_recycle: config.prefill_log_files,
recovery_threads: config.recovery_parallelism,
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
Expand All @@ -96,7 +100,9 @@ impl RaftEngineLogStore {
);

let log_store = Self {
config,
sync_write: config.sync_write,
sync_period: config.sync_period,
read_batch_size: config.read_batch_size,
engine,
gc_task,
last_sync_time: AtomicI64::new(0),
Expand Down Expand Up @@ -196,7 +202,9 @@ impl RaftEngineLogStore {
impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
.field("config", &self.config)
.field("sync_write", &self.sync_write)
.field("sync_period", &self.sync_period)
.field("read_batch_size", &self.read_batch_size)
.field("started", &self.gc_task.started())
.finish()
}
Expand Down Expand Up @@ -228,9 +236,9 @@ impl LogStore for RaftEngineLogStore {

let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;

let mut sync = self.config.sync_write;
let mut sync = self.sync_write;

if let Some(sync_period) = &self.config.sync_period {
if let Some(sync_period) = &self.sync_period {
let now = common_time::util::current_time_millis();
if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 {
self.last_sync_time.store(now, Ordering::Relaxed);
Expand Down Expand Up @@ -276,7 +284,7 @@ impl LogStore for RaftEngineLogStore {
entry_id,
self.span(ns)
);
let max_batch_size = self.config.read_batch_size;
let max_batch_size = self.read_batch_size;
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
let _handle = common_runtime::spawn_global(async move {
while start_index <= last_index {
Expand Down Expand Up @@ -489,7 +497,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
RaftEngineConfig::default(),
&RaftEngineConfig::default(),
)
.await
.unwrap();
Expand All @@ -502,7 +510,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
RaftEngineConfig::default(),
&RaftEngineConfig::default(),
)
.await
.unwrap();
Expand All @@ -528,7 +536,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
RaftEngineConfig::default(),
&RaftEngineConfig::default(),
)
.await
.unwrap();
Expand Down Expand Up @@ -570,7 +578,7 @@ mod tests {
{
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
RaftEngineConfig::default(),
&RaftEngineConfig::default(),
)
.await
.unwrap();
Expand All @@ -590,7 +598,7 @@ mod tests {

let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
RaftEngineConfig::default(),
&RaftEngineConfig::default(),
)
.await
.unwrap();
Expand Down Expand Up @@ -634,7 +642,7 @@ mod tests {
..Default::default()
};

RaftEngineLogStore::try_new(path, config).await.unwrap()
RaftEngineLogStore::try_new(path, &config).await.unwrap()
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/test_util/log_store_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng
file_size: ReadableSize::kb(128),
..Default::default()
};
RaftEngineLogStore::try_new(path, cfg).await.unwrap()
RaftEngineLogStore::try_new(path, &cfg).await.unwrap()
}

/// Create a [KafkaLogStore].
Expand Down
1 change: 1 addition & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"metadata_cache_size =",
"content_cache_size =",
"name =",
"recovery_parallelism =",
];

input
Expand Down

0 comments on commit 70f64cb

Please sign in to comment.