From 4888f80c537c8d65d771abf16f384b743972ab2b Mon Sep 17 00:00:00 2001 From: Hugh Chern <60749105+alicorn0618@users.noreply.github.com> Date: Tue, 9 Jul 2024 14:56:08 +0800 Subject: [PATCH 1/3] refactor: partitioned_lock's elaboration (#1540) ## Rationale Extended the `try_new` interface while keeping the old one for compatibility. ## Detailed Changes * Implemented the `try_new_suggest_cap` method, while changing the old `try_new` method to `try_new_bit_len` to ensure compatibility. * Modified structs and functions that call old interfaces. ## Test Plan * Added new unit tests * Passed CI test --------- Co-authored-by: chunhao.ch --- src/components/object_store/src/disk_cache.rs | 4 +- src/components/object_store/src/mem_cache.rs | 2 +- src/components/partitioned_lock/src/lib.rs | 216 +++++++++++++++--- 3 files changed, 182 insertions(+), 40 deletions(-) diff --git a/src/components/object_store/src/disk_cache.rs b/src/components/object_store/src/disk_cache.rs index 981e6d0648..b0c7ba13ac 100644 --- a/src/components/object_store/src/disk_cache.rs +++ b/src/components/object_store/src/disk_cache.rs @@ -296,7 +296,7 @@ impl DiskCache { Ok(Self { root_dir, - meta_cache: Arc::new(PartitionedMutex::try_new( + meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len( init_lru, partition_bits, SeaHasherBuilder {}, @@ -545,7 +545,7 @@ impl DiskCacheStore { assert!(cap_per_part > 0); Ok(LruCache::new(cap_per_part)) }; - let meta_cache = PartitionedMutex::try_new( + let meta_cache = PartitionedMutex::try_new_with_bit_len( init_size_lru, FILE_SIZE_CACHE_PARTITION_BITS, SeaHasherBuilder, diff --git a/src/components/object_store/src/mem_cache.rs b/src/components/object_store/src/mem_cache.rs index 001be2ab8a..f602eee66e 100644 --- a/src/components/object_store/src/mem_cache.rs +++ b/src/components/object_store/src/mem_cache.rs @@ -81,7 +81,7 @@ impl MemCache { )) }; - let inner = PartitionedMutex::try_new( + let inner = PartitionedMutex::try_new_with_bit_len( init_lru, partition_bits, build_fixed_seed_ahasher_builder(), diff --git a/src/components/partitioned_lock/src/lib.rs b/src/components/partitioned_lock/src/lib.rs index de7ba3454e..22273b9709 100644 --- a/src/components/partitioned_lock/src/lib.rs +++ b/src/components/partitioned_lock/src/lib.rs @@ -36,20 +36,30 @@ impl PartitionedRwLock where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (1..partition_num) - .map(|_| init_fn(partition_num).map(RwLock::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) } pub fn read(&self, key: &K) -> RwLockReadGuard<'_, T> { @@ -68,6 +78,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + 
where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(RwLock::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &RwLock { &self.partitions[index] @@ -89,20 +115,30 @@ impl PartitionedMutex where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (0..partition_num) - .map(|_| init_fn(partition_num).map(Mutex::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) } pub fn lock(&self, key: &K) -> MutexGuard<'_, T> { @@ -115,6 +151,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(Mutex::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &Mutex { &self.partitions[index] @@ -140,11 +192,43 @@ impl PartitionedMutexAsync where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = 1 << partition_bit_len; + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { + let mutex = self.get_partition(key); + + mutex.lock().await + } + + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; let partitions = (0..partition_num) .map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new)) .collect::>, E>>()?; @@ -156,12 +240,6 @@ where }) } - pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { - let mutex = self.get_partition(key); - - mutex.lock().await - } - fn get_partition(&self, key: &K) -> &tokio::sync::Mutex { &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } @@ -181,11 +259,66 @@ mod tests { use super::*; + #[test] + fn 
test_new_equivalence() { + let init_42 = |_: usize| Ok::<_, ()>(42); + + let test_rwlock_42_bit_len = + PartitionedRwLock::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_rwlock_42_suggest_cap = PartitionedRwLock::try_new_with_suggest_cap( + init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_42_bit_len = + PartitionedMutex::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_mutex_42_suggest_cap = PartitionedMutex::try_new_with_suggest_cap( + init_42, + 16, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_async_42_bit_len = PartitionedMutexAsync::try_new_with_bit_len( + init_42, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + let test_mutex_async_42_suggest_cap = PartitionedMutexAsync::try_new_with_suggest_cap( + init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + assert_eq!( + test_rwlock_42_bit_len.partition_mask, + test_rwlock_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_42_bit_len.partition_mask, + test_mutex_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_async_42_bit_len.partition_mask, + test_mutex_async_42_suggest_cap.partition_mask + ); + } + #[test] fn test_partitioned_rwlock() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -203,8 +336,12 @@ mod tests { #[test] fn test_partitioned_mutex() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedMutex::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedMutex::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -223,7 +360,7 @@ mod tests { async fn test_partitioned_mutex_async() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_hmap, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4, SeaHasherBuilder).unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -242,7 +379,8 @@ mod tests { fn test_partitioned_mutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutex::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + PartitionedMutex::try_new_with_bit_len(init_vec, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0); let mut _tmp_data = mutex_first.lock().unwrap(); @@ -256,8 +394,12 @@ mod tests { #[test] fn test_partitioned_rwmutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_vec, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0); let mut _tmp = mutex_first.write().unwrap(); 
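As a quick usage sketch of the two constructors introduced in this patch (an editorial example, not part of the diff): `try_new_with_bit_len` creates `2^bit_len` partitions, while `try_new_with_suggest_cap` rounds the suggestion up to the next power of two, so both calls below end up with 16 partitions. The `partitioned_lock` import path mirrors the crate name; using `std`'s `RandomState` instead of the seeded hasher builders from `hash_ext` is an assumption for illustration, and the key/value types are arbitrary.

```rust
use std::collections::{hash_map::RandomState, HashMap};

use partitioned_lock::PartitionedMutex;

fn main() {
    // Each partition gets its own map; the init closure receives the final
    // partition count, so per-partition capacity could be derived from it.
    let init = |_partition_num: usize| Ok::<_, ()>(HashMap::<String, u64>::new());

    // 2^4 = 16 partitions.
    let by_bit_len =
        PartitionedMutex::try_new_with_bit_len(init, 4, RandomState::new()).unwrap();

    // 13 is rounded up to the next power of two, so this also yields 16 partitions.
    let by_suggest_cap =
        PartitionedMutex::try_new_with_suggest_cap(init, 13, RandomState::new()).unwrap();

    by_bit_len.lock(&"a".to_string()).insert("a".to_string(), 1);
    by_suggest_cap.lock(&"b".to_string()).insert("b".to_string(), 2);
}
```

Both public constructors delegate to the same private `try_new`, so the partition mask and key-to-partition mapping are identical regardless of which one a caller picks, which is exactly what `test_new_equivalence` below asserts.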
assert!(mutex_first.try_write().is_err()); @@ -271,7 +413,7 @@ mod tests { async fn test_partitioned_mutex_async_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_vec, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4, SeaHasherBuilder).unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0).await; let mut _tmp_data = mutex_first.lock().await; From fa5c286eefb912f2c8c27394dd76a5b497252744 Mon Sep 17 00:00:00 2001 From: Draco Date: Mon, 15 Jul 2024 17:15:39 +0800 Subject: [PATCH 2/3] feat: support INSERT INTO SELECT (#1536) ## Rationale Close #557. ## Detailed Changes When generating the insert logical plan, alse generate the select logical plan and store it in the insert plan. Then execute the select logical plan in the insert interpreter, convert the result records into RowGroup and then insert it. ## Test Plan CI --- .../env/local/dml/insert_into_select.result | 79 ++++++ .../env/local/dml/insert_into_select.sql | 57 ++++ src/interpreters/src/factory.rs | 4 +- src/interpreters/src/insert.rs | 218 ++++++++++++--- src/proxy/src/write.rs | 4 +- src/query_frontend/src/plan.rs | 21 +- src/query_frontend/src/planner.rs | 262 ++++++++++-------- 7 files changed, 487 insertions(+), 158 deletions(-) create mode 100644 integration_tests/cases/env/local/dml/insert_into_select.result create mode 100644 integration_tests/cases/env/local/dml/insert_into_select.sql diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result new file mode 100644 index 0000000000..77b648a269 --- /dev/null +++ b/integration_tests/cases/env/local/dml/insert_into_select.result @@ -0,0 +1,79 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+-- +DROP TABLE IF EXISTS `insert_into_select_table1`; + +affected_rows: 0 + +CREATE TABLE `insert_into_select_table1` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +affected_rows: 0 + +INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) +VALUES + (1, 100, "s1"), + (2, 200, "s2"), + (3, 300, "s3"); + +affected_rows: 3 + +DROP TABLE IF EXISTS `insert_into_select_table2`; + +affected_rows: 0 + +CREATE TABLE `insert_into_select_table2` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string NULL, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +affected_rows: 0 + +INSERT INTO `insert_into_select_table2` (`timestamp`, `value`) +SELECT `timestamp`, `value` +FROM `insert_into_select_table1`; + +affected_rows: 3 + +SELECT `timestamp`, `value`, `name` +FROM `insert_into_select_table2`; + +timestamp,value,name, +Timestamp(1),Int32(100),String(""), +Timestamp(2),Int32(200),String(""), +Timestamp(3),Int32(300),String(""), + + +DROP TABLE `insert_into_select_table1`; + +affected_rows: 0 + +DROP TABLE `insert_into_select_table2`; + +affected_rows: 0 + diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql new file mode 100644 index 0000000000..dbe157609d --- /dev/null +++ b/integration_tests/cases/env/local/dml/insert_into_select.sql @@ -0,0 +1,57 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+-- + +DROP TABLE IF EXISTS `insert_into_select_table1`; + +CREATE TABLE `insert_into_select_table1` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) +VALUES + (1, 100, "s1"), + (2, 200, "s2"), + (3, 300, "s3"); + +DROP TABLE IF EXISTS `insert_into_select_table2`; + +CREATE TABLE `insert_into_select_table2` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string NULL, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +INSERT INTO `insert_into_select_table2` (`timestamp`, `value`) +SELECT `timestamp`, `value` +FROM `insert_into_select_table1`; + +SELECT `timestamp`, `value`, `name` +FROM `insert_into_select_table2`; + +DROP TABLE `insert_into_select_table1`; + +DROP TABLE `insert_into_select_table2`; diff --git a/src/interpreters/src/factory.rs b/src/interpreters/src/factory.rs index 1b10661c60..a47a86b259 100644 --- a/src/interpreters/src/factory.rs +++ b/src/interpreters/src/factory.rs @@ -82,7 +82,9 @@ impl Factory { self.physical_planner, self.query_runtime, ), - Plan::Insert(p) => InsertInterpreter::create(ctx, p), + Plan::Insert(p) => { + InsertInterpreter::create(ctx, p, self.query_executor, self.physical_planner) + } Plan::Create(p) => { CreateInterpreter::create(ctx, p, self.table_engine, self.table_manipulator) } diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs index cc455b3fb6..d6307bf125 100644 --- a/src/interpreters/src/insert.rs +++ b/src/interpreters/src/insert.rs @@ -30,7 +30,8 @@ use common_types::{ column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema::ColumnId, datum::Datum, - row::RowGroup, + row::{Row, RowBuilder, RowGroup}, + schema::Schema, }; use datafusion::{ common::ToDFSchema, @@ -42,15 +43,23 @@ use datafusion::{ }, }; use df_operator::visitor::find_columns_by_expr; +use futures::TryStreamExt; +use generic_error::{BoxError, GenericError}; use hash_ext::hash64; use macros::define_result; -use query_frontend::plan::InsertPlan; -use snafu::{OptionExt, ResultExt, Snafu}; +use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef}; +use query_frontend::{ + plan::{InsertPlan, InsertSource, QueryPlan}, + planner::InsertMode, +}; +use runtime::Priority; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use table_engine::table::{TableRef, WriteRequest}; use crate::{ context::Context, interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult}, + RecordBatchVec, }; #[derive(Debug, Snafu)] @@ -94,6 +103,18 @@ pub enum Error { BuildColumnBlock { source: common_types::column_block::Error, }, + + #[snafu(display("Failed to create query context, err:{}", source))] + CreateQueryContext { source: crate::context::Error }, + + #[snafu(display("Failed to execute select physical plan, msg:{}, err:{}", msg, source))] + ExecuteSelectPlan { msg: String, source: GenericError }, + + #[snafu(display("Failed to build row, err:{}", source))] + BuildRow { source: common_types::row::Error }, + + #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))] + RecordColumnsNotEnough { len: usize, index: usize }, } define_result!(Error); @@ -101,11 +122,23 @@ define_result!(Error); pub struct InsertInterpreter { ctx: Context, plan: InsertPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, } impl InsertInterpreter { - pub fn create(ctx: Context, plan: InsertPlan) -> 
InterpreterPtr { - Box::new(Self { ctx, plan }) + pub fn create( + ctx: Context, + plan: InsertPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, + ) -> InterpreterPtr { + Box::new(Self { + ctx, + plan, + executor, + physical_planner, + }) } } @@ -113,19 +146,42 @@ impl InsertInterpreter { impl Interpreter for InsertInterpreter { async fn execute(mut self: Box) -> InterpreterResult { // Generate tsid if needed. - self.maybe_generate_tsid().context(Insert)?; let InsertPlan { table, - mut rows, + source, default_value_map, } = self.plan; + let mut rows = match source { + InsertSource::Values { row_group } => row_group, + InsertSource::Select { + query: query_plan, + column_index_in_insert, + } => { + // TODO: support streaming insert + let record_batches = exec_select_logical_plan( + self.ctx, + query_plan, + self.executor, + self.physical_planner, + ) + .await + .context(Insert)?; + + if record_batches.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + convert_records_to_row_group(record_batches, column_index_in_insert, table.schema()) + .context(Insert)? + } + }; + + maybe_generate_tsid(&mut rows).context(Insert)?; + // Fill default values fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?; - // Context is unused now - let _ctx = self.ctx; - let request = WriteRequest { row_group: rows }; let num_rows = table @@ -138,42 +194,128 @@ impl Interpreter for InsertInterpreter { } } -impl InsertInterpreter { - fn maybe_generate_tsid(&mut self) -> Result<()> { - let schema = self.plan.rows.schema(); - let tsid_idx = schema.index_of_tsid(); - - if let Some(idx) = tsid_idx { - // Vec of (`index of tag`, `column id of tag`). - let tag_idx_column_ids: Vec<_> = schema - .columns() - .iter() - .enumerate() - .filter_map(|(i, column)| { - if column.is_tag { - Some((i, column.id)) - } else { - None - } - }) - .collect(); +async fn exec_select_logical_plan( + ctx: Context, + query_plan: QueryPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, +) -> Result { + let priority = Priority::High; + + let query_ctx = ctx + .new_query_context(priority) + .context(CreateQueryContext)?; + + // Create select physical plan. + let physical_plan = physical_planner + .plan(&query_ctx, query_plan) + .await + .box_err() + .context(ExecuteSelectPlan { + msg: "failed to build select physical plan", + })?; - let mut hash_bytes = Vec::new(); - for i in 0..self.plan.rows.num_rows() { - let row = self.plan.rows.get_row_mut(i).unwrap(); + // Execute select physical plan. 
+ let record_batch_stream = executor + .execute(&query_ctx, physical_plan) + .await + .box_err() + .context(ExecuteSelectPlan { + msg: "failed to execute select physical plan", + })?; - let mut tsid_builder = TsidBuilder::new(&mut hash_bytes); + let record_batches = + record_batch_stream + .try_collect() + .await + .box_err() + .context(ExecuteSelectPlan { + msg: "failed to collect select execution results", + })?; - for (idx, column_id) in &tag_idx_column_ids { - tsid_builder.maybe_write_datum(*column_id, &row[*idx])?; + Ok(record_batches) +} + +fn convert_records_to_row_group( + record_batches: RecordBatchVec, + column_index_in_insert: Vec, + schema: Schema, +) -> Result { + let mut data_rows: Vec = Vec::new(); + + for record in &record_batches { + let num_cols = record.num_columns(); + let num_rows = record.num_rows(); + for row_idx in 0..num_rows { + let mut row_builder = RowBuilder::new(&schema); + // For each column in schema, append datum into row builder + for (index_opt, column_schema) in column_index_in_insert.iter().zip(schema.columns()) { + match index_opt { + InsertMode::Direct(index) => { + ensure!( + *index < num_cols, + RecordColumnsNotEnough { + len: num_cols, + index: *index + } + ); + let datum = record.column(*index).datum(row_idx); + row_builder = row_builder.append_datum(datum).context(BuildRow)?; + } + InsertMode::Null => { + // This is a null column + row_builder = row_builder.append_datum(Datum::Null).context(BuildRow)?; + } + InsertMode::Auto => { + // This is an auto generated column, fill by default value. + let kind = &column_schema.data_type; + row_builder = row_builder + .append_datum(Datum::empty(kind)) + .context(BuildRow)?; + } } + } + let row = row_builder.finish().context(BuildRow)?; + data_rows.push(row); + } + } + RowGroup::try_new(schema, data_rows).context(BuildRow) +} + +fn maybe_generate_tsid(rows: &mut RowGroup) -> Result<()> { + let schema = rows.schema(); + let tsid_idx = schema.index_of_tsid(); - let tsid = tsid_builder.finish(); - row[idx] = Datum::UInt64(tsid); + if let Some(idx) = tsid_idx { + // Vec of (`index of tag`, `column id of tag`). 
+ let tag_idx_column_ids: Vec<_> = schema + .columns() + .iter() + .enumerate() + .filter_map(|(i, column)| { + if column.is_tag { + Some((i, column.id)) + } else { + None + } + }) + .collect(); + + let mut hash_bytes = Vec::new(); + for i in 0..rows.num_rows() { + let row = rows.get_row_mut(i).unwrap(); + + let mut tsid_builder = TsidBuilder::new(&mut hash_bytes); + + for (idx, column_id) in &tag_idx_column_ids { + tsid_builder.maybe_write_datum(*column_id, &row[*idx])?; } + + let tsid = tsid_builder.finish(); + row[idx] = Datum::UInt64(tsid); } - Ok(()) } + Ok(()) } struct TsidBuilder<'a> { diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs index fd8e54bd5b..0fcd082a1c 100644 --- a/src/proxy/src/write.rs +++ b/src/proxy/src/write.rs @@ -44,7 +44,7 @@ use interpreters::interpreter::Output; use logger::{debug, error, info, warn}; use query_frontend::{ frontend::{Context as FrontendContext, Frontend}, - plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan}, + plan::{AlterTableOperation, AlterTablePlan, InsertPlan, InsertSource, Plan}, planner::{build_column_schema, try_get_data_type_from_value}, provider::CatalogMetaProvider, }; @@ -861,7 +861,7 @@ fn write_table_request_to_insert_plan( })?; Ok(InsertPlan { table, - rows: row_group, + source: InsertSource::Values { row_group }, default_value_map: BTreeMap::new(), }) } diff --git a/src/query_frontend/src/plan.rs b/src/query_frontend/src/plan.rs index e5db6238eb..400d01e99c 100644 --- a/src/query_frontend/src/plan.rs +++ b/src/query_frontend/src/plan.rs @@ -39,7 +39,11 @@ use runtime::Priority; use snafu::{OptionExt, Snafu}; use table_engine::{partition::PartitionInfo, table::TableRef}; -use crate::{ast::ShowCreateObject, container::TableContainer, planner::get_table_ref}; +use crate::{ + ast::ShowCreateObject, + container::TableContainer, + planner::{get_table_ref, InsertMode}, +}; #[derive(Debug, Snafu)] pub enum Error { @@ -305,13 +309,24 @@ pub struct DropTablePlan { pub partition_info: Option, } +#[derive(Debug)] +pub enum InsertSource { + Values { + row_group: RowGroup, + }, + Select { + column_index_in_insert: Vec, + query: QueryPlan, + }, +} + /// Insert logical plan #[derive(Debug)] pub struct InsertPlan { /// The table to insert pub table: TableRef, - /// RowGroup to insert - pub rows: RowGroup, + /// Insert source(could be value literals or select query) + pub source: InsertSource, /// Column indexes in schema to its default-value-expr which is used to fill /// values pub default_value_map: BTreeMap, diff --git a/src/query_frontend/src/planner.rs b/src/query_frontend/src/planner.rs index b5d0fb5003..cec4f75e8a 100644 --- a/src/query_frontend/src/planner.rs +++ b/src/query_frontend/src/planner.rs @@ -80,12 +80,13 @@ use crate::{ partition::PartitionParser, plan::{ AlterTableOperation, AlterTablePlan, CreateTablePlan, DescribeTablePlan, DropTablePlan, - ExistsTablePlan, InsertPlan, Plan, QueryPlan, QueryType, ShowCreatePlan, ShowPlan, - ShowTablesPlan, + ExistsTablePlan, InsertPlan, InsertSource, Plan, QueryPlan, QueryType, ShowCreatePlan, + ShowPlan, ShowTablesPlan, }, promql::{remote_query_to_plan, ColumnNames, Expr as PromExpr, RemoteQueryPlan}, provider::{ContextProviderAdapter, MetaProvider}, }; + // We do not carry backtrace in sql error because it is mainly used in server // handler and the error is usually caused by invalid/unsupported sql, which // should be easy to find out the reason. 
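To make the new `InsertSource::Select { query, column_index_in_insert }` variant concrete before the planner changes below, here is a self-contained sketch of what the `Vec<InsertMode>` encodes (an editorial example: `Value` and `project_row` are stand-ins, not the real `Datum`/`RowBuilder` API from `common_types`). For every column of the target schema, the planner records whether to copy a SELECT output column, fill NULL, or leave the value to an auto-generated default.

```rust
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

enum InsertMode {
    /// Copy the SELECT output column at this index.
    Direct(usize),
    /// The insert column list omitted this nullable column: fill NULL.
    Null,
    /// Auto-generated column (e.g. tsid): fill a placeholder, computed later.
    Auto,
}

fn project_row(select_row: &[Value], modes: &[InsertMode]) -> Result<Vec<Value>, String> {
    modes
        .iter()
        .map(|mode| match mode {
            InsertMode::Direct(idx) => select_row
                .get(*idx)
                .cloned()
                .ok_or_else(|| format!("select output has no column {idx}")),
            InsertMode::Null => Ok(Value::Null),
            InsertMode::Auto => Ok(Value::Int(0)),
        })
        .collect()
}

fn main() {
    // INSERT INTO t (`timestamp`, `value`) SELECT `timestamp`, `value` FROM s,
    // where the target table has an auto-generated tsid column, the two
    // selected columns, and a nullable `name` column:
    let modes = [
        InsertMode::Auto,
        InsertMode::Direct(0),
        InsertMode::Direct(1),
        InsertMode::Null,
    ];
    let select_row = [Value::Int(1), Value::Int(100)];
    let row = project_row(&select_row, &modes).unwrap();
    assert_eq!(
        row,
        vec![Value::Int(0), Value::Int(1), Value::Int(100), Value::Null]
    );
}
```

In the actual interpreter (`convert_records_to_row_group` above), the same three cases append `record.column(index).datum(row_idx)`, `Datum::Null`, or `Datum::empty(kind)` to the `RowBuilder`, respectively.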
@@ -905,7 +906,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { } // REQUIRE: SqlStatement must be INSERT stmt - fn insert_to_plan(&self, sql_stmt: SqlStatement) -> Result { + fn insert_to_plan(self, sql_stmt: SqlStatement) -> Result { match sql_stmt { SqlStatement::Insert { table_name, @@ -996,11 +997,16 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { } } - let rows = build_row_group(schema, source, column_index_in_insert)?; + let source = build_insert_source( + schema, + source, + column_index_in_insert, + self.meta_provider, + )?; Ok(Plan::Insert(InsertPlan { table, - rows, + source, default_value_map, })) } @@ -1105,7 +1111,7 @@ fn normalize_func_name(sql_stmt: &mut SqlStatement) { } #[derive(Debug)] -enum InsertMode { +pub enum InsertMode { // Insert the value in expr with given index directly. Direct(usize), // No value provided, insert a null. @@ -1154,12 +1160,13 @@ fn parse_data_value_from_expr(data_type: DatumKind, expr: &mut Expr) -> Result( schema: Schema, source: Box, column_index_in_insert: Vec, -) -> Result { + meta_provider: ContextProviderAdapter

, +) -> Result { // Build row group by schema match *source.body { SetExpr::Values(Values { @@ -1207,7 +1214,33 @@ fn build_row_group( } // Build the whole row group - Ok(RowGroup::new_unchecked(schema, rows)) + Ok(InsertSource::Values { + row_group: RowGroup::new_unchecked(schema, rows), + }) + } + SetExpr::Select(..) => { + let mut select_stmt = SqlStatement::Query(source); + normalize_func_name(&mut select_stmt); + + let df_planner = SqlToRel::new_with_options(&meta_provider, DEFAULT_PARSER_OPTS); + let select_table_name = parse_table_name_with_standard(&select_stmt); + + let select_df_plan = df_planner + .sql_statement_to_plan(select_stmt) + .context(DatafusionPlan)?; + let select_df_plan = optimize_plan(&select_df_plan).context(DatafusionPlan)?; + + // Get all tables needed in the plan + let tables = meta_provider.try_into_container().context(FindMeta)?; + let query = QueryPlan { + df_plan: select_df_plan, + table_name: select_table_name, + tables: Arc::new(tables), + }; + Ok(InsertSource::Select { + query, + column_index_in_insert, + }) } _ => InsertSourceBodyNotSet.fail(), } @@ -1409,7 +1442,6 @@ pub fn get_table_ref(table_name: &str) -> TableReference { #[cfg(test)] pub mod tests { - use datafusion::{ common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, datasource::source_as_provider, @@ -1809,114 +1841,116 @@ pub mod tests { ], }, }, - rows: RowGroup { - schema: Schema { - timestamp_index: 1, - tsid_index: None, - column_schemas: ColumnSchemas { - columns: [ - ColumnSchema { - id: 1, - name: "key1", - data_type: Varbinary, - is_nullable: false, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "key1", - default_value: None, - }, - ColumnSchema { - id: 2, - name: "key2", - data_type: Timestamp, - is_nullable: false, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "key2", - default_value: None, - }, - ColumnSchema { - id: 3, - name: "field1", - data_type: Double, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field1", - default_value: None, - }, - ColumnSchema { - id: 4, - name: "field2", - data_type: String, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field2", - default_value: None, - }, - ColumnSchema { - id: 5, - name: "field3", - data_type: Date, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field3", - default_value: None, - }, - ColumnSchema { - id: 6, - name: "field4", - data_type: Time, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field4", - default_value: None, - }, + source: Values { + row_group: RowGroup { + schema: Schema { + timestamp_index: 1, + tsid_index: None, + column_schemas: ColumnSchemas { + columns: [ + ColumnSchema { + id: 1, + name: "key1", + data_type: Varbinary, + is_nullable: false, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key1", + default_value: None, + }, + ColumnSchema { + id: 2, + name: "key2", + data_type: Timestamp, + is_nullable: false, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key2", + default_value: None, + }, + ColumnSchema { + id: 3, + name: "field1", + data_type: Double, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field1", + default_value: None, + }, + ColumnSchema { + id: 4, + name: "field2", + data_type: String, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: 
"", + escaped_name: "field2", + default_value: None, + }, + ColumnSchema { + id: 5, + name: "field3", + data_type: Date, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field3", + default_value: None, + }, + ColumnSchema { + id: 6, + name: "field4", + data_type: Time, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field4", + default_value: None, + }, + ], + }, + version: 1, + primary_key_indexes: [ + 0, + 1, ], }, - version: 1, - primary_key_indexes: [ - 0, - 1, - ], - }, - rows: [ - Row { - cols: [ - Varbinary( - b"tagk", - ), - Timestamp( + rows: [ + Row { + cols: [ + Varbinary( + b"tagk", + ), Timestamp( - 1638428434000, + Timestamp( + 1638428434000, + ), ), - ), - Double( - 100.0, - ), - String( - StringBytes( - b"hello3", + Double( + 100.0, ), - ), - Date( - 19275, - ), - Time( - 43200456000000, - ), - ], - }, - ], + String( + StringBytes( + b"hello3", + ), + ), + Date( + 19275, + ), + Time( + 43200456000000, + ), + ], + }, + ], + }, }, default_value_map: {}, }, From c5825cc716eca1fc66db8c4faca9a7549a3f119e Mon Sep 17 00:00:00 2001 From: MianChen <283559115@qq.com> Date: Thu, 18 Jul 2024 01:40:30 -0500 Subject: [PATCH 3/3] refactor: insert select to stream mode (#1544) ## Rationale Close #1542 ## Detailed Changes Do select and insert procedure in stream way. ## Test Plan CI test. --------- Co-authored-by: jiacai2050 --- .../env/local/dml/insert_into_select.result | 10 +- .../env/local/dml/insert_into_select.sql | 4 +- src/interpreters/Cargo.toml | 1 + src/interpreters/src/insert.rs | 170 ++++++++++++++---- 4 files changed, 143 insertions(+), 42 deletions(-) diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result index 77b648a269..93fc82567c 100644 --- a/integration_tests/cases/env/local/dml/insert_into_select.result +++ b/integration_tests/cases/env/local/dml/insert_into_select.result @@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) VALUES (1, 100, "s1"), (2, 200, "s2"), - (3, 300, "s3"); + (3, 300, "s3"), + (4, 400, "s4"), + (5, 500, "s5"); -affected_rows: 3 +affected_rows: 5 DROP TABLE IF EXISTS `insert_into_select_table2`; @@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`) SELECT `timestamp`, `value` FROM `insert_into_select_table1`; -affected_rows: 3 +affected_rows: 5 SELECT `timestamp`, `value`, `name` FROM `insert_into_select_table2`; @@ -67,6 +69,8 @@ timestamp,value,name, Timestamp(1),Int32(100),String(""), Timestamp(2),Int32(200),String(""), Timestamp(3),Int32(300),String(""), +Timestamp(4),Int32(400),String(""), +Timestamp(5),Int32(500),String(""), DROP TABLE `insert_into_select_table1`; diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql index dbe157609d..1a0d4a1da0 100644 --- a/integration_tests/cases/env/local/dml/insert_into_select.sql +++ b/integration_tests/cases/env/local/dml/insert_into_select.sql @@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) VALUES (1, 100, "s1"), (2, 200, "s2"), - (3, 300, "s3"); + (3, 300, "s3"), + (4, 400, "s4"), + (5, 500, "s5"); DROP TABLE IF EXISTS `insert_into_select_table2`; diff --git a/src/interpreters/Cargo.toml b/src/interpreters/Cargo.toml index 6f1513fb9b..d237d5b870 100644 --- a/src/interpreters/Cargo.toml +++ b/src/interpreters/Cargo.toml @@ -54,6 +54,7 @@ regex 
= { workspace = true } runtime = { workspace = true } snafu = { workspace = true } table_engine = { workspace = true } +tokio = { workspace = true } [dev-dependencies] analytic_engine = { workspace = true, features = ["test"] } diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs index d6307bf125..5d9e254f1b 100644 --- a/src/interpreters/src/insert.rs +++ b/src/interpreters/src/insert.rs @@ -30,6 +30,7 @@ use common_types::{ column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema::ColumnId, datum::Datum, + record_batch::RecordBatch as CommonRecordBatch, row::{Row, RowBuilder, RowGroup}, schema::Schema, }; @@ -54,12 +55,15 @@ use query_frontend::{ }; use runtime::Priority; use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use table_engine::table::{TableRef, WriteRequest}; +use table_engine::{ + stream::SendableRecordBatchStream, + table::{TableRef, WriteRequest}, +}; +use tokio::sync::mpsc; use crate::{ context::Context, interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult}, - RecordBatchVec, }; #[derive(Debug, Snafu)] @@ -115,10 +119,23 @@ pub enum Error { #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))] RecordColumnsNotEnough { len: usize, index: usize }, + + #[snafu(display("Failed to do select, err:{}", source))] + Select { source: table_engine::stream::Error }, + + #[snafu(display("Failed to send msg in channel, err:{}", msg))] + MsgChannel { msg: String }, + + #[snafu(display("Failed to join async task, err:{}", msg))] + AsyncTask { msg: String }, } define_result!(Error); +// TODO: make those configurable +const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000; +const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3; + pub struct InsertInterpreter { ctx: Context, plan: InsertPlan, @@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter { default_value_map, } = self.plan; - let mut rows = match source { - InsertSource::Values { row_group } => row_group, + match source { + InsertSource::Values { row_group: rows } => { + let num_rows = + prepare_and_write_table(table.clone(), rows, &default_value_map).await?; + + Ok(Output::AffectedRows(num_rows)) + } InsertSource::Select { query: query_plan, column_index_in_insert, } => { - // TODO: support streaming insert - let record_batches = exec_select_logical_plan( + let mut record_batches_stream = exec_select_logical_plan( self.ctx, query_plan, self.executor, @@ -168,30 +189,112 @@ impl Interpreter for InsertInterpreter { .await .context(Insert)?; - if record_batches.is_empty() { - return Ok(Output::AffectedRows(0)); - } + let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM); + let producer = tokio::spawn(async move { + while let Some(record_batch) = record_batches_stream + .try_next() + .await + .context(Select) + .context(Insert)? 
+ { + if record_batch.is_empty() { + continue; + } + if let Err(e) = tx.send(record_batch).await { + return Err(Error::MsgChannel { + msg: format!("{}", e), + }) + .context(Insert)?; + } + } + Ok(()) + }); + + let consumer = tokio::spawn(async move { + let mut rx = rx; + let mut result_rows = 0; + let mut pending_rows = 0; + let mut record_batches = Vec::new(); + while let Some(record_batch) = rx.recv().await { + pending_rows += record_batch.num_rows(); + record_batches.push(record_batch); + if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM { + pending_rows = 0; + let num_rows = write_record_batches( + &mut record_batches, + column_index_in_insert.as_slice(), + table.clone(), + &default_value_map, + ) + .await?; + result_rows += num_rows; + } + } - convert_records_to_row_group(record_batches, column_index_in_insert, table.schema()) - .context(Insert)? + if !record_batches.is_empty() { + let num_rows = write_record_batches( + &mut record_batches, + column_index_in_insert.as_slice(), + table, + &default_value_map, + ) + .await?; + result_rows += num_rows; + } + Ok(result_rows) + }); + + match tokio::try_join!(producer, consumer) { + Ok((select_res, write_rows)) => { + select_res?; + Ok(Output::AffectedRows(write_rows?)) + } + Err(e) => Err(Error::AsyncTask { + msg: format!("{}", e), + }) + .context(Insert)?, + } } - }; + } + } +} - maybe_generate_tsid(&mut rows).context(Insert)?; +async fn write_record_batches( + record_batches: &mut Vec, + column_index_in_insert: &[InsertMode], + table: TableRef, + default_value_map: &BTreeMap, +) -> InterpreterResult { + let row_group = convert_records_to_row_group( + record_batches.as_slice(), + column_index_in_insert, + table.schema(), + ) + .context(Insert)?; + record_batches.clear(); + + prepare_and_write_table(table, row_group, default_value_map).await +} - // Fill default values - fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?; +async fn prepare_and_write_table( + table: TableRef, + mut row_group: RowGroup, + default_value_map: &BTreeMap, +) -> InterpreterResult { + maybe_generate_tsid(&mut row_group).context(Insert)?; - let request = WriteRequest { row_group: rows }; + // Fill default values + fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?; - let num_rows = table - .write(request) - .await - .context(WriteTable) - .context(Insert)?; + let request = WriteRequest { row_group }; - Ok(Output::AffectedRows(num_rows)) - } + let num_rows = table + .write(request) + .await + .context(WriteTable) + .context(Insert)?; + + Ok(num_rows) } async fn exec_select_logical_plan( @@ -199,7 +302,7 @@ async fn exec_select_logical_plan( query_plan: QueryPlan, executor: ExecutorRef, physical_planner: PhysicalPlannerRef, -) -> Result { +) -> Result { let priority = Priority::High; let query_ctx = ctx @@ -216,7 +319,7 @@ async fn exec_select_logical_plan( })?; // Execute select physical plan. 
- let record_batch_stream = executor + let record_batch_stream: SendableRecordBatchStream = executor .execute(&query_ctx, physical_plan) .await .box_err() @@ -224,26 +327,17 @@ async fn exec_select_logical_plan( msg: "failed to execute select physical plan", })?; - let record_batches = - record_batch_stream - .try_collect() - .await - .box_err() - .context(ExecuteSelectPlan { - msg: "failed to collect select execution results", - })?; - - Ok(record_batches) + Ok(record_batch_stream) } fn convert_records_to_row_group( - record_batches: RecordBatchVec, - column_index_in_insert: Vec, + record_batches: &[CommonRecordBatch], + column_index_in_insert: &[InsertMode], schema: Schema, ) -> Result { let mut data_rows: Vec = Vec::new(); - for record in &record_batches { + for record in record_batches { let num_cols = record.num_columns(); let num_rows = record.num_rows(); for row_idx in 0..num_rows {
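As a closing illustration of the stream-mode change in this last patch, the sketch below shows the bounded producer/consumer shape it introduces (an editorial example: plain `Vec<u64>` batches and a row-counting `write_rows` stand in for the `SendableRecordBatchStream` and `RowGroup` writes, and the error plumbing through `Select`/`MsgChannel`/`AsyncTask` is omitted; the constants only echo `INSERT_SELECT_ROW_BATCH_NUM` and `INSERT_SELECT_PENDING_BATCH_NUM` in spirit).

```rust
use tokio::sync::mpsc;

// Flush threshold and channel capacity, in the spirit of
// INSERT_SELECT_ROW_BATCH_NUM and INSERT_SELECT_PENDING_BATCH_NUM above.
const ROW_BATCH_NUM: usize = 1000;
const PENDING_BATCH_NUM: usize = 3;

// Stand-in for converting buffered batches into a RowGroup and writing it:
// here it just counts rows and clears the buffer.
async fn write_rows(batches: &mut Vec<Vec<u64>>) -> usize {
    let n = batches.iter().map(Vec::len).sum();
    batches.clear();
    n
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(PENDING_BATCH_NUM);

    // Producer: drain the "select" side and forward non-empty batches.
    let producer = tokio::spawn(async move {
        for batch in (0u64..10).map(|i| vec![i; 400]) {
            if batch.is_empty() {
                continue;
            }
            if tx.send(batch).await.is_err() {
                break; // consumer gone
            }
        }
    });

    // Consumer: buffer batches until the row threshold is hit, then flush.
    let consumer = tokio::spawn(async move {
        let mut pending_rows = 0;
        let mut written = 0;
        let mut buffered = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending_rows += batch.len();
            buffered.push(batch);
            if pending_rows >= ROW_BATCH_NUM {
                pending_rows = 0;
                written += write_rows(&mut buffered).await;
            }
        }
        // Flush whatever is left once the producer side closes the channel.
        if !buffered.is_empty() {
            written += write_rows(&mut buffered).await;
        }
        written
    });

    let (_, written) = tokio::try_join!(producer, consumer).expect("insert-select task failed");
    assert_eq!(written, 10 * 400);
}
```

The bounded channel is what keeps memory in check: the producer can run at most a few batches ahead of the writes, instead of collecting the whole SELECT result before inserting as the previous patch did.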