feat!: Divide flush and compaction job pool (#4871)
* feat: divide flush/compact job pool

* feat!: divide bg jobs config

* docs: update config examples

* test: fix tests
evenyag authored Oct 25, 2024
1 parent 2485f66 commit 1008af5
Showing 8 changed files with 103 additions and 48 deletions.
12 changes: 8 additions & 4 deletions config/config.md
@@ -83,7 +83,7 @@
| `wal.backoff_max` | String | `10s` | The maximum backoff delay.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_base` | Integer | `2` | The exponential backoff rate, i.e. next backoff = base * current backoff.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
@@ -116,7 +116,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
-| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
+| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
+| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
+| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -410,7 +412,7 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -437,7 +439,9 @@
| `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. |
| `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. |
| `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). |
-| `region_engine.mito.max_background_jobs` | Integer | `4` | Max number of running background jobs |
+| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
+| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
+| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
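The table above replaces the single `max_background_jobs` knob with three per-family limits whose defaults scale with the host's core count. As a worked illustration (assuming a hypothetical 8-core machine; the values below are computed examples, not shipped constants), the defaults and an explicit override in the config file would look like:

```toml
# Assumed 8-core host, so the startup-computed defaults work out to:
#   max_background_flushes     = 8 / 2 = 4
#   max_background_compactions = 8 / 4 = 2
#   max_background_purges      = 8
# Pinning them explicitly under the mito engine section overrides the computation:
[[region_engine]]
[region_engine.mito]
max_background_flushes = 4
max_background_compactions = 2
max_background_purges = 8
```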
25 changes: 17 additions & 8 deletions config/datanode.example.toml
@@ -215,12 +215,12 @@ dump_index_interval = "60s"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

@@ -416,8 +416,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false

-## Max number of running background jobs
-max_background_jobs = 4
+## Max number of running background flush jobs (default: 1/2 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_flushes = 4
+
+## Max number of running background compaction jobs (default: 1/4 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_compactions = 2
+
+## Max number of running background purge jobs (default: number of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_purges = 8

## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
25 changes: 17 additions & 8 deletions config/standalone.example.toml
@@ -239,12 +239,12 @@ backoff_deadline = "5mins"

## Ignore missing entries during read WAL.
## **It's only used when the provider is `kafka`**.
##
## This option ensures that when Kafka messages are deleted, the system
## can still successfully replay memtable data without throwing an
## out-of-range error.
## However, enabling this option might lead to unexpected data loss,
## as the system will skip over missing entries instead of treating
## them as critical errors.
overwrite_entry_start_id = false

@@ -454,8 +454,17 @@ manifest_checkpoint_distance = 10
## Whether to compress manifest and checkpoint file by gzip (default false).
compress_manifest = false

-## Max number of running background jobs
-max_background_jobs = 4
+## Max number of running background flush jobs (default: 1/2 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_flushes = 4
+
+## Max number of running background compaction jobs (default: 1/4 of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_compactions = 2
+
+## Max number of running background purge jobs (default: number of cpu cores).
+## @toml2docs:none-default="Auto"
+#+ max_background_purges = 8

## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
2 changes: 1 addition & 1 deletion src/mito2/src/compaction/compactor.rs
@@ -134,7 +134,7 @@ pub async fn open_compaction_region(
));

let file_purger = {
-let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
+let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),
38 changes: 29 additions & 9 deletions src/mito2/src/config.rs
@@ -28,9 +28,6 @@ use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

-/// Default max running background job.
-const DEFAULT_MAX_BG_JOB: usize = 4;
-
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
@@ -69,8 +66,12 @@ pub struct MitoConfig {
pub compress_manifest: bool,

// Background job configs:
-/// Max number of running background jobs (default 4).
-pub max_background_jobs: usize,
+/// Max number of running background flush jobs (default: 1/2 of cpu cores).
+pub max_background_flushes: usize,
+/// Max number of running background compaction jobs (default: 1/4 of cpu cores).
+pub max_background_compactions: usize,
+/// Max number of running background purge jobs (default: number of cpu cores).
+pub max_background_purges: usize,

// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
-max_background_jobs: DEFAULT_MAX_BG_JOB,
+max_background_flushes: divide_num_cpus(2),
+max_background_compactions: divide_num_cpus(4),
+max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -185,9 +188,26 @@ impl MitoConfig {
self.worker_channel_size = 1;
}

-if self.max_background_jobs == 0 {
-warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
-self.max_background_jobs = DEFAULT_MAX_BG_JOB;
+if self.max_background_flushes == 0 {
+warn!(
+"Sanitize max background flushes 0 to {}",
+divide_num_cpus(2)
+);
+self.max_background_flushes = divide_num_cpus(2);
+}
+if self.max_background_compactions == 0 {
+warn!(
+"Sanitize max background compactions 0 to {}",
+divide_num_cpus(4)
+);
+self.max_background_compactions = divide_num_cpus(4);
+}
+if self.max_background_purges == 0 {
+warn!(
+"Sanitize max background purges 0 to {}",
+common_config::utils::get_cpus()
+);
+self.max_background_purges = common_config::utils::get_cpus();
}

if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
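The new defaults call `divide_num_cpus`, which is not shown in this diff. A minimal sketch of what such a helper plausibly does, inferred from its usage above (an assumption, not the actual implementation; `num_cpus::get()` stands in for `common_config::utils::get_cpus()`): ceiling division keeps the result at least 1, so even a single-core host gets one flush slot and one compaction slot.

```rust
/// Sketch of the helper used by the defaults above (assumed behavior).
/// Divides the core count with ceiling rounding so the result is never zero.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    // Stand-in for common_config::utils::get_cpus().
    let cores = num_cpus::get().max(1);
    // Ceiling division: 8 cores yields 8 / 2 = 4 flushes and 8 / 4 = 2
    // compactions; 1 core still yields 1 for both.
    (cores + divisor - 1) / divisor
}
```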
4 changes: 2 additions & 2 deletions src/mito2/src/engine/compaction_test.rs
@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge task.
-max_background_jobs: 1,
+max_background_purges: 1,
..Default::default()
},
None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
listener.wake();

let notify = Arc::new(Notify::new());
-// We already sets max background jobs to 1, so we can submit a task to the
+// We already set max background purges to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
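The test change relies on a queue-drain trick: because the purge pool now has its own limit and the test sets it to 1, purge jobs run strictly one at a time in submission order, so a sentinel job that fires a `Notify` proves every earlier purge has finished. A self-contained sketch of the pattern (the `Scheduler` trait and `Job` type here are simplified stand-ins for mito2's, not its real signatures):

```rust
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::Notify;

/// Simplified stand-ins for mito2's job type and scheduler trait (assumptions).
type Job = Pin<Box<dyn Future<Output = ()> + Send>>;
trait Scheduler {
    fn schedule(&self, job: Job) -> Result<(), String>;
}

/// With a pool of exactly one worker (`max_background_purges: 1`), jobs run
/// FIFO, so this sentinel only fires after all previously queued purges.
async fn wait_for_all_purges(scheduler: &dyn Scheduler) {
    let notify = Arc::new(Notify::new());
    let job_notify = notify.clone();
    scheduler
        .schedule(Box::pin(async move { job_notify.notify_one() }))
        .expect("failed to submit sentinel job");
    notify.notified().await; // every earlier purge job has completed here
}
```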
41 changes: 26 additions & 15 deletions src/mito2/src/worker.rs
@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
-/// Global background job scheduelr.
-scheduler: SchedulerRef,
+/// Flush background job pool.
+flush_job_pool: SchedulerRef,
+/// Compaction background job pool.
+compact_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
-let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
// We use another scheduler to avoid purge jobs blocking other jobs.
-// A purge job is cheaper than other background jobs so they share the same job limit.
-let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -178,7 +180,8 @@
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
-scheduler: scheduler.clone(),
+flush_job_pool: flush_job_pool.clone(),
+compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@

Ok(WorkerGroup {
workers,
-scheduler,
+flush_job_pool,
+compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -205,8 +209,11 @@
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");

// TODO(yingwen): Do we need to stop gracefully?
// Stops the scheduler gracefully.
-self.scheduler.stop(true).await?;
+self.compact_job_pool.stop(true).await?;
+// Stops the scheduler gracefully.
+self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;

Expand Down Expand Up @@ -275,8 +282,9 @@ impl WorkerGroup {
.with_notifier(flush_sender.clone()),
)
});
-let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
-let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
+let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
+let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
+let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
-scheduler: scheduler.clone(),
+flush_job_pool: flush_job_pool.clone(),
+compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@

Ok(WorkerGroup {
workers,
-scheduler,
+flush_job_pool,
+compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -382,7 +392,8 @@ struct WorkerStarter<S> {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
-scheduler: SchedulerRef,
+compact_job_pool: SchedulerRef,
+flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
-flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
+flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
-self.scheduler,
+self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config,
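The point of splitting the old `scheduler` into `flush_job_pool` and `compact_job_pool` is isolation: a burst of long-running compactions can no longer consume every slot and starve flushes, which previously shared the same pool and could stall writes once the write buffer filled. A toy illustration of the idea using two independent semaphores (not the actual `LocalScheduler` implementation):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Toy model of separate job pools: each job family takes permits only from
/// its own semaphore, so saturating one pool cannot block the other.
async fn run_limited(pool: Arc<Semaphore>, job: impl std::future::Future<Output = ()>) {
    let _permit = pool.acquire().await.expect("pool closed");
    job.await; // the permit is held for the job's duration, released on drop
}

#[tokio::main]
async fn main() {
    let flush_pool = Arc::new(Semaphore::new(4)); // e.g. max_background_flushes
    let compact_pool = Arc::new(Semaphore::new(2)); // e.g. max_background_compactions

    // Even with every compaction permit in use, a flush acquires immediately
    // from its own pool instead of queueing behind compactions.
    run_limited(compact_pool.clone(), async { /* merge SSTs */ }).await;
    run_limited(flush_pool.clone(), async { /* write memtable to SST */ }).await;
}
```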
4 changes: 3 additions & 1 deletion tests-integration/tests/http.rs
@@ -866,7 +866,6 @@ worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
compress_manifest = false
-max_background_jobs = 4
auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
@@ -939,6 +938,9 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"content_cache_size =",
"name =",
"recovery_parallelism =",
"max_background_flushes =",
"max_background_compactions =",
"max_background_purges =",
];

input
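Because the new limits default to values derived from the host's CPU count, the dumped configuration is no longer byte-identical across machines, so the three keys are added to the prefix list that `drop_lines_with_inconsistent_results` filters out of test output. A sketch of how such a filter works (an assumed reconstruction of the helper's shape, not its exact body):

```rust
/// Drops lines whose trimmed start matches a machine-dependent key, so the
/// dumped config compares equal across hosts (assumed shape of the real helper).
fn drop_lines_with_inconsistent_results(input: String) -> String {
    const PATTERNS: &[&str] = &[
        "max_background_flushes =",
        "max_background_compactions =",
        "max_background_purges =",
    ];
    input
        .lines()
        .filter(|line| !PATTERNS.iter().any(|p| line.trim_start().starts_with(p)))
        .collect::<Vec<_>>()
        .join("\n")
}
```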
