Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into li0k/storage_ttl_selector
  • Loading branch information
Li0k committed Feb 15, 2023
2 parents 453b8b1 + 26f2c65 commit 8c59182
Show file tree
Hide file tree
Showing 68 changed files with 972 additions and 501 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 18 additions & 3 deletions dashboard/proto/gen/batch_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ message FilterNode {
message InsertNode {
// Id of the table to perform inserting.
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 5;
repeated uint32 column_indices = 2;
// An optional field and will be `None` for tables without user-defined pk.
// The `BatchInsertExecutor` should add a column with NULL value which will
Expand All @@ -88,12 +90,16 @@ message InsertNode {
// Batch plan node describing a DELETE operation against a single table.
message DeleteNode {
// Id of the table to perform deleting.
uint32 table_id = 1;
// Version of the table.
// NOTE: field number 3 is out of declaration order because this field was
// added after `returning`; proto field numbers must stay stable once assigned.
uint64 table_version_id = 3;
// presumably mirrors a SQL `RETURNING` clause on the DELETE — confirm with the executor
bool returning = 2;
}

// Batch plan node describing an UPDATE operation against a single table.
message UpdateNode {
// Id of the table to perform updating.
uint32 table_id = 1;
// Version of the table.
// NOTE: field number 4 is out of declaration order because this field was
// added after `exprs`/`returning`; proto field numbers must stay stable.
uint64 table_version_id = 4;
// Expressions producing the updated values — presumably one per target
// column of the UPDATE; confirm ordering against the planner.
repeated expr.ExprNode exprs = 2;
// presumably mirrors a SQL `RETURNING` clause on the UPDATE — confirm with the executor
bool returning = 3;
}
Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ message SortNode {
// Stream plan node receiving DML (insert/update/delete) changes for a table.
message DmlNode {
// Id of the table on which DML performs.
uint32 table_id = 1;
// Version of the table.
// NOTE: field number 3 is out of declaration order because this field was
// added after `column_descs`; proto field numbers must stay stable.
uint64 table_version_id = 3;
// Column descriptions of the table.
repeated plan_common.ColumnDesc column_descs = 2;
}
Expand Down
17 changes: 13 additions & 4 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::anyhow;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand All @@ -35,6 +35,7 @@ use crate::task::BatchTaskContext;
pub struct DeleteExecutor {
/// Target table id.
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,
child: BoxedExecutor,
chunk_size: usize,
Expand All @@ -46,6 +47,7 @@ pub struct DeleteExecutor {
impl DeleteExecutor {
pub fn new(
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,
child: BoxedExecutor,
chunk_size: usize,
Expand All @@ -55,6 +57,7 @@ impl DeleteExecutor {
let table_schema = child.schema().clone();
Self {
table_id,
table_version_id,
dml_manager,
child,
chunk_size,
Expand Down Expand Up @@ -98,7 +101,9 @@ impl DeleteExecutor {
let cap = chunk.capacity();
let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

let notifier = self.dml_manager.write_chunk(&self.table_id, stream_chunk)?;
let notifier =
self.dml_manager
.write_chunk(self.table_id, self.table_version_id, stream_chunk)?;
notifiers.push(notifier);

Ok(())
Expand Down Expand Up @@ -155,6 +160,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {

Ok(Box::new(Self::new(
table_id,
delete_node.table_version_id,
source.context().dml_manager(),
child,
source.context.get_config().developer.batch_chunk_size,
Expand All @@ -171,7 +177,9 @@ mod tests {
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::array::Array;
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::{
schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID,
};
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::dml_manager::DmlManager;

Expand Down Expand Up @@ -210,13 +218,14 @@ mod tests {
// We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
// due to the `Weak` reference in `DmlManager`.
let reader = dml_manager
.register_reader(table_id, &column_descs)
.register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
.unwrap();
let mut reader = reader.stream_reader().into_stream();

// Delete
let delete_executor = Box::new(DeleteExecutor::new(
table_id,
INITIAL_TABLE_VERSION_ID,
dml_manager,
Box::new(mock_executor),
1024,
Expand Down
17 changes: 13 additions & 4 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures_async_stream::try_stream;
use risingwave_common::array::{
ArrayBuilder, DataChunk, I64Array, Op, PrimitiveArrayBuilder, StreamChunk,
};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand All @@ -36,6 +36,7 @@ use crate::task::BatchTaskContext;
pub struct InsertExecutor {
/// Target table id.
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,

child: BoxedExecutor,
Expand All @@ -52,6 +53,7 @@ impl InsertExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
table_version_id: TableVersionId,
dml_manager: DmlManagerRef,
child: BoxedExecutor,
chunk_size: usize,
Expand All @@ -63,6 +65,7 @@ impl InsertExecutor {
let table_schema = child.schema().clone();
Self {
table_id,
table_version_id,
dml_manager,
child,
chunk_size,
Expand Down Expand Up @@ -127,7 +130,9 @@ impl InsertExecutor {
let stream_chunk =
StreamChunk::new(vec![Op::Insert; cap], columns, vis.into_visibility());

let notifier = self.dml_manager.write_chunk(&self.table_id, stream_chunk)?;
let notifier =
self.dml_manager
.write_chunk(self.table_id, self.table_version_id, stream_chunk)?;
notifiers.push(notifier);

Ok(())
Expand Down Expand Up @@ -190,6 +195,7 @@ impl BoxedExecutorBuilder for InsertExecutor {

Ok(Box::new(Self::new(
table_id,
insert_node.table_version_id,
source.context().dml_manager(),
child,
source.context.get_config().developer.batch_chunk_size,
Expand All @@ -212,7 +218,9 @@ mod tests {
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::{
schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID,
};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::dml_manager::DmlManager;
Expand Down Expand Up @@ -274,13 +282,14 @@ mod tests {
// We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
// due to the `Weak` reference in `DmlManager`.
let reader = dml_manager
.register_reader(table_id, &column_descs)
.register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
.unwrap();
let mut reader = reader.stream_reader().into_stream();

// Insert
let insert_executor = Box::new(InsertExecutor::new(
table_id,
INITIAL_TABLE_VERSION_ID,
dml_manager,
Box::new(mock_executor),
1024,
Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher
use risingwave_common::row::{repeat_n, RowExt};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

Expand Down Expand Up @@ -1530,7 +1530,7 @@ impl DataChunkMutator {
) -> Self {
let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
for (output_row_id, (output_row_non_null, &build_row_id)) in
filter.iter().zip_eq_debug(build_row_ids.iter()).enumerate()
filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
{
if output_row_non_null {
build_row_matched[build_row_id] = true;
Expand All @@ -1550,7 +1550,7 @@ impl DataChunkMutator {
build_row_ids: &mut Vec<RowId>,
build_row_matched: &mut ChunkedData<bool>,
) {
for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_debug(build_row_ids.iter())
for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
{
if output_row_non_null {
build_row_matched[build_row_id] = true;
Expand Down Expand Up @@ -1607,7 +1607,7 @@ impl DataChunkMutator {
first_output_row_id.clear();

for (output_row_id, (output_row_non_null, &build_row_id)) in
filter.iter().zip_eq_debug(build_row_ids.iter()).enumerate()
filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
{
if output_row_non_null {
build_row_matched[build_row_id] = true;
Expand Down
Loading

0 comments on commit 8c59182

Please sign in to comment.