Skip to content

Commit

Permalink
feat: update default size of bgworkers, add hbworkers (#4129)
Browse files Browse the repository at this point in the history
* feat: update default size of bgworkers, add hbworkers

* feat: update frontend heartbeat as well

* chore: update sample config files and default settings

* chore: update config docs

* Revert "chore: update config docs"

This reverts commit 8107f4c.

* Revert "chore: update sample config files and default settings"

This reverts commit f5ae701.

* feat: use default heartbeat runtime size

* chore: update config docs
  • Loading branch information
sunng87 authored Jun 18, 2024
1 parent cb657ae commit 70d113a
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 23 deletions.
8 changes: 4 additions & 4 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. |
Expand Down Expand Up @@ -161,7 +161,7 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
Expand Down Expand Up @@ -251,7 +251,7 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
Expand Down Expand Up @@ -316,7 +316,7 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
Expand Down
2 changes: 1 addition & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
bg_rt_size = 4

## The heartbeat options.
[heartbeat]
Expand Down
2 changes: 1 addition & 1 deletion config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
bg_rt_size = 4

## The heartbeat options.
[heartbeat]
Expand Down
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
bg_rt_size = 4

## Procedure storage options.
[procedure]
Expand Down
2 changes: 1 addition & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
bg_rt_size = 4

## The HTTP server options.
[http]
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn test_load_datanode_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
},
component: DatanodeOptions {
node_id: Some(42),
Expand Down Expand Up @@ -107,7 +107,7 @@ fn test_load_frontend_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
},
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
Expand Down Expand Up @@ -155,7 +155,7 @@ fn test_load_metasrv_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
},
component: MetasrvOptions {
selector: SelectorType::LeaseBased,
Expand Down Expand Up @@ -188,7 +188,7 @@ fn test_load_standalone_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
Expand Down
28 changes: 23 additions & 5 deletions src/common/runtime/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::{Builder, JoinHandle, Runtime};

const READ_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 8;
const BG_WORKERS: usize = 8;
const BG_WORKERS: usize = 4;
const HB_WORKERS: usize = 2;

/// The options for the global runtimes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
Expand All @@ -44,7 +45,7 @@ impl Default for RuntimeOptions {
Self {
read_rt_size: cpus,
write_rt_size: cpus,
bg_rt_size: cpus,
bg_rt_size: usize::max(cpus / 2, 1),
}
}
}
Expand All @@ -63,6 +64,7 @@ struct GlobalRuntimes {
read_runtime: Runtime,
write_runtime: Runtime,
bg_runtime: Runtime,
hb_runtime: Runtime,
}

macro_rules! define_spawn {
Expand Down Expand Up @@ -96,15 +98,23 @@ impl GlobalRuntimes {
define_spawn!(read);
define_spawn!(write);
define_spawn!(bg);

fn new(read: Option<Runtime>, write: Option<Runtime>, background: Option<Runtime>) -> Self {
define_spawn!(hb);

fn new(
read: Option<Runtime>,
write: Option<Runtime>,
background: Option<Runtime>,
heartbeat: Option<Runtime>,
) -> Self {
Self {
read_runtime: read
.unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)),
write_runtime: write
.unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)),
bg_runtime: background
.unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)),
hb_runtime: heartbeat
.unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)),
}
}
}
Expand All @@ -114,6 +124,7 @@ struct ConfigRuntimes {
read_runtime: Option<Runtime>,
write_runtime: Option<Runtime>,
bg_runtime: Option<Runtime>,
hb_runtime: Option<Runtime>,
already_init: bool,
}

Expand All @@ -122,9 +133,10 @@ static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
let read = c.read_runtime.take();
let write = c.write_runtime.take();
let background = c.bg_runtime.take();
let heartbeat = c.hb_runtime.take();
c.already_init = true;

GlobalRuntimes::new(read, write, background)
GlobalRuntimes::new(read, write, background, heartbeat)
});

static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
Expand Down Expand Up @@ -155,6 +167,7 @@ pub fn init_global_runtimes(options: &RuntimeOptions) {
"global-bg-worker",
options.bg_rt_size,
));
c.hb_runtime = Some(create_runtime("global-hb", "global-hb-worker", HB_WORKERS));
});
}

Expand Down Expand Up @@ -195,6 +208,7 @@ macro_rules! define_global_runtime_spawn {
define_global_runtime_spawn!(read);
define_global_runtime_spawn!(write);
define_global_runtime_spawn!(bg);
define_global_runtime_spawn!(hb);

#[cfg(test)]
mod tests {
Expand All @@ -212,6 +226,9 @@ mod tests {

let handle = spawn_bg(async { 3 + 3 });
assert_eq!(6, block_on_bg(handle).unwrap());

let handle = spawn_bg(async { 4 + 4 });
assert_eq!(8, block_on_hb(handle).unwrap());
}

macro_rules! define_spawn_blocking_test {
Expand Down Expand Up @@ -239,4 +256,5 @@ mod tests {
define_spawn_blocking_test!(read);
define_spawn_blocking_test!(write);
define_spawn_blocking_test!(bg);
define_spawn_blocking_test!(hb);
}
4 changes: 2 additions & 2 deletions src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub mod runtime;

pub use global::{
bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes,
read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write,
spawn_read, spawn_write, write_runtime,
read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read,
spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl HeartbeatTask {

let mut last_received_lease = Instant::now();

let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_hb(async move {
while let Some(res) = rx.message().await.unwrap_or_else(|e| {
error!(e; "Error while reading heartbeat response");
None
Expand Down Expand Up @@ -215,7 +215,7 @@ impl HeartbeatTask {
self.region_alive_keeper.start(Some(event_receiver)).await?;
let mut last_sent = Instant::now();

common_runtime::spawn_bg(async move {
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
loop {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl HeartbeatTask {
let capture_self = self.clone();
let retry_interval = self.retry_interval;

let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_hb(async move {
loop {
match resp_stream.message().await {
Ok(Some(resp)) => {
Expand Down Expand Up @@ -132,7 +132,7 @@ impl HeartbeatTask {
addr: self.server_addr.clone(),
});

common_runtime::spawn_bg(async move {
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

Expand Down

0 comments on commit 70d113a

Please sign in to comment.