diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result new file mode 100644 index 0000000000..93fc82567c --- /dev/null +++ b/integration_tests/cases/env/local/dml/insert_into_select.result @@ -0,0 +1,83 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- +DROP TABLE IF EXISTS `insert_into_select_table1`; + +affected_rows: 0 + +CREATE TABLE `insert_into_select_table1` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +affected_rows: 0 + +INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) +VALUES + (1, 100, "s1"), + (2, 200, "s2"), + (3, 300, "s3"), + (4, 400, "s4"), + (5, 500, "s5"); + +affected_rows: 5 + +DROP TABLE IF EXISTS `insert_into_select_table2`; + +affected_rows: 0 + +CREATE TABLE `insert_into_select_table2` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string NULL, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +affected_rows: 0 + +INSERT INTO `insert_into_select_table2` (`timestamp`, `value`) +SELECT `timestamp`, `value` +FROM `insert_into_select_table1`; + +affected_rows: 5 + +SELECT `timestamp`, `value`, `name` +FROM `insert_into_select_table2`; + +timestamp,value,name, +Timestamp(1),Int32(100),String(""), +Timestamp(2),Int32(200),String(""), +Timestamp(3),Int32(300),String(""), +Timestamp(4),Int32(400),String(""), +Timestamp(5),Int32(500),String(""), + + +DROP TABLE `insert_into_select_table1`; + +affected_rows: 0 + +DROP TABLE `insert_into_select_table2`; + +affected_rows: 0 + diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql new file mode 100644 index 0000000000..1a0d4a1da0 --- /dev/null +++ b/integration_tests/cases/env/local/dml/insert_into_select.sql @@ -0,0 +1,59 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+-- + +DROP TABLE IF EXISTS `insert_into_select_table1`; + +CREATE TABLE `insert_into_select_table1` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`) +VALUES + (1, 100, "s1"), + (2, 200, "s2"), + (3, 300, "s3"), + (4, 400, "s4"), + (5, 500, "s5"); + +DROP TABLE IF EXISTS `insert_into_select_table2`; + +CREATE TABLE `insert_into_select_table2` ( + `timestamp` timestamp NOT NULL, + `value` int, + `name` string NULL, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false' +); + +INSERT INTO `insert_into_select_table2` (`timestamp`, `value`) +SELECT `timestamp`, `value` +FROM `insert_into_select_table1`; + +SELECT `timestamp`, `value`, `name` +FROM `insert_into_select_table2`; + +DROP TABLE `insert_into_select_table1`; + +DROP TABLE `insert_into_select_table2`; diff --git a/src/components/object_store/src/disk_cache.rs b/src/components/object_store/src/disk_cache.rs index 981e6d0648..b0c7ba13ac 100644 --- a/src/components/object_store/src/disk_cache.rs +++ b/src/components/object_store/src/disk_cache.rs @@ -296,7 +296,7 @@ impl DiskCache { Ok(Self { root_dir, - meta_cache: Arc::new(PartitionedMutex::try_new( + meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len( init_lru, partition_bits, SeaHasherBuilder {}, @@ -545,7 +545,7 @@ impl DiskCacheStore { assert!(cap_per_part > 0); Ok(LruCache::new(cap_per_part)) }; - let meta_cache = PartitionedMutex::try_new( + let meta_cache = PartitionedMutex::try_new_with_bit_len( init_size_lru, FILE_SIZE_CACHE_PARTITION_BITS, SeaHasherBuilder, diff --git a/src/components/object_store/src/mem_cache.rs b/src/components/object_store/src/mem_cache.rs index 001be2ab8a..f602eee66e 100644 --- a/src/components/object_store/src/mem_cache.rs +++ b/src/components/object_store/src/mem_cache.rs @@ -81,7 +81,7 @@ impl MemCache { )) }; - let inner = PartitionedMutex::try_new( + let inner = PartitionedMutex::try_new_with_bit_len( init_lru, partition_bits, build_fixed_seed_ahasher_builder(), diff --git a/src/components/partitioned_lock/src/lib.rs b/src/components/partitioned_lock/src/lib.rs index de7ba3454e..22273b9709 100644 --- a/src/components/partitioned_lock/src/lib.rs +++ b/src/components/partitioned_lock/src/lib.rs @@ -36,20 +36,30 @@ impl PartitionedRwLock where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (1..partition_num) - .map(|_| init_fn(partition_num).map(RwLock::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) } pub fn read(&self, key: &K) -> RwLockReadGuard<'_, T> { @@ -68,6 +78,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } 
+ #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(RwLock::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &RwLock { &self.partitions[index] @@ -89,20 +115,30 @@ impl PartitionedMutex where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (0..partition_num) - .map(|_| init_fn(partition_num).map(Mutex::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) } pub fn lock(&self, key: &K) -> MutexGuard<'_, T> { @@ -115,6 +151,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(Mutex::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &Mutex { &self.partitions[index] @@ -140,11 +192,43 @@ impl PartitionedMutexAsync where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = 1 << partition_bit_len; + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { + let mutex = self.get_partition(key); + + mutex.lock().await + } + + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; let partitions = (0..partition_num) .map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new)) .collect::>, E>>()?; @@ -156,12 +240,6 @@ where }) } - pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { - let mutex = self.get_partition(key); - - mutex.lock().await - } - fn get_partition(&self, key: &K) -> &tokio::sync::Mutex { &self.partitions[(self.hash_builder.hash_one(key) as usize) & 
self.partition_mask] } @@ -181,11 +259,66 @@ mod tests { use super::*; + #[test] + fn test_new_equivalence() { + let init_42 = |_: usize| Ok::<_, ()>(42); + + let test_rwlock_42_bit_len = + PartitionedRwLock::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_rwlock_42_suggest_cap = PartitionedRwLock::try_new_with_suggest_cap( + init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_42_bit_len = + PartitionedMutex::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_mutex_42_suggest_cap = PartitionedMutex::try_new_with_suggest_cap( + init_42, + 16, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_async_42_bit_len = PartitionedMutexAsync::try_new_with_bit_len( + init_42, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + let test_mutex_async_42_suggest_cap = PartitionedMutexAsync::try_new_with_suggest_cap( + init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + assert_eq!( + test_rwlock_42_bit_len.partition_mask, + test_rwlock_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_42_bit_len.partition_mask, + test_mutex_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_async_42_bit_len.partition_mask, + test_mutex_async_42_suggest_cap.partition_mask + ); + } + #[test] fn test_partitioned_rwlock() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -203,8 +336,12 @@ mod tests { #[test] fn test_partitioned_mutex() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedMutex::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedMutex::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -223,7 +360,7 @@ mod tests { async fn test_partitioned_mutex_async() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_hmap, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4, SeaHasherBuilder).unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -242,7 +379,8 @@ mod tests { fn test_partitioned_mutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutex::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + PartitionedMutex::try_new_with_bit_len(init_vec, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0); let mut _tmp_data = mutex_first.lock().unwrap(); @@ -256,8 +394,12 @@ mod tests { #[test] fn test_partitioned_rwmutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_vec, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let mutex_first = 
test_locked_map.get_partition_by_index(0); let mut _tmp = mutex_first.write().unwrap(); assert!(mutex_first.try_write().is_err()); @@ -271,7 +413,7 @@ mod tests { async fn test_partitioned_mutex_async_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_vec, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4, SeaHasherBuilder).unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0).await; let mut _tmp_data = mutex_first.lock().await; diff --git a/src/interpreters/Cargo.toml b/src/interpreters/Cargo.toml index 6f1513fb9b..d237d5b870 100644 --- a/src/interpreters/Cargo.toml +++ b/src/interpreters/Cargo.toml @@ -54,6 +54,7 @@ regex = { workspace = true } runtime = { workspace = true } snafu = { workspace = true } table_engine = { workspace = true } +tokio = { workspace = true } [dev-dependencies] analytic_engine = { workspace = true, features = ["test"] } diff --git a/src/interpreters/src/factory.rs b/src/interpreters/src/factory.rs index 1b10661c60..a47a86b259 100644 --- a/src/interpreters/src/factory.rs +++ b/src/interpreters/src/factory.rs @@ -82,7 +82,9 @@ impl Factory { self.physical_planner, self.query_runtime, ), - Plan::Insert(p) => InsertInterpreter::create(ctx, p), + Plan::Insert(p) => { + InsertInterpreter::create(ctx, p, self.query_executor, self.physical_planner) + } Plan::Create(p) => { CreateInterpreter::create(ctx, p, self.table_engine, self.table_manipulator) } diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs index cc455b3fb6..5d9e254f1b 100644 --- a/src/interpreters/src/insert.rs +++ b/src/interpreters/src/insert.rs @@ -30,7 +30,9 @@ use common_types::{ column_block::{ColumnBlock, ColumnBlockBuilder}, column_schema::ColumnId, datum::Datum, - row::RowGroup, + record_batch::RecordBatch as CommonRecordBatch, + row::{Row, RowBuilder, RowGroup}, + schema::Schema, }; use datafusion::{ common::ToDFSchema, @@ -42,11 +44,22 @@ use datafusion::{ }, }; use df_operator::visitor::find_columns_by_expr; +use futures::TryStreamExt; +use generic_error::{BoxError, GenericError}; use hash_ext::hash64; use macros::define_result; -use query_frontend::plan::InsertPlan; -use snafu::{OptionExt, ResultExt, Snafu}; -use table_engine::table::{TableRef, WriteRequest}; +use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef}; +use query_frontend::{ + plan::{InsertPlan, InsertSource, QueryPlan}, + planner::InsertMode, +}; +use runtime::Priority; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use table_engine::{ + stream::SendableRecordBatchStream, + table::{TableRef, WriteRequest}, +}; +use tokio::sync::mpsc; use crate::{ context::Context, @@ -94,18 +107,55 @@ pub enum Error { BuildColumnBlock { source: common_types::column_block::Error, }, + + #[snafu(display("Failed to create query context, err:{}", source))] + CreateQueryContext { source: crate::context::Error }, + + #[snafu(display("Failed to execute select physical plan, msg:{}, err:{}", msg, source))] + ExecuteSelectPlan { msg: String, source: GenericError }, + + #[snafu(display("Failed to build row, err:{}", source))] + BuildRow { source: common_types::row::Error }, + + #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))] + RecordColumnsNotEnough { len: usize, index: usize }, + + #[snafu(display("Failed to do select, err:{}", source))] + Select { source: table_engine::stream::Error }, + + #[snafu(display("Failed to send msg in 
channel, err:{}", msg))] + MsgChannel { msg: String }, + + #[snafu(display("Failed to join async task, err:{}", msg))] + AsyncTask { msg: String }, } define_result!(Error); +// TODO: make those configurable +const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000; +const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3; + pub struct InsertInterpreter { ctx: Context, plan: InsertPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, } impl InsertInterpreter { - pub fn create(ctx: Context, plan: InsertPlan) -> InterpreterPtr { - Box::new(Self { ctx, plan }) + pub fn create( + ctx: Context, + plan: InsertPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, + ) -> InterpreterPtr { + Box::new(Self { + ctx, + plan, + executor, + physical_planner, + }) } } @@ -113,67 +163,253 @@ impl InsertInterpreter { impl Interpreter for InsertInterpreter { async fn execute(mut self: Box) -> InterpreterResult { // Generate tsid if needed. - self.maybe_generate_tsid().context(Insert)?; let InsertPlan { table, - mut rows, + source, default_value_map, } = self.plan; - // Fill default values - fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?; - - // Context is unused now - let _ctx = self.ctx; + match source { + InsertSource::Values { row_group: rows } => { + let num_rows = + prepare_and_write_table(table.clone(), rows, &default_value_map).await?; - let request = WriteRequest { row_group: rows }; + Ok(Output::AffectedRows(num_rows)) + } + InsertSource::Select { + query: query_plan, + column_index_in_insert, + } => { + let mut record_batches_stream = exec_select_logical_plan( + self.ctx, + query_plan, + self.executor, + self.physical_planner, + ) + .await + .context(Insert)?; + + let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM); + let producer = tokio::spawn(async move { + while let Some(record_batch) = record_batches_stream + .try_next() + .await + .context(Select) + .context(Insert)? 
+ { + if record_batch.is_empty() { + continue; + } + if let Err(e) = tx.send(record_batch).await { + return Err(Error::MsgChannel { + msg: format!("{}", e), + }) + .context(Insert)?; + } + } + Ok(()) + }); + + let consumer = tokio::spawn(async move { + let mut rx = rx; + let mut result_rows = 0; + let mut pending_rows = 0; + let mut record_batches = Vec::new(); + while let Some(record_batch) = rx.recv().await { + pending_rows += record_batch.num_rows(); + record_batches.push(record_batch); + if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM { + pending_rows = 0; + let num_rows = write_record_batches( + &mut record_batches, + column_index_in_insert.as_slice(), + table.clone(), + &default_value_map, + ) + .await?; + result_rows += num_rows; + } + } - let request = WriteRequest { row_group: rows }; + if !record_batches.is_empty() { + let num_rows = write_record_batches( + &mut record_batches, + column_index_in_insert.as_slice(), + table, + &default_value_map, + ) + .await?; + result_rows += num_rows; + } + Ok(result_rows) + }); - let num_rows = table - .write(request) - .await - .context(WriteTable) - .context(Insert)?; + match tokio::try_join!(producer, consumer) { + Ok((select_res, write_rows)) => { + select_res?; + Ok(Output::AffectedRows(write_rows?)) + } + Err(e) => Err(Error::AsyncTask { + msg: format!("{}", e), + }) + .context(Insert)?, + } + } + } } } -impl InsertInterpreter { - fn maybe_generate_tsid(&mut self) -> Result<()> { - let schema = self.plan.rows.schema(); - let tsid_idx = schema.index_of_tsid(); - - if let Some(idx) = tsid_idx { - // Vec of (`index of tag`, `column id of tag`). - let tag_idx_column_ids: Vec<_> = schema - .columns() - .iter() - .enumerate() - .filter_map(|(i, column)| { - if column.is_tag { - Some((i, column.id)) - } else { - None - } - }) - .collect(); +async fn write_record_batches( + record_batches: &mut Vec<CommonRecordBatch>, + column_index_in_insert: &[InsertMode], + table: TableRef, + default_value_map: &BTreeMap<usize, DfLogicalExpr>, +) -> InterpreterResult<usize> { + let row_group = convert_records_to_row_group( + record_batches.as_slice(), + column_index_in_insert, + table.schema(), + ) + .context(Insert)?; + record_batches.clear(); + + prepare_and_write_table(table, row_group, default_value_map).await +} + +async fn prepare_and_write_table( + table: TableRef, + mut row_group: RowGroup, + default_value_map: &BTreeMap<usize, DfLogicalExpr>, +) -> InterpreterResult<usize> { + maybe_generate_tsid(&mut row_group).context(Insert)?; - let mut hash_bytes = Vec::new(); - for i in 0..self.plan.rows.num_rows() { - let row = self.plan.rows.get_row_mut(i).unwrap(); + // Fill default values + fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?; - let mut tsid_builder = TsidBuilder::new(&mut hash_bytes); + let request = WriteRequest { row_group }; - for (idx, column_id) in &tag_idx_column_ids { - tsid_builder.maybe_write_datum(*column_id, &row[*idx])?; + let num_rows = table + .write(request) + .await + .context(WriteTable) + .context(Insert)?; + + Ok(num_rows) +} + +async fn exec_select_logical_plan( + ctx: Context, + query_plan: QueryPlan, + executor: ExecutorRef, + physical_planner: PhysicalPlannerRef, +) -> Result<SendableRecordBatchStream> { + let priority = Priority::High; + + let query_ctx = ctx + .new_query_context(priority) + .context(CreateQueryContext)?; + + // Create select physical plan. + let physical_plan = physical_planner + .plan(&query_ctx, query_plan) + .await + .box_err() + .context(ExecuteSelectPlan { + msg: "failed to build select physical plan", + })?; + + // Execute select physical plan.
+ let record_batch_stream: SendableRecordBatchStream = executor + .execute(&query_ctx, physical_plan) + .await + .box_err() + .context(ExecuteSelectPlan { + msg: "failed to execute select physical plan", + })?; + + Ok(record_batch_stream) +} + +fn convert_records_to_row_group( + record_batches: &[CommonRecordBatch], + column_index_in_insert: &[InsertMode], + schema: Schema, +) -> Result { + let mut data_rows: Vec = Vec::new(); + + for record in record_batches { + let num_cols = record.num_columns(); + let num_rows = record.num_rows(); + for row_idx in 0..num_rows { + let mut row_builder = RowBuilder::new(&schema); + // For each column in schema, append datum into row builder + for (index_opt, column_schema) in column_index_in_insert.iter().zip(schema.columns()) { + match index_opt { + InsertMode::Direct(index) => { + ensure!( + *index < num_cols, + RecordColumnsNotEnough { + len: num_cols, + index: *index + } + ); + let datum = record.column(*index).datum(row_idx); + row_builder = row_builder.append_datum(datum).context(BuildRow)?; + } + InsertMode::Null => { + // This is a null column + row_builder = row_builder.append_datum(Datum::Null).context(BuildRow)?; + } + InsertMode::Auto => { + // This is an auto generated column, fill by default value. + let kind = &column_schema.data_type; + row_builder = row_builder + .append_datum(Datum::empty(kind)) + .context(BuildRow)?; + } } + } + let row = row_builder.finish().context(BuildRow)?; + data_rows.push(row); + } + } + RowGroup::try_new(schema, data_rows).context(BuildRow) +} + +fn maybe_generate_tsid(rows: &mut RowGroup) -> Result<()> { + let schema = rows.schema(); + let tsid_idx = schema.index_of_tsid(); - let tsid = tsid_builder.finish(); - row[idx] = Datum::UInt64(tsid); + if let Some(idx) = tsid_idx { + // Vec of (`index of tag`, `column id of tag`). 
+ let tag_idx_column_ids: Vec<_> = schema + .columns() + .iter() + .enumerate() + .filter_map(|(i, column)| { + if column.is_tag { + Some((i, column.id)) + } else { + None + } + }) + .collect(); + + let mut hash_bytes = Vec::new(); + for i in 0..rows.num_rows() { + let row = rows.get_row_mut(i).unwrap(); + + let mut tsid_builder = TsidBuilder::new(&mut hash_bytes); + + for (idx, column_id) in &tag_idx_column_ids { + tsid_builder.maybe_write_datum(*column_id, &row[*idx])?; } + + let tsid = tsid_builder.finish(); + row[idx] = Datum::UInt64(tsid); } - Ok(()) } + Ok(()) } struct TsidBuilder<'a> { diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs index fd8e54bd5b..0fcd082a1c 100644 --- a/src/proxy/src/write.rs +++ b/src/proxy/src/write.rs @@ -44,7 +44,7 @@ use interpreters::interpreter::Output; use logger::{debug, error, info, warn}; use query_frontend::{ frontend::{Context as FrontendContext, Frontend}, - plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan}, + plan::{AlterTableOperation, AlterTablePlan, InsertPlan, InsertSource, Plan}, planner::{build_column_schema, try_get_data_type_from_value}, provider::CatalogMetaProvider, }; @@ -861,7 +861,7 @@ fn write_table_request_to_insert_plan( })?; Ok(InsertPlan { table, - rows: row_group, + source: InsertSource::Values { row_group }, default_value_map: BTreeMap::new(), }) } diff --git a/src/query_frontend/src/plan.rs b/src/query_frontend/src/plan.rs index e5db6238eb..400d01e99c 100644 --- a/src/query_frontend/src/plan.rs +++ b/src/query_frontend/src/plan.rs @@ -39,7 +39,11 @@ use runtime::Priority; use snafu::{OptionExt, Snafu}; use table_engine::{partition::PartitionInfo, table::TableRef}; -use crate::{ast::ShowCreateObject, container::TableContainer, planner::get_table_ref}; +use crate::{ + ast::ShowCreateObject, + container::TableContainer, + planner::{get_table_ref, InsertMode}, +}; #[derive(Debug, Snafu)] pub enum Error { @@ -305,13 +309,24 @@ pub struct DropTablePlan { pub partition_info: Option<PartitionInfo>, } +#[derive(Debug)] +pub enum InsertSource { + Values { + row_group: RowGroup, + }, + Select { + column_index_in_insert: Vec<InsertMode>, + query: QueryPlan, + }, +} + /// Insert logical plan #[derive(Debug)] pub struct InsertPlan { /// The table to insert pub table: TableRef, - /// RowGroup to insert - pub rows: RowGroup, + /// Insert source (could be value literals or a select query) + pub source: InsertSource, /// Column indexes in schema to its default-value-expr which is used to fill /// values pub default_value_map: BTreeMap<usize, DfLogicalExpr>, diff --git a/src/query_frontend/src/planner.rs b/src/query_frontend/src/planner.rs index b5d0fb5003..cec4f75e8a 100644 --- a/src/query_frontend/src/planner.rs +++ b/src/query_frontend/src/planner.rs @@ -80,12 +80,13 @@ use crate::{ partition::PartitionParser, plan::{ AlterTableOperation, AlterTablePlan, CreateTablePlan, DescribeTablePlan, DropTablePlan, - ExistsTablePlan, InsertPlan, Plan, QueryPlan, QueryType, ShowCreatePlan, ShowPlan, - ShowTablesPlan, + ExistsTablePlan, InsertPlan, InsertSource, Plan, QueryPlan, QueryType, ShowCreatePlan, + ShowPlan, ShowTablesPlan, }, promql::{remote_query_to_plan, ColumnNames, Expr as PromExpr, RemoteQueryPlan}, provider::{ContextProviderAdapter, MetaProvider}, }; + // We do not carry backtrace in sql error because it is mainly used in server // handler and the error is usually caused by invalid/unsupported sql, which // should be easy to find out the reason.
@@ -905,7 +906,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { } // REQUIRE: SqlStatement must be INSERT stmt - fn insert_to_plan(&self, sql_stmt: SqlStatement) -> Result<Plan> { + fn insert_to_plan(self, sql_stmt: SqlStatement) -> Result<Plan> { match sql_stmt { SqlStatement::Insert { table_name, @@ -996,11 +997,16 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { } } - let rows = build_row_group(schema, source, column_index_in_insert)?; + let source = build_insert_source( + schema, + source, + column_index_in_insert, + self.meta_provider, + )?; Ok(Plan::Insert(InsertPlan { table, - rows, + source, default_value_map, })) } @@ -1105,7 +1111,7 @@ fn normalize_func_name(sql_stmt: &mut SqlStatement) { } #[derive(Debug)] -enum InsertMode { +pub enum InsertMode { // Insert the value in expr with given index directly. Direct(usize), // No value provided, insert a null. @@ -1154,12 +1160,13 @@ fn parse_data_value_from_expr(data_type: DatumKind, expr: &mut Expr) -> Result<Datum> { -fn build_row_group( +fn build_insert_source<'a, P: MetaProvider>( schema: Schema, source: Box<Query>, column_index_in_insert: Vec<InsertMode>, -) -> Result<RowGroup> { + meta_provider: ContextProviderAdapter<'a, P>
, +) -> Result { // Build row group by schema match *source.body { SetExpr::Values(Values { @@ -1207,7 +1214,33 @@ fn build_row_group( } // Build the whole row group - Ok(RowGroup::new_unchecked(schema, rows)) + Ok(InsertSource::Values { + row_group: RowGroup::new_unchecked(schema, rows), + }) + } + SetExpr::Select(..) => { + let mut select_stmt = SqlStatement::Query(source); + normalize_func_name(&mut select_stmt); + + let df_planner = SqlToRel::new_with_options(&meta_provider, DEFAULT_PARSER_OPTS); + let select_table_name = parse_table_name_with_standard(&select_stmt); + + let select_df_plan = df_planner + .sql_statement_to_plan(select_stmt) + .context(DatafusionPlan)?; + let select_df_plan = optimize_plan(&select_df_plan).context(DatafusionPlan)?; + + // Get all tables needed in the plan + let tables = meta_provider.try_into_container().context(FindMeta)?; + let query = QueryPlan { + df_plan: select_df_plan, + table_name: select_table_name, + tables: Arc::new(tables), + }; + Ok(InsertSource::Select { + query, + column_index_in_insert, + }) } _ => InsertSourceBodyNotSet.fail(), } @@ -1409,7 +1442,6 @@ pub fn get_table_ref(table_name: &str) -> TableReference { #[cfg(test)] pub mod tests { - use datafusion::{ common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, datasource::source_as_provider, @@ -1809,114 +1841,116 @@ pub mod tests { ], }, }, - rows: RowGroup { - schema: Schema { - timestamp_index: 1, - tsid_index: None, - column_schemas: ColumnSchemas { - columns: [ - ColumnSchema { - id: 1, - name: "key1", - data_type: Varbinary, - is_nullable: false, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "key1", - default_value: None, - }, - ColumnSchema { - id: 2, - name: "key2", - data_type: Timestamp, - is_nullable: false, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "key2", - default_value: None, - }, - ColumnSchema { - id: 3, - name: "field1", - data_type: Double, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field1", - default_value: None, - }, - ColumnSchema { - id: 4, - name: "field2", - data_type: String, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field2", - default_value: None, - }, - ColumnSchema { - id: 5, - name: "field3", - data_type: Date, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field3", - default_value: None, - }, - ColumnSchema { - id: 6, - name: "field4", - data_type: Time, - is_nullable: true, - is_tag: false, - is_dictionary: false, - comment: "", - escaped_name: "field4", - default_value: None, - }, + source: Values { + row_group: RowGroup { + schema: Schema { + timestamp_index: 1, + tsid_index: None, + column_schemas: ColumnSchemas { + columns: [ + ColumnSchema { + id: 1, + name: "key1", + data_type: Varbinary, + is_nullable: false, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key1", + default_value: None, + }, + ColumnSchema { + id: 2, + name: "key2", + data_type: Timestamp, + is_nullable: false, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "key2", + default_value: None, + }, + ColumnSchema { + id: 3, + name: "field1", + data_type: Double, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field1", + default_value: None, + }, + ColumnSchema { + id: 4, + name: "field2", + data_type: String, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: 
"", + escaped_name: "field2", + default_value: None, + }, + ColumnSchema { + id: 5, + name: "field3", + data_type: Date, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field3", + default_value: None, + }, + ColumnSchema { + id: 6, + name: "field4", + data_type: Time, + is_nullable: true, + is_tag: false, + is_dictionary: false, + comment: "", + escaped_name: "field4", + default_value: None, + }, + ], + }, + version: 1, + primary_key_indexes: [ + 0, + 1, ], }, - version: 1, - primary_key_indexes: [ - 0, - 1, - ], - }, - rows: [ - Row { - cols: [ - Varbinary( - b"tagk", - ), - Timestamp( + rows: [ + Row { + cols: [ + Varbinary( + b"tagk", + ), Timestamp( - 1638428434000, + Timestamp( + 1638428434000, + ), ), - ), - Double( - 100.0, - ), - String( - StringBytes( - b"hello3", + Double( + 100.0, ), - ), - Date( - 19275, - ), - Time( - 43200456000000, - ), - ], - }, - ], + String( + StringBytes( + b"hello3", + ), + ), + Date( + 19275, + ), + Time( + 43200456000000, + ), + ], + }, + ], + }, }, default_value_map: {}, },