diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index 1f6feadf4579..b7270e91fd93 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -204,6 +204,7 @@ export interface Function { name: string; owner: number; argTypes: DataType[]; + returnType: DataType | undefined; language: string; link: string; identifier: string; @@ -214,11 +215,9 @@ export interface Function { } export interface Function_ScalarFunction { - returnType: DataType | undefined; } export interface Function_TableFunction { - returnTypes: DataType[]; } export interface Function_AggregateFunction { @@ -824,6 +823,7 @@ function createBaseFunction(): Function { name: "", owner: 0, argTypes: [], + returnType: undefined, language: "", link: "", identifier: "", @@ -842,6 +842,7 @@ export const Function = { argTypes: Array.isArray(object?.argTypes) ? object.argTypes.map((e: any) => DataType.fromJSON(e)) : [], + returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined, language: isSet(object.language) ? String(object.language) : "", link: isSet(object.link) ? String(object.link) : "", identifier: isSet(object.identifier) ? String(object.identifier) : "", @@ -867,6 +868,8 @@ export const Function = { } else { obj.argTypes = []; } + message.returnType !== undefined && + (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); message.language !== undefined && (obj.language = message.language); message.link !== undefined && (obj.link = message.link); message.identifier !== undefined && (obj.identifier = message.identifier); @@ -889,6 +892,9 @@ export const Function = { message.name = object.name ?? ""; message.owner = object.owner ?? 0; message.argTypes = object.argTypes?.map((e) => DataType.fromPartial(e)) || []; + message.returnType = (object.returnType !== undefined && object.returnType !== null) + ? DataType.fromPartial(object.returnType) + : undefined; message.language = object.language ?? ""; message.link = object.link ?? ""; message.identifier = object.identifier ?? ""; @@ -906,54 +912,41 @@ export const Function = { }; function createBaseFunction_ScalarFunction(): Function_ScalarFunction { - return { returnType: undefined }; + return {}; } export const Function_ScalarFunction = { - fromJSON(object: any): Function_ScalarFunction { - return { returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined }; + fromJSON(_: any): Function_ScalarFunction { + return {}; }, - toJSON(message: Function_ScalarFunction): unknown { + toJSON(_: Function_ScalarFunction): unknown { const obj: any = {}; - message.returnType !== undefined && - (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); return obj; }, - fromPartial, I>>(object: I): Function_ScalarFunction { + fromPartial, I>>(_: I): Function_ScalarFunction { const message = createBaseFunction_ScalarFunction(); - message.returnType = (object.returnType !== undefined && object.returnType !== null) - ? DataType.fromPartial(object.returnType) - : undefined; return message; }, }; function createBaseFunction_TableFunction(): Function_TableFunction { - return { returnTypes: [] }; + return {}; } export const Function_TableFunction = { - fromJSON(object: any): Function_TableFunction { - return { - returnTypes: Array.isArray(object?.returnTypes) ? object.returnTypes.map((e: any) => DataType.fromJSON(e)) : [], - }; + fromJSON(_: any): Function_TableFunction { + return {}; }, - toJSON(message: Function_TableFunction): unknown { + toJSON(_: Function_TableFunction): unknown { const obj: any = {}; - if (message.returnTypes) { - obj.returnTypes = message.returnTypes.map((e) => e ? DataType.toJSON(e) : undefined); - } else { - obj.returnTypes = []; - } return obj; }, - fromPartial, I>>(object: I): Function_TableFunction { + fromPartial, I>>(_: I): Function_TableFunction { const message = createBaseFunction_TableFunction(); - message.returnTypes = object.returnTypes?.map((e) => DataType.fromPartial(e)) || []; return message; }, }; diff --git a/dashboard/proto/gen/expr.ts b/dashboard/proto/gen/expr.ts index 1b610347b05f..4faa1e414aa7 100644 --- a/dashboard/proto/gen/expr.ts +++ b/dashboard/proto/gen/expr.ts @@ -644,7 +644,9 @@ export function exprNode_TypeToJSON(object: ExprNode_Type): string { export interface TableFunction { functionType: TableFunction_Type; args: ExprNode[]; - returnTypes: DataType[]; + returnType: + | DataType + | undefined; /** optional. only used when the type is UDTF. */ udtf: UserDefinedTableFunction | undefined; } @@ -961,7 +963,7 @@ export const ExprNode = { }; function createBaseTableFunction(): TableFunction { - return { functionType: TableFunction_Type.UNSPECIFIED, args: [], returnTypes: [], udtf: undefined }; + return { functionType: TableFunction_Type.UNSPECIFIED, args: [], returnType: undefined, udtf: undefined }; } export const TableFunction = { @@ -973,7 +975,7 @@ export const TableFunction = { args: Array.isArray(object?.args) ? object.args.map((e: any) => ExprNode.fromJSON(e)) : [], - returnTypes: Array.isArray(object?.returnTypes) ? object.returnTypes.map((e: any) => DataType.fromJSON(e)) : [], + returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined, udtf: isSet(object.udtf) ? UserDefinedTableFunction.fromJSON(object.udtf) : undefined, }; }, @@ -986,11 +988,8 @@ export const TableFunction = { } else { obj.args = []; } - if (message.returnTypes) { - obj.returnTypes = message.returnTypes.map((e) => e ? DataType.toJSON(e) : undefined); - } else { - obj.returnTypes = []; - } + message.returnType !== undefined && + (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); message.udtf !== undefined && (obj.udtf = message.udtf ? UserDefinedTableFunction.toJSON(message.udtf) : undefined); return obj; }, @@ -999,7 +998,9 @@ export const TableFunction = { const message = createBaseTableFunction(); message.functionType = object.functionType ?? TableFunction_Type.UNSPECIFIED; message.args = object.args?.map((e) => ExprNode.fromPartial(e)) || []; - message.returnTypes = object.returnTypes?.map((e) => DataType.fromPartial(e)) || []; + message.returnType = (object.returnType !== undefined && object.returnType !== null) + ? DataType.fromPartial(object.returnType) + : undefined; message.udtf = (object.udtf !== undefined && object.udtf !== null) ? UserDefinedTableFunction.fromPartial(object.udtf) : undefined; diff --git a/dashboard/proto/gen/hummock.ts b/dashboard/proto/gen/hummock.ts index 09b7fa5e7a06..48e6aee33faa 100644 --- a/dashboard/proto/gen/hummock.ts +++ b/dashboard/proto/gen/hummock.ts @@ -795,6 +795,7 @@ export interface CompactionConfig { splitByStateTable: boolean; /** soft limit for max number of sub level number */ level0StopWriteThresholdSubLevelNumber: number; + level0MaxCompactFileNumber: number; } export const CompactionConfig_CompactionMode = { @@ -4361,6 +4362,7 @@ function createBaseCompactionConfig(): CompactionConfig { maxSpaceReclaimBytes: 0, splitByStateTable: false, level0StopWriteThresholdSubLevelNumber: 0, + level0MaxCompactFileNumber: 0, }; } @@ -4393,6 +4395,9 @@ export const CompactionConfig = { level0StopWriteThresholdSubLevelNumber: isSet(object.level0StopWriteThresholdSubLevelNumber) ? Number(object.level0StopWriteThresholdSubLevelNumber) : 0, + level0MaxCompactFileNumber: isSet(object.level0MaxCompactFileNumber) + ? Number(object.level0MaxCompactFileNumber) + : 0, }; }, @@ -4421,6 +4426,8 @@ export const CompactionConfig = { message.splitByStateTable !== undefined && (obj.splitByStateTable = message.splitByStateTable); message.level0StopWriteThresholdSubLevelNumber !== undefined && (obj.level0StopWriteThresholdSubLevelNumber = Math.round(message.level0StopWriteThresholdSubLevelNumber)); + message.level0MaxCompactFileNumber !== undefined && + (obj.level0MaxCompactFileNumber = Math.round(message.level0MaxCompactFileNumber)); return obj; }, @@ -4440,6 +4447,7 @@ export const CompactionConfig = { message.maxSpaceReclaimBytes = object.maxSpaceReclaimBytes ?? 0; message.splitByStateTable = object.splitByStateTable ?? false; message.level0StopWriteThresholdSubLevelNumber = object.level0StopWriteThresholdSubLevelNumber ?? 0; + message.level0MaxCompactFileNumber = object.level0MaxCompactFileNumber ?? 0; return message; }, }; diff --git a/proto/hummock.proto b/proto/hummock.proto index 445ddc2e154f..29c90aedeee2 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -605,6 +605,7 @@ message CompactionConfig { bool split_by_state_table = 14; // soft limit for max number of sub level number uint64 level0_stop_write_threshold_sub_level_number = 15; + uint64 level0_max_compact_file_number = 16; } message TableStats { diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index 97f2a6cb8db9..0c66402dcebb 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -17,7 +17,7 @@ use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::CompactionConfig; const DEFAULT_MAX_COMPACTION_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2GB -const DEFAULT_MIN_COMPACTION_BYTES: u64 = 256 * 1024 * 1024; // 256MB +const DEFAULT_MIN_COMPACTION_BYTES: u64 = 128 * 1024 * 1024; // 128MB const DEFAULT_MAX_BYTES_FOR_LEVEL_BASE: u64 = 512 * 1024 * 1024; // 512MB // decrease this configure when the generation of checkpoint barrier is not frequent. @@ -28,6 +28,7 @@ const MAX_LEVEL: u64 = 6; const DEFAULT_LEVEL_MULTIPLIER: u64 = 5; const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB; const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = u32::MAX as u64; +const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 96; pub struct CompactionConfigBuilder { config: CompactionConfig, @@ -65,6 +66,11 @@ impl CompactionConfigBuilder { split_by_state_table: false, level0_stop_write_threshold_sub_level_number: DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER, + // This configure variable shall be larger than level0_tier_compact_file_number, and + // it shall meet the following condition: + // level0_max_compact_file_number * target_file_size_base > + // max_bytes_for_level_base + level0_max_compact_file_number: DEFAULT_MAX_COMPACTION_FILE_COUNT, }, } } @@ -127,4 +133,5 @@ builder_field! { max_sub_compaction: u32, max_space_reclaim_bytes: u64, level0_stop_write_threshold_sub_level_number: u64, + level0_max_compact_file_number: u64, } diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 166b085218c9..12b73b2922e6 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::Arc; use risingwave_common::catalog::TableOption; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig}; @@ -200,13 +201,15 @@ impl DynamicLevelSelectorCore { let total_size = levels.l0.as_ref().unwrap().total_file_size - handlers[0].get_pending_output_file_size(ctx.base_level as u32); + let base_level_size = levels.get_level(ctx.base_level).total_file_size; if idle_file_count > 0 { // trigger intra-l0 compaction at first when the number of files is too large. let l0_score = idle_file_count as u64 * SCORE_BASE / self.config.level0_tier_compact_file_number; ctx.score_levels .push((std::cmp::min(l0_score, max_l0_score), 0, 0)); - let score = total_size * SCORE_BASE / self.config.max_bytes_for_level_base; + let score = total_size * SCORE_BASE + / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size); ctx.score_levels.push((score, 0, ctx.base_level)); } diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a6f2e2368b49..6386b174c216 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -109,6 +109,7 @@ impl LevelCompactionPicker { ) -> Option { let mut input_levels = vec![]; let mut l0_total_file_size = 0; + let mut l0_total_file_count = 0; for level in &l0.sub_levels { // This break is optional. We can include overlapping sub-level actually. if level.level_type() != LevelType::Nonoverlapping { @@ -118,6 +119,10 @@ impl LevelCompactionPicker { break; } + if l0_total_file_count > self.config.level0_max_compact_file_number { + break; + } + let mut pending_compact = false; let mut cur_level_size = 0; let mut select_level = InputLevel { @@ -144,6 +149,7 @@ impl LevelCompactionPicker { continue; } + l0_total_file_count += select_level.table_infos.len() as u64; l0_total_file_size += cur_level_size; input_levels.push(select_level); } @@ -163,9 +169,7 @@ impl LevelCompactionPicker { target_level_size += sst.file_size; } - if target_level_size > l0_total_file_size - && l0_total_file_size < self.config.max_compaction_bytes - { + if target_level_size > l0_total_file_size { stats.skip_by_write_amp_limit += 1; return None; } @@ -440,7 +444,7 @@ pub mod tests { .is_none()); } - // compact the whole level and upper sub-level when the write-amplification is more than 1.5. + // compact the whole level and upper sub-level when the write-amplification is more than 1.0. #[test] fn test_compact_whole_level_write_amplification_limit() { let config = CompactionConfigBuilder::new() @@ -497,10 +501,8 @@ pub mod tests { sub_level.table_infos[0].file_size += 1000 - sub_level.total_file_size; sub_level.total_file_size = 1000; levels.l0.as_mut().unwrap().sub_levels[1].total_file_size = 1000; - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - assert_eq!(ret.input_levels.len(), 2); + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); } #[test] diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index 2406dc59a9ee..59c431e29a3b 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -69,12 +69,15 @@ impl TierCompactionPicker { ); let mut compaction_bytes = level.total_file_size; let mut max_level_size = level.total_file_size; + let mut compact_file_count = level.table_infos.len() as u64; for other in &l0.sub_levels[idx + 1..] { if compaction_bytes > max_compaction_bytes { break; } - + if compact_file_count > self.config.level0_max_compact_file_number { + break; + } if other.level_type != non_overlapping_type || other.total_file_size > self.config.sub_level_max_compaction_bytes { @@ -85,6 +88,7 @@ impl TierCompactionPicker { } compaction_bytes += other.total_file_size; + compact_file_count += other.table_infos.len() as u64; max_level_size = std::cmp::max(max_level_size, other.total_file_size); select_level_inputs.push(InputLevel { level_idx: 0, @@ -107,6 +111,7 @@ impl TierCompactionPicker { if level.level_type == non_overlapping_type && is_write_amp_large && select_level_inputs.len() < self.config.level0_tier_compact_file_number as usize + && compact_file_count < self.config.level0_max_compact_file_number { stats.skip_by_write_amp_limit += 1; continue; @@ -303,7 +308,7 @@ impl TierCompactionPicker { ); let mut compaction_bytes = level.total_file_size; - let mut compact_file_count = level.table_infos.len(); + let mut compact_file_count = level.table_infos.len() as u64; let mut waiting_enough_files = true; for other in &l0.sub_levels[idx + 1..] { @@ -312,6 +317,10 @@ impl TierCompactionPicker { break; } + if compact_file_count > self.config.level0_max_compact_file_number { + break; + } + if other.level_type != overlapping_type { waiting_enough_files = false; break; @@ -322,7 +331,7 @@ impl TierCompactionPicker { } compaction_bytes += other.total_file_size; - compact_file_count += other.table_infos.len(); + compact_file_count += other.table_infos.len() as u64; select_level_inputs.push(InputLevel { level_idx: 0, level_type: other.level_type, @@ -330,7 +339,7 @@ impl TierCompactionPicker { }); } - if compact_file_count < self.config.level0_tier_compact_file_number as usize + if compact_file_count < self.config.level0_tier_compact_file_number && waiting_enough_files { stats.skip_by_count_limit += 1; diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 13b76b77cfbd..34ad67d8ec0b 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -338,6 +338,7 @@ pub async fn setup_compute_env( ) { let config = CompactionConfigBuilder::new() .level0_tier_compact_file_number(1) + .level0_max_compact_file_number(130) .build(); setup_compute_env_with_config(port, config).await }