refactor: insert select to stream mode #1544

Merged · 7 commits · Jul 18, 2024
10 changes: 7 additions & 3 deletions integration_tests/cases/env/local/dml/insert_into_select.result
@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
 (1, 100, "s1"),
 (2, 200, "s2"),
-(3, 300, "s3");
+(3, 300, "s3"),
+(4, 400, "s4"),
+(5, 500, "s5");
 
-affected_rows: 3
+affected_rows: 5
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
 SELECT `timestamp`, `value`
 FROM `insert_into_select_table1`;
 
-affected_rows: 3
+affected_rows: 5
 
 SELECT `timestamp`, `value`, `name`
 FROM `insert_into_select_table2`;
@@ -67,6 +69,8 @@ timestamp,value,name,
 Timestamp(1),Int32(100),String(""),
 Timestamp(2),Int32(200),String(""),
 Timestamp(3),Int32(300),String(""),
+Timestamp(4),Int32(400),String(""),
+Timestamp(5),Int32(500),String(""),
 
 
 DROP TABLE `insert_into_select_table1`;
4 changes: 3 additions & 1 deletion integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
 (1, 100, "s1"),
 (2, 200, "s2"),
-(3, 300, "s3");
+(3, 300, "s3"),
+(4, 400, "s4"),
+(5, 500, "s5");
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
1 change: 1 addition & 0 deletions src/interpreters/Cargo.toml
@@ -54,6 +54,7 @@ regex = { workspace = true }
 runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 analytic_engine = { workspace = true, features = ["test"] }
170 changes: 132 additions & 38 deletions src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
+    record_batch::RecordBatch as CommonRecordBatch,
     row::{Row, RowBuilder, RowGroup},
     schema::Schema,
 };
@@ -54,12 +55,15 @@ use query_frontend::{
 };
 use runtime::Priority;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+    stream::SendableRecordBatchStream,
+    table::{TableRef, WriteRequest},
+};
+use tokio::sync::mpsc;
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
-    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {
 
     #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
     RecordColumnsNotEnough { len: usize, index: usize },
+
+    #[snafu(display("Failed to do select, err:{}", source))]
+    Select { source: table_engine::stream::Error },
+
+    #[snafu(display("Failed to send msg in channel, err:{}", msg))]
+    MsgChannel { msg: String },
+
+    #[snafu(display("Failed to join async task, err:{}", msg))]
+    AsyncTask { msg: String },
 }
 
 define_result!(Error);
+
+// TODO: make those configurable
+const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
+const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;
 
 pub struct InsertInterpreter {
     ctx: Context,
     plan: InsertPlan,
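(Note on the two new constants, as used in the hunks below: `INSERT_SELECT_PENDING_BATCH_NUM` is the capacity of the mpsc channel, bounding how many unwritten record batches the select side may buffer before it is backpressured, while `INSERT_SELECT_ROW_BATCH_NUM` is the number of rows the consumer accumulates before flushing a write.)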
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
             default_value_map,
         } = self.plan;
 
-        let mut rows = match source {
-            InsertSource::Values { row_group } => row_group,
+        match source {
+            InsertSource::Values { row_group: rows } => {
+                let num_rows =
+                    prepare_and_write_table(table.clone(), rows, &default_value_map).await?;
+
+                Ok(Output::AffectedRows(num_rows))
+            }
             InsertSource::Select {
                 query: query_plan,
                 column_index_in_insert,
             } => {
-                // TODO: support streaming insert
-                let record_batches = exec_select_logical_plan(
+                let mut record_batches_stream = exec_select_logical_plan(
                     self.ctx,
                     query_plan,
                     self.executor,
@@ -168,38 +189,120 @@ impl Interpreter for InsertInterpreter {
                 .await
                 .context(Insert)?;
 
-                if record_batches.is_empty() {
-                    return Ok(Output::AffectedRows(0));
-                }
-
-                convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
-                    .context(Insert)?
+                let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
+                let producer = tokio::spawn(async move {
+                    while let Some(record_batch) = record_batches_stream
+                        .try_next()
+                        .await
+                        .context(Select)
+                        .context(Insert)?
+                    {
+                        if record_batch.is_empty() {
+                            continue;
+                        }
+                        if let Err(e) = tx.send(record_batch).await {
+                            return Err(Error::MsgChannel {
+                                msg: format!("{}", e),
+                            })
+                            .context(Insert)?;
+                        }
+                    }
+                    Ok(())
+                });
+
+                let consumer = tokio::spawn(async move {
+                    let mut rx = rx;
+                    let mut result_rows = 0;
+                    let mut pending_rows = 0;
+                    let mut record_batches = Vec::new();
+                    while let Some(record_batch) = rx.recv().await {
+                        pending_rows += record_batch.num_rows();
+                        record_batches.push(record_batch);
+                        if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
+                            pending_rows = 0;
+                            let num_rows = write_record_batches(
+                                &mut record_batches,
+                                column_index_in_insert.as_slice(),
+                                table.clone(),
+                                &default_value_map,
+                            )
+                            .await?;
+                            result_rows += num_rows;
+                        }
+                    }
+
+                    if !record_batches.is_empty() {
+                        let num_rows = write_record_batches(
+                            &mut record_batches,
+                            column_index_in_insert.as_slice(),
+                            table,
+                            &default_value_map,
+                        )
+                        .await?;
+                        result_rows += num_rows;
+                    }
+                    Ok(result_rows)
+                });
+
+                match tokio::try_join!(producer, consumer) {
+                    Ok((select_res, write_rows)) => {
+                        select_res?;
+                        Ok(Output::AffectedRows(write_rows?))
+                    }
+                    Err(e) => Err(Error::AsyncTask {
+                        msg: format!("{}", e),
+                    })
+                    .context(Insert)?,
+                }
             }
-        };
-
-        maybe_generate_tsid(&mut rows).context(Insert)?;
-
-        // Fill default values
-        fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
-
-        let request = WriteRequest { row_group: rows };
-
-        let num_rows = table
-            .write(request)
-            .await
-            .context(WriteTable)
-            .context(Insert)?;
-
-        Ok(Output::AffectedRows(num_rows))
-    }
-}
+        }
+    }
+}
+
+async fn write_record_batches(
+    record_batches: &mut Vec<CommonRecordBatch>,
+    column_index_in_insert: &[InsertMode],
+    table: TableRef,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    let row_group = convert_records_to_row_group(
+        record_batches.as_slice(),
+        column_index_in_insert,
+        table.schema(),
+    )
+    .context(Insert)?;
+    record_batches.clear();
+
+    prepare_and_write_table(table, row_group, default_value_map).await
+}
+
+async fn prepare_and_write_table(
+    table: TableRef,
+    mut row_group: RowGroup,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    maybe_generate_tsid(&mut row_group).context(Insert)?;
+
+    // Fill default values
+    fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;
+
+    let request = WriteRequest { row_group };
+
+    let num_rows = table
+        .write(request)
+        .await
+        .context(WriteTable)
+        .context(Insert)?;
+
+    Ok(num_rows)
+}
 
 async fn exec_select_logical_plan(
     ctx: Context,
     query_plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
     let priority = Priority::High;
 
     let query_ctx = ctx
@@ -216,34 +319,25 @@ async fn exec_select_logical_plan(
         })?;
 
     // Execute select physical plan.
-    let record_batch_stream = executor
+    let record_batch_stream: SendableRecordBatchStream = executor
         .execute(&query_ctx, physical_plan)
         .await
         .box_err()
         .context(ExecuteSelectPlan {
             msg: "failed to execute select physical plan",
         })?;
 
-    let record_batches =
-        record_batch_stream
-            .try_collect()
-            .await
-            .box_err()
-            .context(ExecuteSelectPlan {
-                msg: "failed to collect select execution results",
-            })?;
-
-    Ok(record_batches)
+    Ok(record_batch_stream)
 }
 
 fn convert_records_to_row_group(
-    record_batches: RecordBatchVec,
-    column_index_in_insert: Vec<InsertMode>,
+    record_batches: &[CommonRecordBatch],
+    column_index_in_insert: &[InsertMode],
     schema: Schema,
 ) -> Result<RowGroup> {
     let mut data_rows: Vec<Row> = Vec::new();
 
-    for record in &record_batches {
+    for record in record_batches {
         let num_cols = record.num_columns();
         let num_rows = record.num_rows();
         for row_idx in 0..num_rows {
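The core of the change is the bounded producer/consumer pipeline in `execute`. Below is a minimal, self-contained sketch of that pattern, with `Vec<u64>` standing in for `RecordBatch` and a no-op `write` standing in for `write_record_batches`; the constants, names, and the `tokio = { version = "1", features = ["full"] }` setup are illustrative assumptions, not the PR's actual code.

```rust
use tokio::sync::mpsc;

// Illustrative stand-ins for INSERT_SELECT_ROW_BATCH_NUM and
// INSERT_SELECT_PENDING_BATCH_NUM from the PR.
const ROW_BATCH_NUM: usize = 1000;
const PENDING_BATCH_NUM: usize = 3;

// Stand-in for convert_records_to_row_group + table.write: count and drop rows.
async fn write(batches: &mut Vec<Vec<u64>>) -> usize {
    let n: usize = batches.iter().map(Vec::len).sum();
    batches.clear();
    n
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Bounded channel: once PENDING_BATCH_NUM batches are queued, the
    // producer's send().await parks, so a slow writer throttles the select side.
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(PENDING_BATCH_NUM);

    // Producer: pulls "record batches" off a source and skips empty ones,
    // like the stream-draining task in the PR.
    let producer = tokio::spawn(async move {
        for i in 0..10u64 {
            let batch: Vec<u64> = (0..400).map(|j| i * 400 + j).collect();
            if batch.is_empty() {
                continue;
            }
            tx.send(batch)
                .await
                .map_err(|e| format!("channel closed: {e}"))?;
        }
        Ok::<(), String>(())
        // tx is dropped here, closing the channel and ending the consumer loop.
    });

    // Consumer: accumulates rows and flushes once ROW_BATCH_NUM is reached,
    // with a final flush for the tail, mirroring the consumer task in the PR.
    let consumer = tokio::spawn(async move {
        let mut result_rows = 0usize;
        let mut pending_rows = 0usize;
        let mut batches = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending_rows += batch.len();
            batches.push(batch);
            if pending_rows >= ROW_BATCH_NUM {
                pending_rows = 0;
                result_rows += write(&mut batches).await;
            }
        }
        if !batches.is_empty() {
            result_rows += write(&mut batches).await;
        }
        Ok::<usize, String>(result_rows)
    });

    // try_join! fails fast if either task panics; the inner Results carry the
    // tasks' own errors, as with select_res / write_rows in the PR.
    let (select_res, write_rows) =
        tokio::try_join!(producer, consumer).map_err(|e| e.to_string())?;
    select_res?;
    println!("affected_rows: {}", write_rows?);
    Ok(())
}
```

The bounded capacity is the design point: at most a few batches are ever in flight, instead of the whole SELECT result being collected in memory as the old `try_collect`-based path did.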