refactor: insert select to stream mode (apache#1544)
## Rationale
Close apache#1542 

## Detailed Changes
Perform the select-and-insert procedure in a streaming way.
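
For illustration, here is a minimal, standalone sketch of the producer/consumer pattern this change adopts. It is not the real implementation: it uses stand-in types (`RecordBatch` as `Vec<u64>`, a stubbed `flush`) in place of the actual `table_engine` stream and table APIs. A producer task drains the select stream into a bounded channel while a consumer task buffers batches and writes them once enough rows accumulate.

```rust
use tokio::sync::mpsc;

// Stand-in for the real record-batch type: a batch is just a vector of rows here.
type RecordBatch = Vec<u64>;

const ROW_BATCH_NUM: usize = 1000; // flush once this many rows are buffered
const PENDING_BATCH_NUM: usize = 3; // bounded channel capacity (batches in flight)

// Stand-in for "convert the buffered batches to a row group and write the table".
async fn flush(buffer: &mut Vec<RecordBatch>) -> usize {
    let rows: usize = buffer.iter().map(|b| b.len()).sum();
    buffer.clear();
    rows
}

async fn stream_insert_select(batches: Vec<RecordBatch>) -> Result<usize, String> {
    let (tx, mut rx) = mpsc::channel::<RecordBatch>(PENDING_BATCH_NUM);

    // Producer: drain the "select" stream and push non-empty batches into the channel.
    let producer = tokio::spawn(async move {
        for batch in batches {
            if batch.is_empty() {
                continue;
            }
            tx.send(batch)
                .await
                .map_err(|e| format!("send failed: {e}"))?;
        }
        Ok::<_, String>(())
    });

    // Consumer: buffer batches and write once enough rows have accumulated.
    let consumer = tokio::spawn(async move {
        let mut written = 0usize;
        let mut pending_rows = 0usize;
        let mut buffer: Vec<RecordBatch> = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending_rows += batch.len();
            buffer.push(batch);
            if pending_rows >= ROW_BATCH_NUM {
                written += flush(&mut buffer).await;
                pending_rows = 0;
            }
        }
        // Write whatever is left after the stream ends.
        if !buffer.is_empty() {
            written += flush(&mut buffer).await;
        }
        Ok::<_, String>(written)
    });

    // Surface either task's error; otherwise report the total rows written.
    let (select_res, write_rows) = tokio::try_join!(producer, consumer)
        .map_err(|e| format!("join failed: {e}"))?;
    select_res?;
    write_rows
}

#[tokio::main]
async fn main() {
    let batches = vec![vec![1; 600], vec![2; 700], vec![3; 200]];
    let rows = stream_insert_select(batches).await.unwrap();
    println!("inserted {rows} rows");
}
```

Bounding the channel keeps back-pressure on the select side, so memory stays proportional to a few pending batches rather than the whole result set.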

## Test Plan
CI tests.

---------

Co-authored-by: jiacai2050 <dev@liujiacai.net>
2 people authored and LeslieKid committed Sep 24, 2024
1 parent 7c14eac commit ba4a923
Showing 4 changed files with 143 additions and 42 deletions.
10 changes: 7 additions & 3 deletions integration_tests/cases/env/local/dml/insert_into_select.result
@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
(3, 300, "s3");
(3, 300, "s3"),
(4, 400, "s4"),
(5, 500, "s5");

affected_rows: 3
affected_rows: 5

DROP TABLE IF EXISTS `insert_into_select_table2`;

@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
SELECT `timestamp`, `value`
FROM `insert_into_select_table1`;

affected_rows: 3
affected_rows: 5

SELECT `timestamp`, `value`, `name`
FROM `insert_into_select_table2`;
@@ -67,6 +69,8 @@ timestamp,value,name,
Timestamp(1),Int32(100),String(""),
Timestamp(2),Int32(200),String(""),
Timestamp(3),Int32(300),String(""),
Timestamp(4),Int32(400),String(""),
Timestamp(5),Int32(500),String(""),


DROP TABLE `insert_into_select_table1`;
4 changes: 3 additions & 1 deletion integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
(3, 300, "s3");
(3, 300, "s3"),
(4, 400, "s4"),
(5, 500, "s5");

DROP TABLE IF EXISTS `insert_into_select_table2`;

1 change: 1 addition & 0 deletions src/interpreters/Cargo.toml
@@ -54,6 +54,7 @@ regex = { workspace = true }
runtime = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
analytic_engine = { workspace = true, features = ["test"] }
170 changes: 132 additions & 38 deletions src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
column_block::{ColumnBlock, ColumnBlockBuilder},
column_schema::ColumnId,
datum::Datum,
record_batch::RecordBatch as CommonRecordBatch,
row::{Row, RowBuilder, RowGroup},
schema::Schema,
};
@@ -54,12 +55,15 @@ use query_frontend::{
};
use runtime::Priority;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use table_engine::table::{TableRef, WriteRequest};
use table_engine::{
stream::SendableRecordBatchStream,
table::{TableRef, WriteRequest},
};
use tokio::sync::mpsc;

use crate::{
context::Context,
interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {

#[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
RecordColumnsNotEnough { len: usize, index: usize },

#[snafu(display("Failed to do select, err:{}", source))]
Select { source: table_engine::stream::Error },

#[snafu(display("Failed to send msg in channel, err:{}", msg))]
MsgChannel { msg: String },

#[snafu(display("Failed to join async task, err:{}", msg))]
AsyncTask { msg: String },
}

define_result!(Error);

// TODO: make those configurable
const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;

pub struct InsertInterpreter {
ctx: Context,
plan: InsertPlan,
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
default_value_map,
} = self.plan;

let mut rows = match source {
InsertSource::Values { row_group } => row_group,
match source {
InsertSource::Values { row_group: rows } => {
let num_rows =
prepare_and_write_table(table.clone(), rows, &default_value_map).await?;

Ok(Output::AffectedRows(num_rows))
}
InsertSource::Select {
query: query_plan,
column_index_in_insert,
} => {
// TODO: support streaming insert
let record_batches = exec_select_logical_plan(
let mut record_batches_stream = exec_select_logical_plan(
self.ctx,
query_plan,
self.executor,
@@ -168,38 +189,120 @@ impl Interpreter for InsertInterpreter {
.await
.context(Insert)?;

if record_batches.is_empty() {
return Ok(Output::AffectedRows(0));
}
let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
let producer = tokio::spawn(async move {
while let Some(record_batch) = record_batches_stream
.try_next()
.await
.context(Select)
.context(Insert)?
{
if record_batch.is_empty() {
continue;
}
if let Err(e) = tx.send(record_batch).await {
return Err(Error::MsgChannel {
msg: format!("{}", e),
})
.context(Insert)?;
}
}
Ok(())
});

let consumer = tokio::spawn(async move {
let mut rx = rx;
let mut result_rows = 0;
let mut pending_rows = 0;
let mut record_batches = Vec::new();
while let Some(record_batch) = rx.recv().await {
pending_rows += record_batch.num_rows();
record_batches.push(record_batch);
if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
pending_rows = 0;
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table.clone(),
&default_value_map,
)
.await?;
result_rows += num_rows;
}
}

convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
.context(Insert)?
if !record_batches.is_empty() {
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table,
&default_value_map,
)
.await?;
result_rows += num_rows;
}
Ok(result_rows)
});

match tokio::try_join!(producer, consumer) {
Ok((select_res, write_rows)) => {
select_res?;
Ok(Output::AffectedRows(write_rows?))
}
Err(e) => Err(Error::AsyncTask {
msg: format!("{}", e),
})
.context(Insert)?,
}
}
};
}
}
}

maybe_generate_tsid(&mut rows).context(Insert)?;
async fn write_record_batches(
record_batches: &mut Vec<CommonRecordBatch>,
column_index_in_insert: &[InsertMode],
table: TableRef,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
let row_group = convert_records_to_row_group(
record_batches.as_slice(),
column_index_in_insert,
table.schema(),
)
.context(Insert)?;
record_batches.clear();

prepare_and_write_table(table, row_group, default_value_map).await
}

// Fill default values
fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
async fn prepare_and_write_table(
table: TableRef,
mut row_group: RowGroup,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
maybe_generate_tsid(&mut row_group).context(Insert)?;

let request = WriteRequest { row_group: rows };
// Fill default values
fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;

let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;
let request = WriteRequest { row_group };

Ok(Output::AffectedRows(num_rows))
}
let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;

Ok(num_rows)
}

async fn exec_select_logical_plan(
ctx: Context,
query_plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
) -> Result<RecordBatchVec> {
) -> Result<SendableRecordBatchStream> {
let priority = Priority::High;

let query_ctx = ctx
@@ -216,34 +319,25 @@ async fn exec_select_logical_plan(
})?;

// Execute select physical plan.
let record_batch_stream = executor
let record_batch_stream: SendableRecordBatchStream = executor
.execute(&query_ctx, physical_plan)
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to execute select physical plan",
})?;

let record_batches =
record_batch_stream
.try_collect()
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to collect select execution results",
})?;

Ok(record_batches)
Ok(record_batch_stream)
}

fn convert_records_to_row_group(
record_batches: RecordBatchVec,
column_index_in_insert: Vec<InsertMode>,
record_batches: &[CommonRecordBatch],
column_index_in_insert: &[InsertMode],
schema: Schema,
) -> Result<RowGroup> {
let mut data_rows: Vec<Row> = Vec::new();

for record in &record_batches {
for record in record_batches {
let num_cols = record.num_columns();
let num_rows = record.num_rows();
for row_idx in 0..num_rows {
