diff --git a/Cargo.lock b/Cargo.lock index 1b3daaa1bd..e9e56c3b8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,7 +85,7 @@ dependencies = [ "async-trait", "base64 0.13.1", "bytes", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "datafusion", @@ -1093,7 +1093,7 @@ checksum = "6a2c1699cb154e97cfccd3d6a0679f561c6214a33d86b3eacb78685c7479d022" dependencies = [ "arrow 23.0.0", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "dashmap 5.4.0", "futures 0.3.28", "paste 1.0.12", @@ -1130,6 +1130,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "ceresdbproto" +version = "1.0.4" +source = "git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02#53f5c74a54d8a08ebb08c41e8b862b2369df4a02" +dependencies = [ + "prost", + "protoc-bin-vendored", + "tonic 0.8.3", + "tonic-build", + "walkdir", +] + [[package]] name = "cexpr" version = "0.4.0" @@ -1274,7 +1286,7 @@ name = "cluster" version = "1.2.2" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "etcd-client", @@ -1327,7 +1339,7 @@ dependencies = [ "arrow_ext", "byteorder", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "chrono", "datafusion", "murmur3", @@ -1346,7 +1358,7 @@ version = "1.2.2" dependencies = [ "arrow 38.0.0", "backtrace", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "chrono", "common_types", "crossbeam-utils 0.8.15", @@ -3470,7 +3482,7 @@ name = "meta_client" version = "1.2.2" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "futures 0.3.28", @@ -3979,7 +3991,7 @@ version = "1.2.2" dependencies = [ "async-trait", "bytes", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "chrono", "clru", "common_types", @@ -4775,7 +4787,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "clru", "cluster", "common_types", @@ -4888,7 +4900,7 @@ dependencies = [ "arrow 38.0.0", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "cluster", "common_types", "common_util", @@ -5196,7 +5208,7 @@ version = "1.2.2" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "futures 0.3.28", @@ -5322,7 +5334,7 @@ name = "router" version = "1.2.2" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "cluster", "common_types", "common_util", @@ -5677,7 +5689,7 @@ dependencies = [ "async-trait", "bytes", 
"catalog", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "clru", "cluster", "common_types", @@ -6216,7 +6228,7 @@ dependencies = [ "arrow 38.0.0", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "futures 0.3.28", @@ -6235,7 +6247,7 @@ dependencies = [ "arrow 38.0.0", "arrow_ext", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "common_types", "common_util", "datafusion", @@ -7049,7 +7061,7 @@ name = "wal" version = "1.2.2" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.4 (git+https://github.com/tanruixiang/ceresdbproto.git?rev=53f5c74a54d8a08ebb08c41e8b862b2369df4a02)", "chrono", "common_types", "common_util", diff --git a/Cargo.toml b/Cargo.toml index 2f2036df2f..5e432163bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ bytes = "1.1.0" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = "1.0.4" +ceresdbproto = { git = "https://github.com/tanruixiang/ceresdbproto.git", rev = "53f5c74a54d8a08ebb08c41e8b862b2369df4a02" } chrono = "0.4" clap = "3.0" clru = "0.6.1" diff --git a/analytic_engine/src/compaction/mod.rs b/analytic_engine/src/compaction/mod.rs index bcbead4af9..e0485522b1 100644 --- a/analytic_engine/src/compaction/mod.rs +++ b/analytic_engine/src/compaction/mod.rs @@ -318,13 +318,26 @@ pub struct ExpiredFiles { #[derive(Default, Clone)] pub struct CompactionTask { - pub compaction_inputs: Vec, - pub expired: Vec, + inputs: Vec, + expired: Vec, +} + +impl Drop for CompactionTask { + fn drop(&mut self) { + // When a CompactionTask is dropped, it means + // 1. the task finished successfully, or + // 2. the task is cancelled for some reason, like memory limit + // + // In case 2, we need to mark files as not compacted in order for them to be + // scheduled again. In case 1, the files will be moved out of level controller, + // so it doesn't care what the flag is, so it's safe to set false here. + self.mark_files_being_compacted(false); + } } impl CompactionTask { - pub fn mark_files_being_compacted(&self, being_compacted: bool) { - for input in &self.compaction_inputs { + fn mark_files_being_compacted(&self, being_compacted: bool) { + for input in &self.inputs { for file in &input.files { file.set_being_compacted(being_compacted); } @@ -337,9 +350,10 @@ impl CompactionTask { } // Estimate the size of the total input files. 
+ #[inline] pub fn estimated_total_input_file_size(&self) -> usize { let total_input_size: u64 = self - .compaction_inputs + .inputs .iter() .map(|v| v.files.iter().map(|f| f.size()).sum::()) .sum(); @@ -347,19 +361,65 @@ impl CompactionTask { total_input_size as usize } + #[inline] pub fn num_compact_files(&self) -> usize { - self.compaction_inputs.iter().map(|v| v.files.len()).sum() + self.inputs.iter().map(|v| v.files.len()).sum() } - pub fn num_expired_files(&self) -> usize { - self.expired.iter().map(|v| v.files.len()).sum() + #[inline] + pub fn is_empty(&self) -> bool { + self.is_input_empty() && self.expired.is_empty() + } + + #[inline] + pub fn is_input_empty(&self) -> bool { + self.inputs.is_empty() + } + + #[inline] + pub fn expired(&self) -> &[ExpiredFiles] { + &self.expired + } + + #[inline] + pub fn inputs(&self) -> &[CompactionInputFiles] { + &self.inputs + } +} + +pub struct CompactionTaskBuilder { + expired: Vec, + inputs: Vec, +} + +impl CompactionTaskBuilder { + pub fn with_expired(expired: Vec) -> Self { + Self { + expired, + inputs: Vec::new(), + } + } + + pub fn add_inputs(&mut self, files: CompactionInputFiles) { + self.inputs.push(files); + } + + pub fn build(self) -> CompactionTask { + let task = CompactionTask { + expired: self.expired, + inputs: self.inputs, + }; + + task.mark_files_being_compacted(true); + + task } } impl fmt::Debug for CompactionTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CompactionTask") - .field("inputs", &self.compaction_inputs) + .field("inputs", &self.inputs) .field( "expired", &self @@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask { } } -pub struct PickerManager { - default_picker: CompactionPickerRef, - time_window_picker: CompactionPickerRef, - size_tiered_picker: CompactionPickerRef, -} - -impl Default for PickerManager { - fn default() -> Self { - let size_tiered_picker = Arc::new(CommonCompactionPicker::new( - CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()), - )); - let time_window_picker = Arc::new(CommonCompactionPicker::new( - CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()), - )); - - Self { - default_picker: time_window_picker.clone(), - size_tiered_picker, - time_window_picker, - } - } -} +#[derive(Default)] +pub struct PickerManager; impl PickerManager { pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef { - match strategy { - CompactionStrategy::Default => self.default_picker.clone(), - CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(), - CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(), - } + Arc::new(CommonCompactionPicker::new(strategy)) } } diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 96600199f0..784f0ba894 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -15,8 +15,8 @@ use snafu::Snafu; use crate::{ compaction::{ - CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions, - TimeWindowCompactionOptions, + CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder, + SizeTieredCompactionOptions, TimeWindowCompactionOptions, }, sst::{ file::{FileHandle, Level}, @@ -60,7 +60,7 @@ pub trait CompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result; } @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker { impl 
CommonCompactionPicker { pub fn new(strategy: CompactionStrategy) -> Self { let level_picker: LevelPickerRef = match strategy { - CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => { - Arc::new(SizeTieredPicker::default()) + CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()), + CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => { + Arc::new(TimeWindowPicker::default()) } - CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()), }; Self { level_picker } } @@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result { let expire_time = ctx.ttl.map(Timestamp::expire_time); - let mut compaction_task = CompactionTask { - expired: levels_controller.expired_ssts(expire_time), - ..Default::default() - }; + let mut builder = + CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time)); if let Some(input_files) = self.pick_compact_candidates(&ctx, levels_controller, expire_time) @@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker { ctx.strategy, input_files ); - compaction_task.compaction_inputs = vec![input_files]; + builder.add_inputs(input_files); } - Ok(compaction_task) + Ok(builder.build()) } } @@ -734,39 +732,39 @@ mod tests { }; let now = Timestamp::now(); { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 2); - assert_eq!(task.compaction_inputs[0].files[0].id(), 0); - assert_eq!(task.compaction_inputs[0].files[1].id(), 1); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 2); + assert_eq!(task.inputs[0].files[0].id(), 0); + assert_eq!(task.inputs[0].files[1].id(), 1); assert_eq!(task.expired[0].files.len(), 1); assert_eq!(task.expired[0].files[0].id(), 3); } { - let lc = build_newest_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 4); - assert_eq!(task.compaction_inputs[0].files[0].id(), 2); - assert_eq!(task.compaction_inputs[0].files[1].id(), 3); - assert_eq!(task.compaction_inputs[0].files[2].id(), 4); - assert_eq!(task.compaction_inputs[0].files[3].id(), 5); + let mut lc = build_newest_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 4); + assert_eq!(task.inputs[0].files[0].id(), 2); + assert_eq!(task.inputs[0].files[1].id(), 3); + assert_eq!(task.inputs[0].files[2].id(), 4); + assert_eq!(task.inputs[0].files[3].id(), 5); } { - let lc = build_newest_bucket_no_match_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs.len(), 0); + let mut lc = build_newest_bucket_no_match_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs.len(), 0); } // If ttl is None, then no file is expired. 
ctx.ttl = None; { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx, &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 2); - assert_eq!(task.compaction_inputs[0].files[0].id(), 0); - assert_eq!(task.compaction_inputs[0].files[1].id(), 1); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx, &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 2); + assert_eq!(task.inputs[0].files[0].id(), 0); + assert_eq!(task.inputs[0].files[1].id(), 1); assert!(task.expired[0].files.is_empty()); } } diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 30cf277521..866629678d 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -237,7 +237,7 @@ impl OngoingTaskLimit { if dropped > 0 { warn!( - "Too many compaction pending tasks, limit: {}, dropped {} older tasks.", + "Too many compaction pending tasks, limit:{}, dropped:{}.", self.max_pending_compaction_tasks, dropped, ); } @@ -462,10 +462,7 @@ impl ScheduleWorker { waiter_notifier: WaiterNotifier, token: MemoryUsageToken, ) { - // Mark files being in compaction. - compaction_task.mark_files_being_compacted(true); - - let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty(); + let keep_scheduling_compaction = !compaction_task.is_input_empty(); let runtime = self.runtime.clone(); let space_store = self.space_store.clone(); @@ -503,9 +500,6 @@ impl ScheduleWorker { .await; if let Err(e) = &res { - // Compaction is failed, we need to unset the compaction mark. - compaction_task.mark_files_being_compacted(false); - error!( "Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}", table_data.name, table_data.id, request_id, e @@ -656,7 +650,16 @@ impl ScheduleWorker { self.max_unflushed_duration, ); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await { + v + } else { + warn!( + "Table is closed, ignore this periodical flush, table:{}", + table_data.name + ); + continue; + }; + let flush_scheduler = serial_exec.flush_scheduler(); // Instance flush the table asynchronously. if let Err(e) = flusher diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index f45199c164..d933b01214 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -3,12 +3,12 @@ //! Close table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::CloseTableRequest; use crate::{ instance::{ - engine::{DoManifestSnapshot, FlushTable, Result}, + engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result}, flush_compaction::{Flusher, TableFlushOptions}, }, manifest::{ManifestRef, SnapshotRequest}, @@ -37,8 +37,11 @@ impl Closer { // Flush table. 
let opts = TableFlushOptions::default(); - let mut serial_exec = table_data.serial_exec.lock().await; - let flush_scheduler = serial_exec.flush_scheduler(); + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) @@ -67,9 +70,10 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); + serial_exec_ctx.invalidate(); info!( - "table:{}-{} has been removed from the space_id:{}", - table_data.name, table_data.id, self.space.id + "table:{} has been removed from the space_id:{}, table_id:{}", + table_data.name, self.space.id, table_data.id, ); Ok(()) } diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index ac6d1653fd..08d673f820 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -3,12 +3,12 @@ //! Drop table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::DropTableRequest; use crate::{ instance::{ - engine::{FlushTable, Result, WriteManifest}, + engine::{FlushTable, OperateClosedTable, Result, WriteManifest}, flush_compaction::{Flusher, TableFlushOptions}, SpaceStoreRef, }, @@ -36,7 +36,10 @@ impl Dropper { } }; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; if table_data.is_dropped() { warn!( @@ -51,7 +54,7 @@ impl Dropper { // be avoided. let opts = TableFlushOptions::default(); - let flush_scheduler = serial_exec.flush_scheduler(); + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) .await diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index e32fa064d5..562c97ff2e 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -23,29 +23,21 @@ use crate::{ #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { - #[snafu(display( - "The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}", - space_id, - table, - backtrace, - ))] + #[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))] SpaceNotExist { space_id: SpaceId, table: String, backtrace: Backtrace, }, - #[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))] + #[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))] ReadMetaUpdate { table_id: TableId, source: GenericError, }, #[snafu(display( - "Failed to recover table data, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}" ))] RecoverTableData { space_id: SpaceId, @@ -53,14 +45,11 @@ pub enum Error { source: crate::table::data::Error, }, - #[snafu(display("Failed to read wal, err:{}", source))] + #[snafu(display("Failed to read wal, err:{source}"))] ReadWal { source: wal::manager::Error }, #[snafu(display( - "Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}", - table, - table_id, - source + "Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}", ))] ApplyMemTable { space_id: SpaceId, @@ -70,11 +59,7 @@ pub enum Error { }, 
#[snafu(display( - "Flush failed, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] FlushTable { space_id: SpaceId, @@ -84,11 +69,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteManifest { space_id: SpaceId, @@ -98,11 +79,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteWal { space_id: SpaceId, @@ -112,11 +89,7 @@ pub enum Error { }, #[snafu(display( - "Invalid options, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] InvalidOptions { space_id: SpaceId, @@ -126,11 +99,7 @@ pub enum Error { }, #[snafu(display( - "Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] CreateTableData { space_id: SpaceId, @@ -140,11 +109,8 @@ pub enum Error { }, #[snafu(display( - "Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}", - table, - current_version, - given_version, - backtrace, + "Try to update schema to elder version, table:{table}, current_version:{current_version}, \ + given_version:{given_version}.\nBacktrace:\n{backtrace}", ))] InvalidSchemaVersion { table: String, @@ -154,11 +120,8 @@ pub enum Error { }, #[snafu(display( - "Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}", - table, - current_version, - pre_version, - backtrace, + "Invalid previous schema version, table:{table}, current_version:{current_version}, \ + pre_version:{pre_version}.\nBacktrace:\n{backtrace}", ))] InvalidPreVersion { table: String, @@ -167,21 +130,14 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Alter schema of a dropped table:{}.\nBacktrace:\n{}", - table, - backtrace - ))] + #[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))] AlterDroppedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to store version edit, err:{}", source))] + #[snafu(display("Failed to store version edit, err:{source}"))] StoreVersionEdit { source: GenericError }, #[snafu(display( - "Failed to encode payloads, table:{}, wal_location:{:?}, err:{}", - table, - wal_location, - source + "Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}" ))] EncodePayloads { table: String, @@ -190,10 +146,7 @@ pub enum Error { }, #[snafu(display( - "Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}", ))] DoManifestSnapshot { space_id: SpaceId, @@ -202,30 +155,31 @@ pub enum Error { }, #[snafu(display( - "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", - table, - 
backtrace, + "Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}", ))] CreateOpenFailedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to open manifest, err:{}", source))] + #[snafu(display("Failed to open manifest, err:{source}"))] OpenManifest { source: crate::manifest::details::Error, }, - #[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableNotExist { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + + #[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))] ReplayWalWithCause { msg: Option, source: GenericError, }, - #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] ReplayWalNoCause { msg: Option, backtrace: Backtrace, @@ -264,6 +218,7 @@ impl From for table_engine::engine::Error { | Error::TableNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } + | Error::OperateClosedTable { .. } | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 4e860def74..03b2f30337 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -669,22 +669,23 @@ impl SpaceStore { "Begin compact table, table_name:{}, id:{}, task:{:?}", table_data.name, table_data.id, task ); + let inputs = task.inputs(); let mut edit_meta = VersionEditMeta { space_id: table_data.space_id, table_id: table_data.id, flushed_sequence: 0, // Use the number of compaction inputs as the estimated number of files to add. - files_to_add: Vec::with_capacity(task.compaction_inputs.len()), + files_to_add: Vec::with_capacity(inputs.len()), files_to_delete: vec![], mems_to_remove: vec![], }; - if task.num_expired_files() == 0 && task.num_compact_files() == 0 { + if task.is_empty() { // Nothing to compact. 
return Ok(()); } - for files in &task.expired { + for files in task.expired() { self.delete_expired_files(table_data, request_id, files, &mut edit_meta); } @@ -696,7 +697,7 @@ impl SpaceStore { task.num_compact_files(), ); - for input in &task.compaction_inputs { + for input in inputs { self.compact_input_files( request_id, table_data, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 1faf254f08..492178b41c 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -28,14 +28,14 @@ use common_util::{ }; use log::{error, info}; use mem_collector::MemUsageCollector; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; use tokio::sync::oneshot::{self, error::RecvError}; use wal::manager::{WalLocation, WalManagerRef}; -use self::flush_compaction::{Flusher, TableFlushOptions}; use crate::{ compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest}, + instance::flush_compaction::{Flusher, TableFlushOptions}, manifest::ManifestRef, row_iter::IterOptions, space::{SpaceId, SpaceRef, SpacesRef}, @@ -66,6 +66,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { table: String, backtrace: Backtrace }, + #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] RecvManualOpResult { op: String, @@ -195,7 +198,13 @@ impl Instance { }; let flusher = self.make_flusher(); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = + table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable { + table: &table_data.name, + })?; let flush_scheduler = serial_exec.flush_scheduler(); flusher .schedule_flush(flush_scheduler, table_data, flush_opts) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 5d494450e4..0144833ec6 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use common_types::{schema::IndexInWriterSchema, table::ShardId}; use common_util::error::BoxError; use log::{debug, error, info, trace}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::table::TableId; use tokio::sync::MutexGuard; use wal::{ @@ -25,13 +25,13 @@ use wal::{ use crate::{ instance::{ self, - engine::{Error, ReplayWalWithCause, Result}, + engine::{Error, OperateClosedTable, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, payload::{ReadPayload, WalDecoder}, - table::data::TableDataRef, + table::data::{SerialExecContext, TableDataRef}, }; /// Wal replayer supporting both table based and region based @@ -182,7 +182,10 @@ impl TableBasedReplay { .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf @@ -264,14 +267,17 @@ impl RegionBasedReplay { let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. 
- let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut replay_table_ctxs = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { - let serial_exec = table_data.serial_exec.lock().await; - let serial_exec_ctx = SerialExecContext { + let serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let replay_table_ctx = TableReplayContext { table_data: table_data.clone(), - serial_exec, + serial_exec_ctx, }; - serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + replay_table_ctxs.insert(table_data.id, replay_table_ctx); } // Split and replay logs. @@ -287,7 +293,7 @@ impl RegionBasedReplay { break; } - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds) .await?; } @@ -297,7 +303,7 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &mut HashMap>, + serial_exec_ctxs: &mut HashMap>, faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); @@ -317,7 +323,7 @@ impl RegionBasedReplay { let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, - &mut ctx.serial_exec, + &mut ctx.serial_exec_ctx, &ctx.table_data, log_batch.range(table_batch.range), ) @@ -391,9 +397,9 @@ struct TableBatch { range: Range, } -struct SerialExecContext<'a> { +struct TableReplayContext<'a> { table_data: TableDataRef, - serial_exec: MutexGuard<'a, TableOpSerialExecutor>, + serial_exec_ctx: MutexGuard<'a, SerialExecContext>, } /// Replay all log entries into memtable and flush if necessary diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 219e86102c..6017686b9a 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -608,9 +608,9 @@ impl<'a> Writer<'a> { "Try to trigger flush of other table:{} from the write procedure of table:{}", table_data.name, self.table_data.name ); - match table_data.serial_exec.try_lock() { - Ok(mut serial_exec) => { - let flush_scheduler = serial_exec.flush_scheduler(); + match table_data.try_acquire_serial_exec_ctx() { + Some(mut serial_exec_ctx) => { + let flush_scheduler = serial_exec_ctx.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. flusher .schedule_flush(flush_scheduler, table_data, opts) @@ -619,7 +619,7 @@ impl<'a> Writer<'a> { table: &table_data.name, }) } - Err(_) => { + None => { warn!( "Failed to acquire write lock for flush table:{}", table_data.name, diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 296c4e2476..5e2bacdcbd 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. 
use std::{ fmt::Debug, @@ -7,7 +7,7 @@ use std::{ use lru::LruCache; use parquet::file::metadata::FileMetaData; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::sst::{ meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result}, @@ -39,14 +39,24 @@ impl MetaData { let kv_metas = file_meta_data .key_value_metadata() .context(KvMetaDataNotFound)?; - let kv_meta = kv_metas - .iter() - .find(|kv| kv.key == encoding::META_KEY) - .context(KvMetaDataNotFound)?; + + ensure!(!kv_metas.is_empty(), KvMetaDataNotFound); + let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1); + let mut custom_kv_meta = None; + for kv_meta in kv_metas { + // Remove our extended custom meta data from the parquet metadata for small + // memory consumption in the cache. + if kv_meta.key == encoding::META_KEY { + custom_kv_meta = Some(kv_meta); + } else { + other_kv_metas.push(kv_meta.clone()); + } + } let custom = { + let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?; let mut sst_meta = - encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?; + encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?; if ignore_sst_filter { sst_meta.parquet_filter = None; } @@ -56,13 +66,17 @@ impl MetaData { // let's build a new parquet metadata without the extended key value // metadata. + let other_kv_metas = if other_kv_metas.is_empty() { + None + } else { + Some(other_kv_metas) + }; let parquet = { let thin_file_meta_data = FileMetaData::new( file_meta_data.version(), file_meta_data.num_rows(), file_meta_data.created_by().map(|v| v.to_string()), - // Remove the key value metadata. - None, + other_kv_metas, file_meta_data.schema_descr_ptr(), file_meta_data.column_orders().cloned(), ); @@ -111,3 +125,153 @@ impl MetaCache { self.cache.write().unwrap().put(key, value); } } + +#[cfg(test)] +mod tests { + use std::{fs::File, path::Path, sync::Arc}; + + use arrow::{ + array::UInt64Builder, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }; + use bytes::Bytes; + use common_types::{ + column_schema::Builder as ColumnSchemaBuilder, + schema::Builder as CustomSchemaBuilder, + time::{TimeRange, Timestamp}, + }; + use parquet::{arrow::ArrowWriter, file::footer}; + use parquet_ext::ParquetMetaData; + + use super::MetaData; + use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData}; + + fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) { + assert_eq!(original.page_indexes(), processed.page_indexes()); + assert_eq!(original.offset_indexes(), processed.offset_indexes()); + assert_eq!(original.num_row_groups(), processed.num_row_groups()); + assert_eq!(original.row_groups(), processed.row_groups()); + + let original_file_md = original.file_metadata(); + let processed_file_md = processed.file_metadata(); + assert_eq!(original_file_md.num_rows(), processed_file_md.num_rows()); + assert_eq!(original_file_md.version(), processed_file_md.version()); + assert_eq!( + original_file_md.created_by(), + processed_file_md.created_by() + ); + assert_eq!(original_file_md.schema(), processed_file_md.schema()); + assert_eq!( + original_file_md.schema_descr(), + processed_file_md.schema_descr() + ); + assert_eq!( + original_file_md.schema_descr_ptr(), + processed_file_md.schema_descr_ptr() + ); + assert_eq!( + original_file_md.column_orders(), + processed_file_md.column_orders() + ); + + if let Some(kv_metas) = original_file_md.key_value_metadata() { + 
let processed_kv_metas = processed_file_md.key_value_metadata().unwrap(); + assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1); + let mut idx_for_processed = 0; + for kv in kv_metas { + if kv.key == encoding::META_KEY { + continue; + } + assert_eq!(kv, &processed_kv_metas[idx_for_processed]); + idx_for_processed += 1; + } + } else { + assert!(processed_file_md.key_value_metadata().is_none()); + } + } + + fn write_parquet_file_with_metadata( + parquet_file_path: &Path, + custom_meta_data: &CustomParquetMetaData, + ) { + let tsid_array = { + let mut builder = UInt64Builder::new(); + builder.append_value(10); + builder.append_null(); + builder.append_value(11); + builder.finish() + }; + let timestamp_array = { + let mut builder = UInt64Builder::new(); + builder.append_value(1000); + builder.append_null(); + builder.append_value(1001); + builder.finish() + }; + let file = File::create(parquet_file_path).unwrap(); + let schema = Schema::new(vec![ + Field::new("tsid", DataType::UInt64, true), + Field::new("timestamp", DataType::UInt64, true), + ]); + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(tsid_array), Arc::new(timestamp_array)], + ) + .unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); + + let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap(); + writer.append_key_value_metadata(encoded_meta_data); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + fn test_arrow_meta_data() { + let temp_dir = tempfile::tempdir().unwrap(); + let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par"); + let schema = { + let tsid_column_schema = ColumnSchemaBuilder::new( + "tsid".to_string(), + common_types::datum::DatumKind::UInt64, + ) + .build() + .unwrap(); + let timestamp_column_schema = ColumnSchemaBuilder::new( + "timestamp".to_string(), + common_types::datum::DatumKind::Timestamp, + ) + .build() + .unwrap(); + CustomSchemaBuilder::new() + .auto_increment_column_id(true) + .add_key_column(tsid_column_schema) + .unwrap() + .add_key_column(timestamp_column_schema) + .unwrap() + .build() + .unwrap() + }; + let custom_meta_data = CustomParquetMetaData { + min_key: Bytes::from_static(&[0, 1]), + max_key: Bytes::from_static(&[2, 2]), + time_range: TimeRange::new_unchecked(Timestamp::new(0), Timestamp::new(10)), + max_sequence: 1001, + schema, + parquet_filter: None, + collapsible_cols_idx: vec![], + }; + write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data); + + let parquet_file = File::open(parquet_file_path.as_path()).unwrap(); + let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap(); + + let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap(); + + assert_eq!(**meta_data.custom(), custom_meta_data); + check_parquet_meta_data(&parquet_meta_data, meta_data.parquet()); + } +} diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index dac48bff44..47478eca6a 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Sst reader implementation based on parquet. 
@@ -340,7 +340,7 @@ impl<'a> Reader<'a> { Ok(file_size) } - async fn load_meta_data_from_storage(&self) -> Result { + async fn load_meta_data_from_storage(&self) -> Result { let file_size = self.load_file_size().await?; let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store); @@ -351,7 +351,7 @@ impl<'a> Reader<'a> { file_path: self.path.to_string(), })?; - Ok(Arc::new(meta_data)) + Ok(meta_data) } fn need_update_cache(&self) -> bool { diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index 2effc6bc36..d65f75dcd1 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -1030,11 +1030,11 @@ mod tests { ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap(); let input_record_batch2 = ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap(); - let row_nums = encoder + let num_rows = encoder .encode(vec![input_record_batch, input_record_batch2]) .await .unwrap(); - assert_eq!(2, row_nums); + assert_eq!(2, num_rows); // read encoded records back, and then compare with input records encoder.close().await.unwrap(); diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 8bba1b41a2..8418983f8b 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -333,7 +333,7 @@ mod tests { use common_types::{ bytes::Bytes, projected_schema::ProjectedSchema, - tests::{build_row, build_schema}, + tests::{build_row, build_row_for_dictionary, build_schema, build_schema_for_dictionary}, time::{TimeRange, Timestamp}, }; use common_util::{ @@ -360,6 +360,144 @@ mod tests { // TODO(xikai): add test for reverse reader + #[test] + fn test_parquet_use_dictionary() { + let runtime = Arc::new(runtime::Builder::default().build().unwrap()); + let num_rows_per_row_group = 4; + let expected_num_rows = vec![4, 4, 4, 4, 4]; + runtime.block_on(async { + let sst_factory = FactoryImpl; + let sst_write_options = SstWriteOptions { + storage_format_hint: StorageFormatHint::Auto, + num_rows_per_row_group, + compression: table_options::Compression::Uncompressed, + max_buffer_size: 0, + }; + + let dir = tempdir().unwrap(); + let root = dir.path(); + let store: ObjectStoreRef = Arc::new(LocalFileSystem::new_with_prefix(root).unwrap()); + let store_picker: ObjectStorePickerRef = Arc::new(store); + let sst_file_path = Path::from("test_dictionary.par"); + + let schema = build_schema_for_dictionary(); + let reader_projected_schema = ProjectedSchema::no_projection(schema.clone()); + let sst_meta = MetaData { + min_key: Bytes::from_static(b"100"), + max_key: Bytes::from_static(b"200"), + time_range: TimeRange::new_unchecked(Timestamp::new(1), Timestamp::new(2)), + max_sequence: 200, + schema: schema.clone(), + }; + + let mut counter = 5; + let record_batch_stream = Box::new(stream::poll_fn(move |_| -> Poll> { + if counter == 0 { + return Poll::Ready(None); + } + counter -= 1; + + let ts = 100 + counter; + let rows = vec![ + build_row_for_dictionary(1, ts, Some("tagv1"), "tagv2", 1), + build_row_for_dictionary(2, ts, Some("tagv2"), "tagv2", 2), + build_row_for_dictionary(3, ts, None, "tagv3", 3), + build_row_for_dictionary(4, ts, Some("tagv3"), "tagv2", 2), + ]; + let batch = build_record_batch_with_key(schema.clone(), rows); + Poll::Ready(Some(Ok(batch))) + })); + let mut writer = sst_factory + .create_writer( + &sst_write_options, + &sst_file_path, + &store_picker, + Level::MAX, + ) + 
.await + .unwrap(); + let sst_info = writer + .write(RequestId::next_id(), &sst_meta, record_batch_stream) + .await + .unwrap(); + + assert_eq!(20, sst_info.row_num); + + let scan_options = ScanOptions::default(); + // read sst back to test + let sst_read_options = SstReadOptions { + reverse: false, + frequency: ReadFrequency::Frequent, + num_rows_per_row_group: 5, + projected_schema: reader_projected_schema, + predicate: Arc::new(Predicate::empty()), + meta_cache: None, + scan_options, + runtime: runtime.clone(), + }; + + let mut reader: Box = { + let mut reader = AsyncParquetReader::new( + &sst_file_path, + &sst_read_options, + None, + &store_picker, + None, + ); + let mut sst_meta_readback = reader + .meta_data() + .await + .unwrap() + .as_parquet() + .unwrap() + .as_ref() + .clone(); + // sst filter is built insider sst writer, so overwrite to default for + // comparison. + sst_meta_readback.parquet_filter = Default::default(); + assert_eq!(&sst_meta_readback, &ParquetMetaData::from(sst_meta)); + assert_eq!( + expected_num_rows, + reader + .row_groups() + .await + .iter() + .map(|g| g.num_rows()) + .collect::>() + ); + + Box::new(reader) + }; + let mut stream = reader.read().await.unwrap(); + let mut expect_rows = vec![]; + for counter in &[4, 3, 2, 1, 0] { + expect_rows.push(build_row_for_dictionary( + 1, + 100 + counter, + Some("tagv1"), + "tagv2", + 1, + )); + expect_rows.push(build_row_for_dictionary( + 2, + 100 + counter, + Some("tagv2"), + "tagv2", + 2, + )); + expect_rows.push(build_row_for_dictionary(3, 100 + counter, None, "tagv3", 3)); + expect_rows.push(build_row_for_dictionary( + 4, + 100 + counter, + Some("tagv3"), + "tagv2", + 2, + )); + } + check_stream(&mut stream, expect_rows).await; + }); + } + #[test] fn test_parquet_build_and_read() { init_log_for_test(); @@ -391,7 +529,7 @@ mod tests { let sst_file_path = Path::from("data.par"); let schema = build_schema(); - let projected_schema = ProjectedSchema::no_projection(schema.clone()); + let reader_projected_schema = ProjectedSchema::no_projection(schema.clone()); let sst_meta = MetaData { min_key: Bytes::from_static(b"100"), max_key: Bytes::from_static(b"200"), @@ -440,7 +578,7 @@ mod tests { reverse: false, frequency: ReadFrequency::Frequent, num_rows_per_row_group: 5, - projected_schema, + projected_schema: reader_projected_schema, predicate: Arc::new(Predicate::empty()), meta_cache: None, scan_options, diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 49d9d6cae8..01ba153fd8 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -7,6 +7,7 @@ use std::{ convert::TryInto, fmt, fmt::Formatter, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Mutex, @@ -28,6 +29,7 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; +use tokio::sync::MutexGuard; use crate::{ instance::serial_executor::TableOpSerialExecutor, @@ -86,6 +88,36 @@ impl TableShardInfo { } } +/// The context for execution of serial operation on the table. +pub struct SerialExecContext { + /// Denotes whether `serial_exec` is valid. + /// + /// The `serial_exec` will be invalidated if the table is closed. 
+ invalid: bool, + serial_exec: TableOpSerialExecutor, +} + +impl SerialExecContext { + #[inline] + pub fn invalidate(&mut self) { + self.invalid = true; + } +} + +impl Deref for SerialExecContext { + type Target = TableOpSerialExecutor; + + fn deref(&self) -> &Self::Target { + &self.serial_exec + } +} + +impl DerefMut for SerialExecContext { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.serial_exec + } +} + /// Data of a table pub struct TableData { /// Id of this table @@ -143,14 +175,13 @@ pub struct TableData { /// No write/alter is allowed if the table is dropped. dropped: AtomicBool, + serial_exec_ctx: tokio::sync::Mutex, + /// Metrics of this table pub metrics: Metrics, /// Shard info of the table pub shard_info: TableShardInfo, - - /// The table operation serial_exec - pub serial_exec: tokio::sync::Mutex, } impl fmt::Debug for TableData { @@ -217,6 +248,10 @@ impl TableData { preflush_write_buffer_size_ratio, )); + let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { + invalid: false, + serial_exec: TableOpSerialExecutor::new(table_id), + }); Ok(Self { id: table_id, name: table_name, @@ -235,7 +270,7 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), + serial_exec_ctx, }) } @@ -249,35 +284,17 @@ impl TableData { preflush_write_buffer_size_ratio: f32, mem_usage_collector: CollectorRef, ) -> Result { - let memtable_factory = Arc::new(SkiplistMemTableFactory); - let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); - let current_version = TableVersion::new(purge_queue); - let metrics = Metrics::default(); - let mutable_limit = AtomicU32::new(compute_mutable_limit( - add_meta.opts.write_buffer_size, + Self::new( + add_meta.space_id, + add_meta.table_id, + add_meta.table_name, + add_meta.schema, + shard_id, + add_meta.opts, + purger, preflush_write_buffer_size_ratio, - )); - - Ok(Self { - id: add_meta.table_id, - name: add_meta.table_name, - schema: Mutex::new(add_meta.schema), - space_id: add_meta.space_id, - mutable_limit, - mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio, - opts: ArcSwap::new(Arc::new(add_meta.opts)), - memtable_factory, mem_usage_collector, - current_version, - last_sequence: AtomicU64::new(0), - last_memtable_id: AtomicU64::new(0), - last_file_id: AtomicU64::new(0), - last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), - metrics, - shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), - }) + ) } /// Get current schema of the table. @@ -352,6 +369,34 @@ impl TableData { self.dropped.store(true, Ordering::SeqCst); } + /// Acquire the [`SerialExecContext`] if the table is not closed. + pub async fn acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.lock().await; + if v.invalid { + None + } else { + Some(v) + } + } + + /// Try to acquire the [SerialExecContext]. + /// + /// [None] will be returned if the serial_exec_ctx has been acquired + /// already, or the table is closed. + pub fn try_acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.try_lock(); + match v { + Ok(ctx) => { + if ctx.invalid { + None + } else { + Some(ctx) + } + } + Err(_) => None, + } + } + /// Returns total memtable memory usage in bytes. 
#[inline] pub fn memtable_memory_usage(&self) -> usize { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 82b2b54b9c..780b9cafcc 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -21,9 +21,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder, - ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, - WaitForPendingWrites, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, OperateClosedTable, + ReadOptions, ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, + TooManyPendingWrites, WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -255,7 +255,7 @@ impl TableImpl { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec = self.table_data.serial_exec.lock().await; + let serial_exec_ctx = self.table_data.acquire_serial_exec_ctx().await; // The `serial_exec` is acquired, let's merge the pending requests and write // them all. let pending_writes = { @@ -266,9 +266,22 @@ impl TableImpl { !pending_writes.is_empty(), "The pending writes should contain at least the one just pushed." ); - let merged_write_request = - merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); - (merged_write_request, serial_exec, pending_writes.notifiers) + match serial_exec_ctx { + Some(v) => { + let merged_write_request = merge_pending_write_requests( + pending_writes.writes, + pending_writes.num_rows, + ); + (merged_write_request, v, pending_writes.notifiers) + } + None => { + // The table has been closed, notify all the waiters in + // the queue. + let write_err = OperateClosedTable.fail(); + self.notify_waiters(pending_writes.notifiers, &write_err); + return write_err; + } + } } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -303,12 +316,18 @@ impl TableImpl { .box_err() .context(Write { table: self.name() }); - // There is no waiter for pending writes, return the write result. - if notifiers.is_empty() { - return write_res; - } - // Notify the waiters for the pending writes. 
+ self.notify_waiters(notifiers, &write_res); + + write_res.map(|_| num_rows) + } + + #[inline] + fn should_queue_write_request(&self, request: &WriteRequest) -> bool { + request.row_group.num_rows() < self.instance.max_rows_in_write_queue + } + + fn notify_waiters(&self, notifiers: Vec>>, write_res: &Result) { match write_res { Ok(_) => { for notifier in notifiers { @@ -319,7 +338,6 @@ impl TableImpl { ); } } - Ok(num_rows) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -332,15 +350,9 @@ impl TableImpl { ); } } - Err(e) } } } - - #[inline] - fn should_queue_write_request(&self, request: &WriteRequest) -> bool { - request.row_group.num_rows() < self.instance.max_rows_in_write_queue - } } #[async_trait] @@ -384,11 +396,15 @@ impl Table for TableImpl { return self.write_with_pending_queue(request).await; } - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut writer = Writer::new( self.instance.clone(), self.space_table.clone(), - &mut serial_exec, + &mut serial_exec_ctx, ); writer .write(request) @@ -501,10 +517,14 @@ impl Table for TableImpl { } async fn alter_schema(&self, request: AlterSchemaRequest) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; @@ -518,10 +538,14 @@ impl Table for TableImpl { } async fn alter_options(&self, options: HashMap) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs index 329c09677a..ddb7ffd21d 100644 --- a/analytic_engine/src/table/version.rs +++ b/analytic_engine/src/table/version.rs @@ -727,9 +727,9 @@ impl TableVersion { picker_ctx: PickerContext, picker: &CompactionPickerRef, ) -> picker::Result { - let inner = self.inner.read().unwrap(); + let mut inner = self.inner.write().unwrap(); - picker.pick_compaction(picker_ctx, &inner.levels_controller) + picker.pick_compaction(picker_ctx, &mut inner.levels_controller) } pub fn has_expired_sst(&self, expire_time: Option) -> bool { diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 4c09a84644..ab322ffba5 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -5,16 +5,17 @@ use std::sync::Arc; use arrow::{ array::{ - Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, - Date32Array as DateArray, Date32Builder as DateBuilder, Float32Array as FloatArray, - Float32Builder as FloatBuilder, Float64Array as DoubleArray, + Array, ArrayAccessor, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, + BooleanBuilder, Date32Array as DateArray, Date32Builder as DateBuilder, DictionaryArray, + Float32Array as FloatArray, Float32Builder as FloatBuilder, Float64Array as DoubleArray, Float64Builder as DoubleBuilder, Int16Array, Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder, Int8Array, Int8Builder, NullArray, 
StringArray, StringBuilder, - Time64NanosecondArray as TimeArray, Time64NanosecondBuilder as TimeBuilder, - TimestampMillisecondArray, TimestampMillisecondBuilder, UInt16Array, UInt16Builder, - UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, + StringDictionaryBuilder, Time64NanosecondArray as TimeArray, + Time64NanosecondBuilder as TimeBuilder, TimestampMillisecondArray, + TimestampMillisecondBuilder, UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, + UInt64Array, UInt64Builder, UInt8Array, UInt8Builder, }, - datatypes::{DataType, TimeUnit}, + datatypes::{DataType, Int32Type, TimeUnit}, error::ArrowError, }; use datafusion::physical_plan::{ @@ -142,6 +143,9 @@ pub struct VarbinaryColumn(BinaryArray); #[derive(Debug)] pub struct StringColumn(StringArray); +#[derive(Debug)] +pub struct StringDictionaryColumn(DictionaryArray); + #[derive(Debug)] pub struct DateColumn(DateArray); @@ -287,6 +291,58 @@ impl_column!( ); impl_column!(StringColumn, get_string_datum, get_string_datum_view); +// TODO +// impl_column!(StringDictionaryColumn, get_string_datum, +// get_string_datum_view); +impl StringDictionaryColumn { + #[doc = " Get datum by index."] + pub fn datum_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + Some(self.datum(index)) + } + + pub fn datum_view_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + Some(self.datum_view(index)) + } + + pub fn datum_view(&self, index: usize) -> DatumView { + if self.0.is_null(index) { + return DatumView::Null; + } + // TODO : Is this the efficient way? + DatumView::String(self.0.downcast_dict::().unwrap().value(index)) + } + + pub fn datum(&self, index: usize) -> Datum { + if self.0.is_null(index) { + return Datum::Null; + } + // TODO : Is this the efficient way? + Datum::String( + self.0 + .downcast_dict::() + .unwrap() + .value(index) + .into(), + ) + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.0.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } +} + macro_rules! impl_dedup { ($Column: ident) => { impl $Column { @@ -320,6 +376,31 @@ macro_rules! impl_dedup { impl_dedup!(TimestampColumn); impl_dedup!(VarbinaryColumn); impl_dedup!(StringColumn); +// impl_dedup!(StringDictionaryColumn); +// TODO +impl StringDictionaryColumn { + #[doc = " If datum i is not equal to previous datum i - 1, mark `selected[i]` to"] + #[doc = " true."] + #[doc = ""] + #[doc = " The first datum is marked to true."] + #[doc = ""] + #[doc = " The size of selected must equal to the size of this column and"] + #[doc = " initialized to false."] + #[allow(clippy::float_cmp)] + pub fn dedup(&self, selected: &mut [bool]) { + if self.0.is_empty() { + return; + } + selected[0] = true; + for i in 1..self.0.len() { + let current = self.0.key(i); + let prev = self.0.key(i - 1); + if current != prev { + selected[i] = true; + } + } + } +} macro_rules! 
impl_new_null { ($Column: ident, $Builder: ident) => { @@ -388,6 +469,38 @@ impl_from_array_and_slice!(NullColumn, NullArray); impl_from_array_and_slice!(TimestampColumn, TimestampMillisecondArray); impl_from_array_and_slice!(VarbinaryColumn, BinaryArray); impl_from_array_and_slice!(StringColumn, StringArray); +// impl_from_array_and_slice!(StringDictionaryColumn, +// DictionaryArray); + +impl From> for StringDictionaryColumn { + fn from(array: DictionaryArray) -> Self { + Self(array) + } +} +impl From<&DictionaryArray> for StringDictionaryColumn { + fn from(array_ref: &DictionaryArray) -> Self { + let array_data = array_ref.into_data(); + let array = DictionaryArray::::from(array_data); + Self(array) + } +} +impl StringDictionaryColumn { + fn to_arrow_array(&self) -> DictionaryArray { + let array_data = self.0.clone().into_data(); + DictionaryArray::::from(array_data) + } + + #[doc = " Returns a zero-copy slice of this array with the indicated offset and"] + #[doc = " length."] + #[doc = ""] + #[doc = " Panics if offset with length is greater than column length."] + fn slice(&self, offset: usize, length: usize) -> Self { + let array_slice = self.0.slice(offset, length); + let array_data = array_slice.into_data(); + let array = DictionaryArray::::from(array_data); + Self(array) + } +} macro_rules! impl_iter { ($Column: ident, $Value: ident) => { @@ -437,6 +550,18 @@ impl StringColumn { Self(array) } } +impl StringDictionaryColumn { + /// Create a column that all values are null. + fn new_null(num_rows: usize) -> Self { + let mut builder = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + builder.append_null(); + } + let array = builder.finish(); + + Self(array) + } +} macro_rules! impl_numeric_column { ($(($Kind: ident, $type: ty)), *) => { @@ -538,23 +663,40 @@ impl StringColumn { } } +// impl StringDictionaryColumn { +// pub fn iter(&self) -> impl Iterator> + '_ { +// self.0.iter() +// } + +// pub fn value(&self, index: usize) -> Option<&str> { +// if self.0.is_valid(index) { +// unsafe { Some(self.0.value_unchecked(index)) } +// } else { +// None +// } +// } +// } + macro_rules! impl_column_block { ($($Kind: ident), *) => { impl ColumnBlock { pub fn datum_kind(&self) -> DatumKind { match self { + ColumnBlock::StringDictionary(_) => DatumKind::String, $(ColumnBlock::$Kind(_) => DatumKind::$Kind,)* } } pub fn datum_opt(&self, index: usize) -> Option { match self { + ColumnBlock::StringDictionary(col) => col.datum_opt(index), $(ColumnBlock::$Kind(col) => col.datum_opt(index),)* } } pub fn datum_view_opt(&self, index: usize) -> Option { match self { + ColumnBlock::StringDictionary(col) => col.datum_view_opt(index), $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* } } @@ -562,6 +704,7 @@ macro_rules! impl_column_block { /// Panic if index is out fo bound. pub fn datum_view(&self, index: usize) -> DatumView { match self { + ColumnBlock::StringDictionary(col) => col.datum_view(index), $(ColumnBlock::$Kind(col) => col.datum_view(index),)* } } @@ -569,18 +712,21 @@ macro_rules! impl_column_block { /// Panic if index is out fo bound. 
pub fn datum(&self, index: usize) -> Datum { match self { + ColumnBlock::StringDictionary(col) => col.datum(index), $(ColumnBlock::$Kind(col) => col.datum(index),)* } } pub fn num_rows(&self) -> usize { match self { + ColumnBlock::StringDictionary(col) => col.num_rows(), $(ColumnBlock::$Kind(col) => col.num_rows(),)* } } pub fn to_arrow_array_ref(&self) -> ArrayRef { match self { + ColumnBlock::StringDictionary(col) => Arc::new(col.to_arrow_array()), $(ColumnBlock::$Kind(col) => Arc::new(col.to_arrow_array()),)* } } @@ -590,6 +736,7 @@ macro_rules! impl_column_block { /// The first datum is not marked to true. pub fn dedup(&self, selected: &mut [bool]) { match self { + ColumnBlock::StringDictionary(col) => col.dedup(selected), $(ColumnBlock::$Kind(col) => col.dedup(selected),)* } } @@ -600,6 +747,7 @@ macro_rules! impl_column_block { #[must_use] pub fn slice(&self, offset: usize, length: usize) -> Self { match self { + ColumnBlock::StringDictionary(col) => ColumnBlock::StringDictionary(col.slice(offset, length)), $(ColumnBlock::$Kind(col) => ColumnBlock::$Kind(col.slice(offset, length)),)* } } @@ -612,6 +760,12 @@ macro_rules! impl_column_block { } } })* + + impl From for ColumnBlock { + fn from(column: StringDictionaryColumn) -> Self { + Self::StringDictionary(column) + } + } }; } @@ -628,6 +782,8 @@ macro_rules! define_column_block { #[derive(Debug)] pub enum ColumnBlock { Null(NullColumn), + StringDictionary(StringDictionaryColumn), + String(StringColumn), $( $Kind([<$Kind Column>]), )* @@ -635,8 +791,37 @@ macro_rules! define_column_block { impl ColumnBlock { pub fn try_from_arrow_array_ref(datum_kind: &DatumKind, array: &ArrayRef) -> Result { + let is_dictionary : bool = if let DataType::Dictionary(..) = array.data_type() { + true + } else { + false + }; + // todo! let column = match datum_kind { DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(array.len())), + DatumKind::String => { + if !is_dictionary { + let mills_array; + let cast_column = match array.data_type() { + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + mills_array = cast_nanosecond_to_mills(array)?; + cast_array(datum_kind, &mills_array)? + } + _ => cast_array(datum_kind, array)?, + }; + ColumnBlock::String(StringColumn::from(cast_column)) + } else { + let mills_array; + let cast_column = match array.data_type() { + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + mills_array = cast_nanosecond_to_mills(array)?; + cast_array(datum_kind, &mills_array)? + } + _ => cast_array(datum_kind, array)?, + }; + ColumnBlock::StringDictionary(StringDictionaryColumn::from(cast_column)) + } + }, $( DatumKind::$Kind => { let mills_array; @@ -660,6 +845,7 @@ macro_rules! define_column_block { pub fn new_null_with_type(kind: &DatumKind, rows: usize) -> Result { let block = match kind { DatumKind::Null => ColumnBlock::Null(NullColumn::new_null(rows)), + DatumKind::String => ColumnBlock::String(StringColumn::new_null(rows)), $( DatumKind::$Kind => ColumnBlock::$Kind([<$Kind Column>]::new_null(rows)), )* @@ -674,8 +860,8 @@ macro_rules! define_column_block { // Define column blocks, Null is defined explicitly in macro. define_column_block!( - Timestamp, Double, Float, Varbinary, String, UInt64, UInt32, UInt16, UInt8, Int64, Int32, - Int16, Int8, Boolean, Date, Time + Timestamp, Double, Float, Varbinary, UInt64, UInt32, UInt16, UInt8, Int64, Int32, Int16, Int8, + Boolean, Date, Time ); impl ColumnBlock { @@ -796,7 +982,7 @@ macro_rules! append_block { macro_rules! 
define_column_block_builder { ($(($Kind: ident, $Builder: ident)), *) => { paste! { - #[derive(Debug)] + // #[derive(Debug)] pub enum ColumnBlockBuilder { Null { rows: usize }, Timestamp(TimestampMillisecondBuilder), @@ -804,6 +990,7 @@ macro_rules! define_column_block_builder { String(StringBuilder), Date(DateBuilder), Time(TimeBuilder), + Dictionary(StringDictionaryBuilder::), $( $Kind($Builder), )* @@ -811,13 +998,15 @@ macro_rules! define_column_block_builder { impl ColumnBlockBuilder { /// Create by data type with initial capacity - pub fn with_capacity(data_type: &DatumKind, item_capacity: usize) -> Self { + pub fn with_capacity(data_type: &DatumKind, item_capacity: usize, is_dictionary : bool) -> Self { match data_type { DatumKind::Null => Self::Null { rows: 0 }, DatumKind::Timestamp => Self::Timestamp(TimestampMillisecondBuilder::with_capacity(item_capacity)), // The data_capacity is set as 1024, because the item is variable-size type. DatumKind::Varbinary => Self::Varbinary(BinaryBuilder::with_capacity(item_capacity, 1024)), - DatumKind::String => Self::String(StringBuilder::with_capacity(item_capacity, 1024)), + DatumKind::String if !is_dictionary => Self::String(StringBuilder::with_capacity(item_capacity, 1024)), + DatumKind::String if is_dictionary => Self::Dictionary(StringDictionaryBuilder::::new()), + DatumKind::String => Self::Dictionary(StringDictionaryBuilder::::new()), DatumKind::Date => Self::Date(DateBuilder::with_capacity(item_capacity)), DatumKind::Time => Self::Time(TimeBuilder::with_capacity(item_capacity)), $( @@ -847,6 +1036,17 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_datum!(String, builder, Datum, datum), Self::Date(builder) => append_datum!(Date, builder, Datum, datum), Self::Time(builder) => append_datum!(Time, builder, Datum, datum), + Self::Dictionary(builder) => { + match datum { + Datum::Null => Ok(builder.append_null()), + Datum::String(v) => Ok(builder.append_value(v)), + _ => ConflictType { + expect: DatumKind::String, + given: datum.kind(), + } + .fail() + } + }, $( Self::$Kind(builder) => append_datum!($Kind, builder, Datum, datum), )* @@ -874,6 +1074,17 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_datum!(String, builder, DatumView, datum), Self::Date(builder) => append_datum!(Date, builder, DatumView, datum), Self::Time(builder) => append_datum!(Time, builder, DatumView, datum), + Self::Dictionary(builder) => { + match datum { + DatumView::Null => Ok(builder.append_null()), + DatumView::String(v) => Ok(builder.append_value(v)), + _ => ConflictType { + expect: DatumKind::String, + given: datum.kind(), + } + .fail() + } + }, $( Self::$Kind(builder) => append_datum!($Kind, builder, DatumView, datum), )* @@ -898,6 +1109,48 @@ macro_rules! define_column_block_builder { Self::String(builder) => append_block!(String, builder, ColumnBlock, block, start, len), Self::Date(builder) => append_block!(Date, builder, ColumnBlock, block, start, len), Self::Time(builder) => append_block!(Time, builder, ColumnBlock, block, start, len), + Self::Dictionary(builder) => { + match block { + ColumnBlock::Null(v) => { + let end = std::cmp::min(start + len, v.num_rows()); + for _ in start..end { + builder.append_null(); + } + Ok(()) + } + ColumnBlock::StringDictionary(v) => { + // There is no convenient api to copy a range of data from array to builder, so + // we still need to clone value one by one using a for loop. 
+ let end = std::cmp::min(start + len, v.num_rows()); + for i in start..end { + // let value_opt = v.0.keys().value(i); + if v.0.is_null(i) { + builder.append_null(); + } else { + let value = v.datum(i); + builder.append_value(value.as_str().unwrap()); + // let rd_buf: &StringArray = + // v.0.values().as_any().downcast_ref::().unwrap(); + // let value_opt = rd_buf.value(i); + } + // match value_opt { + // Some(value) => { + // builder.append_value(value.as_str().unwrap()); + // } + // None => { + // builder.append_null(); + // } + // } + } + Ok(()) + } + _ => ConflictType { + expect: DatumKind::String, + given: block.datum_kind(), + } + .fail(), + } + }, $( Self::$Kind(builder) => append_block!($Kind, builder, ColumnBlock, block, start, len), )* @@ -912,6 +1165,7 @@ macro_rules! define_column_block_builder { Self::String(builder) => builder.len(), Self::Date(builder) => builder.len(), Self::Time(builder) => builder.len(), + Self::Dictionary(builder) => builder.len(), $( Self::$Kind(builder) => builder.len(), )* @@ -931,6 +1185,9 @@ macro_rules! define_column_block_builder { Self::String(builder) => StringColumn::from(builder.finish()).into(), Self::Date(builder) => DateColumn::from(builder.finish()).into(), Self::Time(builder) => TimeColumn::from(builder.finish()).into(), + Self::Dictionary(builder) => { + StringDictionaryColumn::from(builder.finish()).into() + }, $( Self::$Kind(builder) => [<$Kind Column>]::from(builder.finish()).into(), )* @@ -959,8 +1216,8 @@ define_column_block_builder!( impl ColumnBlockBuilder { /// Create by data type - pub fn new(data_type: &DatumKind) -> Self { - Self::with_capacity(data_type, 0) + pub fn new(data_type: &DatumKind, is_dictionary: bool) -> Self { + Self::with_capacity(data_type, 0, is_dictionary) } pub fn is_empty(&self) -> bool { @@ -976,7 +1233,9 @@ impl ColumnBlockBuilder { #[cfg(test)] mod tests { use super::*; - use crate::tests::{build_rows, build_schema}; + use crate::tests::{ + build_row_for_dictionary, build_rows, build_schema, build_schema_for_dictionary, + }; #[test] fn test_column_block_builder() { @@ -984,7 +1243,7 @@ mod tests { let rows = build_rows(); // DatumKind::Varbinary let column = schema.column(0); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2, false); // append builder.append(rows[0][0].clone()).unwrap(); @@ -998,7 +1257,7 @@ mod tests { let column_block = builder.build(); assert_eq!(column_block.num_rows(), 2); - let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2); + let mut builder = ColumnBlockBuilder::with_capacity(&column.data_type, 2, false); // append_block_range builder.append_block_range(&column_block, 0, 1).unwrap(); @@ -1015,4 +1274,65 @@ mod tests { Datum::Varbinary(Bytes::copy_from_slice(b"binary key1")) ); } + + #[test] + fn test_column_block_string_dictionary_builder() { + let schema = build_schema_for_dictionary(); + let rows = vec![ + build_row_for_dictionary(1, 1, Some("tag1_1"), "tag2_1", 1), + build_row_for_dictionary(2, 2, Some("tag1_2"), "tag2_2", 2), + build_row_for_dictionary(3, 3, Some("tag1_3"), "tag2_3", 3), + build_row_for_dictionary(4, 4, Some("tag1_1"), "tag2_4", 3), + build_row_for_dictionary(5, 5, Some("tag1_3"), "tag2_4", 4), + build_row_for_dictionary(6, 6, None, "tag2_4", 4), + ]; + // DatumKind::String, is_dictionary = true + let column = schema.column(2); + println!("{column:?}"); + let mut builder = + ColumnBlockBuilder::with_capacity(&column.data_type, 0,
column.is_dictionary); + // append + (0..rows.len()).for_each(|i| builder.append(rows[i][2].clone()).unwrap()); + + let ret = builder.append(rows[0][0].clone()); + assert!(ret.is_err()); + + // append_view + builder.append_view(rows[5][2].as_view()).unwrap(); + let ret = builder.append_view(rows[1][0].as_view()); + + assert!(ret.is_err()); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 7); + let mut builder = + ColumnBlockBuilder::with_capacity(&column.data_type, 2, column.is_dictionary); + + // append_block_range + (0..rows.len()).for_each(|i| builder.append_block_range(&column_block, i, 1).unwrap()); + + let column_block = builder.build(); + assert_eq!(column_block.num_rows(), 6); + assert_eq!( + column_block.datum(0), + Datum::String(StringBytes::from("tag1_1")) + ); + assert_eq!( + column_block.datum(1), + Datum::String(StringBytes::from("tag1_2")) + ); + assert_eq!( + column_block.datum(2), + Datum::String(StringBytes::from("tag1_3")) + ); + assert_eq!( + column_block.datum(3), + Datum::String(StringBytes::from("tag1_1")) + ); + assert_eq!( + column_block.datum(4), + Datum::String(StringBytes::from("tag1_3")) + ); + assert_eq!(column_block.datum(5), Datum::Null); + } } diff --git a/common_types/src/column_schema.rs index 2f1a48cbbd..31bbbea355 100644 --- a/common_types/src/column_schema.rs +++ b/common_types/src/column_schema.rs @@ -119,6 +119,7 @@ pub enum ReadOp { struct ArrowFieldMeta { id: u32, is_tag: bool, + is_dictionary: bool, comment: String, } @@ -126,6 +127,7 @@ struct ArrowFieldMeta { pub enum ArrowFieldMetaKey { Id, IsTag, + IsDictionary, Comment, } @@ -134,6 +136,7 @@ impl ArrowFieldMetaKey { match self { ArrowFieldMetaKey::Id => "field::id", ArrowFieldMetaKey::IsTag => "field::is_tag", + ArrowFieldMetaKey::IsDictionary => "field::is_dictionary", ArrowFieldMetaKey::Comment => "field::comment", } } @@ -159,6 +162,8 @@ pub struct ColumnSchema { /// Is tag, tag is just a hint for a column, there is no restriction that a /// tag column must be a part of primary key pub is_tag: bool, + /// Whether to use dictionary encoding when this column is stored in Parquet + pub is_dictionary: bool, /// Comment of the column pub comment: String, /// Column name in response @@ -273,6 +278,7 @@ impl TryFrom for ColumnSchema { data_type: DatumKind::from(data_type), is_nullable: column_schema.is_nullable, is_tag: column_schema.is_tag, + is_dictionary: column_schema.is_dictionary, comment: column_schema.comment, escaped_name, default_value, @@ -287,6 +293,7 @@ impl TryFrom<&Arc> for ColumnSchema { let ArrowFieldMeta { id, is_tag, + is_dictionary, comment, } = decode_arrow_field_meta_data(field.metadata())?; Ok(Self { @@ -299,6 +306,7 @@ impl TryFrom<&Arc> for ColumnSchema { )?, is_nullable: field.is_nullable(), is_tag, + is_dictionary, comment, escaped_name: field.name().escape_debug().to_string(), default_value: None, @@ -309,11 +317,13 @@ impl TryFrom<&Arc> for ColumnSchema { impl From<&ColumnSchema> for Field { fn from(col_schema: &ColumnSchema) -> Self { let metadata = encode_arrow_field_meta_data(col_schema); - let mut field = Field::new( - &col_schema.name, - col_schema.data_type.into(), - col_schema.is_nullable, - ); + // If the column should use dictionary encoding, create the corresponding dictionary type.
+ let data_type: DataType = if col_schema.is_dictionary { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + } else { + col_schema.data_type.into() + }; + let mut field = Field::new(&col_schema.name, data_type, col_schema.is_nullable); field.set_metadata(metadata); field @@ -343,6 +353,7 @@ fn decode_arrow_field_meta_data(meta: &HashMap) -> Result HashMap, } @@ -385,6 +401,7 @@ impl Builder { data_type, is_nullable: false, is_tag: false, + is_dictionary: false, comment: String::new(), default_value: None, } @@ -407,6 +424,12 @@ impl Builder { self } + /// Set whether this column uses dictionary encoding, default is false (not dictionary encoded). + pub fn is_dictionary(mut self, is_dictionary: bool) -> Self { + self.is_dictionary = is_dictionary; + self + } + pub fn comment(mut self, comment: String) -> Self { self.comment = comment; self @@ -439,6 +462,7 @@ impl Builder { data_type: self.data_type, is_nullable: self.is_nullable, is_tag: self.is_tag, + is_dictionary: self.is_dictionary, comment: self.comment, escaped_name, default_value: self.default_value, @@ -460,6 +484,7 @@ impl From for schema_pb::ColumnSchema { is_nullable: src.is_nullable, id: src.id, is_tag: src.is_tag, + is_dictionary: src.is_dictionary, comment: src.comment, default_value, } @@ -494,6 +519,7 @@ mod tests { data_type: DatumKind::Boolean, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "Comment of this column".to_string(), escaped_name: "test_column_schema".escape_debug().to_string(), default_value: Some(Expr::Value(Value::Boolean(true))), diff --git a/common_types/src/record_batch.rs index a7a73c9381..a56d1d1794 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -477,7 +477,13 @@ impl RecordBatchWithKeyBuilder { let builders = schema_with_key .columns() .iter() - .map(|column_schema| ColumnBlockBuilder::with_capacity(&column_schema.data_type, 0)) + .map(|column_schema| { + ColumnBlockBuilder::with_capacity( + &column_schema.data_type, + 0, + column_schema.is_dictionary, + ) + }) .collect(); Self { schema_with_key, @@ -490,7 +496,11 @@ impl RecordBatchWithKeyBuilder { .columns() .iter() .map(|column_schema| { - ColumnBlockBuilder::with_capacity(&column_schema.data_type, capacity) + ColumnBlockBuilder::with_capacity( + &column_schema.data_type, + capacity, + column_schema.is_dictionary, + ) }) .collect(); Self { @@ -632,12 +642,10 @@ impl ArrowRecordBatchProjector { let schema_with_key = self.row_projector.schema_with_key().clone(); let source_projection = self.row_projector.source_projection(); let mut column_blocks = Vec::with_capacity(schema_with_key.num_columns()); - let num_rows = arrow_record_batch.num_rows(); // ensure next_arrow_column_idx < num_columns let mut next_arrow_column_idx = 0; let num_columns = arrow_record_batch.num_columns(); - for (source_idx, column_schema) in source_projection.iter().zip(schema_with_key.columns()) { match source_idx { Some(_) => { @@ -651,7 +659,6 @@ impl ArrowRecordBatchProjector { let array = arrow_record_batch.column(next_arrow_column_idx); next_arrow_column_idx += 1; - let column_block = ColumnBlock::try_from_arrow_array_ref(&column_schema.data_type, array) .context(CreateColumnBlock)?; diff --git a/common_types/src/schema.rs index c5db4d0636..d94f10e52a 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -690,7 +690,7 @@ impl Schema { self.column_schemas.num_columns() } - /// Returns true if idx is primary key idnex + /// Returns true if idx
is primary key index pub fn is_primary_key_index(&self, idx: &usize) -> bool { self.primary_key_indexes.contains(idx) } diff --git a/common_types/src/tests.rs index 0703d39d1e..fd81255da2 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -129,7 +129,8 @@ fn default_value_schema_builder() -> schema::Builder { } /// Build a schema for testing: -/// (key1(varbinary), key2(timestamp), field1(double), field2(string)) +/// (key1(varbinary), key2(timestamp), field1(double), field2(string), +/// field3(date), field4(time)) pub fn build_schema() -> Schema { base_schema_builder().build().unwrap() } @@ -144,6 +145,50 @@ pub fn build_schema() -> Schema { pub fn build_default_value_schema() -> Schema { default_value_schema_builder().build().unwrap() } +/// Build a schema for testing: +/// (tsid(uint64), time(timestamp), tag1(string, dictionary, nullable), +/// tag2(string, dictionary), value(int8)) +pub fn build_schema_for_dictionary() -> Schema { + let builder = schema::Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64) + .build() + .unwrap(), + ) + .unwrap() + .add_key_column( + column_schema::Builder::new("time".to_string(), DatumKind::Timestamp) + .build() + .unwrap(), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag1".to_string(), DatumKind::String) + .is_tag(true) + .is_dictionary(true) + .is_nullable(true) + .build() + .unwrap(), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("tag2".to_string(), DatumKind::String) + .is_tag(true) + .is_dictionary(true) + .build() + .unwrap(), + ) + .unwrap() + .add_normal_column( + column_schema::Builder::new("value".to_string(), DatumKind::Int8) + .build() + .unwrap(), + ) + .unwrap(); + + builder.build().unwrap() +} /// Build a schema for testing: /// (tsid(uint64), key2(timestamp), tag1(string), tag2(string), value(int8), @@ -193,6 +238,23 @@ pub fn build_schema_for_cpu() -> Schema { builder.build().unwrap() } +pub fn build_row_for_dictionary( + key1: u64, + key2: i64, + tag1: Option<&str>, + tag2: &str, + value: i8, +) -> Row { + let datums = vec![ + Datum::UInt64(key1), + Datum::Timestamp(Timestamp::new(key2)), + tag1.map(|v| Datum::String(StringBytes::from(v))) + .unwrap_or(Datum::Null), + Datum::String(StringBytes::from(tag2)), + Datum::Int8(value), + ]; + Row::from_datums(datums) +} pub fn build_projected_schema() -> ProjectedSchema { let schema = build_schema(); assert!(schema.num_columns() > 1); diff --git a/df_operator/src/udfs/time_bucket.rs index 1ea693d954..bb4c6b29bb 100644 --- a/df_operator/src/udfs/time_bucket.rs +++ b/df_operator/src/udfs/time_bucket.rs @@ -141,8 +141,9 @@ impl<'a> TimeBucket<'a> { } fn call(&self) -> Result { + // TODO: missing is_dictionary param let mut out_column_builder = - ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, self.column.num_rows()); + ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, self.column.num_rows(), false); for ts_opt in self.column.iter() { match ts_opt { Some(ts) => { diff --git a/docs/minimal.toml index 53b77abe5b..5d7fe28b9e 100644 --- a/docs/minimal.toml +++ b/docs/minimal.toml @@ -4,7 +4,7 @@ http_port = 5440 grpc_port = 8831 [logger] -level = "info" +level = "debug" [tracing] dir = "/tmp/ceresdb" diff --git a/interpreters/src/insert.rs index 95bc47a0cb..782b07b6f3 100644 --- a/interpreters/src/insert.rs +++
b/interpreters/src/insert.rs @@ -341,7 +341,11 @@ fn get_or_extract_column_from_row_groups( .unwrap_or_else(|| { let data_type = row_groups.schema().column(column_idx).data_type; let iter = row_groups.iter_column(column_idx); - let mut builder = ColumnBlockBuilder::with_capacity(&data_type, iter.size_hint().0); + let mut builder = ColumnBlockBuilder::with_capacity( + &data_type, + iter.size_hint().0, + row_groups.schema().column(column_idx).is_dictionary, + ); for datum in iter { builder.append(datum.clone()).context(BuildColumnBlock)?; diff --git a/proxy/src/grpc/prom_query.rs index 4ef9ebeecf..14eedfd93e 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -395,26 +395,26 @@ mod tests { build_row(1000000, 2, 10.0, "v4"), build_row(1000000, 3, 10.0, "v3"), ]; - - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 2); + // TODO: missing is_dictionary param + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 2, false); for row in &rows { builder.append(row[0].clone()).unwrap(); } let timestamp_block = builder.build(); - - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 2); + // TODO: missing is_dictionary param + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 2, false); for row in &rows { builder.append(row[1].clone()).unwrap(); } let tsid_block = builder.build(); - - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Double, 2); + // TODO: missing is_dictionary param + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::Double, 2, false); for row in &rows { builder.append(row[2].clone()).unwrap(); } let field_block = builder.build(); - - let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 2); + // TODO: missing is_dictionary param + let mut builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 2, false); for row in &rows { builder.append(row[3].clone()).unwrap(); } diff --git a/proxy/src/influxdb/types.rs index 58cba675ab..85681d95c5 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -811,11 +811,13 @@ mod tests { } fn build_test_column_blocks() -> Vec { - let mut measurement_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3); - let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3); - let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3); + // TODO: missing is_dictionary param + let mut measurement_builder = + ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut tag_builder = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut time_builder = ColumnBlockBuilder::with_capacity(&DatumKind::Timestamp, 3, false); + let mut field_builder1 = ColumnBlockBuilder::with_capacity(&DatumKind::String, 3, false); + let mut field_builder2 = ColumnBlockBuilder::with_capacity(&DatumKind::UInt64, 3, false); // Data in measurement1 let measurement1 = Datum::String(StringBytes::copy_from_str("m1")); diff --git a/query_frontend/src/planner.rs index 694bc0ee0e..df3f350f85 100644 --- a/query_frontend/src/planner.rs +++ b/query_frontend/src/planner.rs @@ -1441,6 +1441,7 @@ mod tests { data_type: String, is_nullable: false, is_tag:
true, + is_dictionary: false, comment: "", escaped_name: "c1", default_value: None, @@ -1451,6 +1452,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "ts", default_value: None, @@ -1461,6 +1463,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c3", default_value: None, @@ -1471,6 +1474,7 @@ mod tests { data_type: UInt32, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c4", default_value: Some( @@ -1488,6 +1492,7 @@ mod tests { data_type: UInt32, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c5", default_value: Some( @@ -1514,6 +1519,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "c6", default_value: Some( @@ -1612,6 +1618,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1622,6 +1629,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1632,6 +1640,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1642,6 +1651,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1652,6 +1662,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1662,6 +1673,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1687,6 +1699,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1697,6 +1710,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1707,6 +1721,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1717,6 +1732,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1727,6 +1743,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1737,6 +1754,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1851,6 +1869,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1861,6 +1880,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1871,6 +1891,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1881,6 +1902,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + 
is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1891,6 +1913,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1901,6 +1924,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -1946,6 +1970,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -1956,6 +1981,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -1966,6 +1992,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -1976,6 +2003,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -1986,6 +2014,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -1996,6 +2025,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -2017,6 +2047,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "add_col", default_value: None, @@ -2055,6 +2086,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -2065,6 +2097,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -2075,6 +2108,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -2085,6 +2119,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -2095,6 +2130,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -2105,6 +2141,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, @@ -2156,6 +2193,7 @@ mod tests { data_type: Varbinary, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key1", default_value: None, @@ -2166,6 +2204,7 @@ mod tests { data_type: Timestamp, is_nullable: false, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "key2", default_value: None, @@ -2176,6 +2215,7 @@ mod tests { data_type: Double, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field1", default_value: None, @@ -2186,6 +2226,7 @@ mod tests { data_type: String, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field2", default_value: None, @@ -2196,6 +2237,7 @@ mod tests { data_type: Date, is_nullable: true, is_tag: false, + is_dictionary: false, comment: "", escaped_name: "field3", default_value: None, @@ -2206,6 +2248,7 @@ mod tests { data_type: Time, is_nullable: true, is_tag: false, + 
is_dictionary: false, comment: "", escaped_name: "field4", default_value: None, diff --git a/server/src/mysql/writer.rs index e2af7880e7..af01fa09ec 100644 --- a/server/src/mysql/writer.rs +++ b/server/src/mysql/writer.rs @@ -150,6 +150,7 @@ mod tests { name: "id".to_string(), data_type: DatumKind::Int32, is_nullable: false, + is_dictionary: false, is_tag: false, comment: "".to_string(), escaped_name: "id".to_string(), @@ -163,6 +164,7 @@ mod tests { name: "name".to_string(), data_type: DatumKind::String, is_nullable: true, + is_dictionary: false, is_tag: true, comment: "".to_string(), escaped_name: "name".to_string(), @@ -177,6 +179,7 @@ mod tests { data_type: DatumKind::Timestamp, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "birthday".to_string(), default_value: None, @@ -190,6 +193,7 @@ mod tests { data_type: DatumKind::Boolean, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "is_show".to_string(), default_value: None, @@ -203,6 +207,7 @@ mod tests { data_type: DatumKind::Double, is_nullable: true, is_tag: true, + is_dictionary: false, comment: "".to_string(), escaped_name: "money".to_string(), default_value: None, diff --git a/table_engine/src/memory.rs index 4e79a291fb..efeb5d76e7 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -244,7 +244,8 @@ fn build_column_block<'a, I: Iterator>( data_type: &DatumKind, iter: I, ) -> stream::Result { - let mut builder = ColumnBlockBuilder::with_capacity(data_type, iter.size_hint().0); + // TODO: missing is_dictionary param + let mut builder = ColumnBlockBuilder::with_capacity(data_type, iter.size_hint().0, false); for datum in iter { builder .append(datum.clone()) diff --git a/table_engine/src/table.rs index 821bdb6195..aea9f64cf5 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -135,6 +135,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Tried to operate on a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + #[snafu(display( "Failed to wait for pending writes, table:{table}.\nBacktrace:\n{backtrace}" ))]
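Reviewer note on the dictionary encoding this patch introduces: the `is_dictionary` flag on `ColumnSchema` maps a string column to Arrow's `DataType::Dictionary(Int32, Utf8)`, and the new `Dictionary` variant of `ColumnBlockBuilder` wraps Arrow's `StringDictionaryBuilder`, which stores each distinct string once and keeps a per-row integer key into that value set. The minimal sketch below illustrates that behaviour against the upstream `arrow` crate only, using the same `Int32Type` key as this patch; it is illustrative and not CeresDB code, and all names in it are local to the example.

// Illustrative sketch only: Arrow dictionary encoding with an Int32 key type.
use arrow::array::{Array, StringArray, StringDictionaryBuilder};
use arrow::datatypes::Int32Type;

fn main() {
    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    builder.append_value("tag1_1");
    builder.append_value("tag1_2");
    // Repeated value: the builder reuses the key assigned to the first "tag1_1".
    builder.append_value("tag1_1");
    builder.append_null();

    let array = builder.finish();
    // Four logical rows...
    assert_eq!(array.len(), 4);
    // ...but only two distinct strings stored in the dictionary values.
    let values = array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.len(), 2);
    // Rows 0 and 2 share the same key; nulls are tracked on the key array.
    assert_eq!(array.keys().value(0), array.keys().value(2));
    assert!(array.is_null(3));
}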