feat: remove the exchange after append_only source (risingwavelabs#8532)
shanicky authored Mar 20, 2023
1 parent 19b1be3 commit 014f6db
Showing 27 changed files with 295 additions and 223 deletions.
2 changes: 1 addition & 1 deletion e2e_test/batch/aggregate/sum.slt.part
@@ -77,7 +77,7 @@ select sum(d) from t;
statement ok
insert into t values (9000000000000000000000000000);

statement error QueryError: Expr error: Numeric out of range
statement error Expr error: Numeric out of range
select sum(d) from t;

statement ok
11 changes: 5 additions & 6 deletions src/batch/src/executor/insert.rs
@@ -17,9 +17,8 @@ use std::iter::repeat;
use anyhow::Context;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::{
ArrayBuilder, DataChunk, I64Array, Op, PrimitiveArrayBuilder, StreamChunk,
};
use risingwave_common::array::serial_array::SerialArray;
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
@@ -73,7 +72,7 @@ impl InsertExecutor {
table_schema
} else {
Schema {
fields: vec![Field::unnamed(DataType::Int64)],
fields: vec![Field::unnamed(DataType::Serial)],
}
},
identity,
@@ -123,7 +122,7 @@ impl InsertExecutor {
// If the user does not specify the primary key, then we need to add a column as the
// primary key.
if let Some(row_id_index) = self.row_id_index {
let row_id_col = I64Array::from_iter(repeat(None).take(cap));
let row_id_col = SerialArray::from_iter(repeat(None).take(cap));
columns.insert(row_id_index, row_id_col.into())
}

@@ -244,7 +243,7 @@ mod tests {
// Schema of the table
let mut schema = schema_test_utils::ii();
schema.fields.push(struct_field);
schema.fields.push(Field::unnamed(DataType::Int64)); // row_id column
schema.fields.push(Field::unnamed(DataType::Serial)); // row_id column

let row_id_index = Some(3);

28 changes: 14 additions & 14 deletions src/batch/src/task/consistent_hash_shuffle_channel.rs
@@ -21,7 +21,7 @@ use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::util::hash_util::Crc32FastBuilder;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::batch_plan::exchange_info::ConsistentHashInfo;
use risingwave_pb::batch_plan::*;
use tokio::sync::mpsc;
@@ -54,20 +54,20 @@ fn generate_hash_values(
chunk: &DataChunk,
consistent_hash_info: &ConsistentHashInfo,
) -> BatchResult<Vec<usize>> {
let hasher_builder = Crc32FastBuilder;

let hash_values = chunk
.get_hash_values(
&consistent_hash_info
.key
.iter()
.map(|idx| *idx as usize)
.collect::<Vec<_>>(),
hasher_builder,
)
.iter_mut()
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode().to_index()] as usize)
let vnodes = VirtualNode::compute_chunk(
chunk,
&consistent_hash_info
.key
.iter()
.map(|idx| *idx as usize)
.collect::<Vec<_>>(),
);

let hash_values = vnodes
.iter()
.map(|vnode| consistent_hash_info.vmap[vnode.to_index()] as usize)
.collect::<Vec<_>>();

Ok(hash_values)
}

9 changes: 8 additions & 1 deletion src/common/src/array/data_chunk.rs
@@ -479,6 +479,7 @@ pub trait DataChunkTestExt {
/// // f: f32
/// // T: str
/// // TS: Timestamp
/// // SRL: Serial
/// // {i,f}: struct
/// ```
fn from_pretty(s: &str) -> Self;
@@ -504,6 +505,7 @@ impl DataChunkTestExt for DataChunk {
"TS" => DataType::Timestamp,
"TSZ" => DataType::Timestamptz,
"T" => DataType::Varchar,
"SRL" => DataType::Serial,
array if array.starts_with('{') && array.ends_with('}') => {
DataType::Struct(Arc::new(StructType {
fields: array[1..array.len() - 1]
@@ -565,6 +567,12 @@ impl DataChunkTestExt for DataChunk {
))
}
ArrayBuilderImpl::Utf8(_) => ScalarImpl::Utf8(s.into()),
ArrayBuilderImpl::Serial(_) => ScalarImpl::Serial(
s.parse::<i64>()
.map_err(|_| panic!("invalid serial: {s:?}"))
.unwrap()
.into(),
),
ArrayBuilderImpl::Struct(builder) => {
assert!(s.starts_with('{') && s.ends_with('}'));
let fields = s[1..s.len() - 1]
@@ -641,7 +649,6 @@ impl DataChunkTestExt for DataChunk {

#[cfg(test)]
mod tests {

use crate::array::*;
use crate::row::Row;
use crate::{column, column_nonnull};
6 changes: 6 additions & 0 deletions src/common/src/array/serial_array.rs
@@ -18,6 +18,7 @@ use postgres_types::{ToSql as _, Type};
use serde::{Serialize, Serializer};

use crate::array::{PrimitiveArray, PrimitiveArrayBuilder};
use crate::util::row_id::RowId;

// Serial is an alias for i64
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
@@ -37,6 +38,11 @@ impl Serial {
pub fn into_inner(self) -> i64 {
self.0
}

#[inline]
pub fn as_row_id(self) -> RowId {
self.0 as RowId
}
}

impl Serialize for Serial {
1 change: 1 addition & 0 deletions src/common/src/array/stream_chunk.rs
@@ -338,6 +338,7 @@ impl StreamChunkTestExt for StreamChunk {
/// // T: str
/// // TS: Timestamp
/// // TSZ: Timestamptz
/// // SRL: Serial
/// // {i,f}: struct
/// ```
fn from_pretty(s: &str) -> Self {
2 changes: 1 addition & 1 deletion src/common/src/catalog/mod.rs
@@ -83,7 +83,7 @@ pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
/// Creates a row ID column (for implicit primary key). It'll always have the ID `0` for now.
pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Int64,
data_type: DataType::Serial,
column_id: ROW_ID_COLUMN_ID,
name: row_id_column_name(),
field_descs: vec![],
81 changes: 81 additions & 0 deletions src/common/src/hash/consistent_hash/vnode.rs
@@ -12,9 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use parse_display::Display;

use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::HashCode;
use crate::row::{Row, RowExt};
use crate::types::ScalarRefImpl;
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

/// Parallel unit is the minimal scheduling unit.
// TODO: make it a newtype
@@ -100,3 +106,78 @@ impl VirtualNode {
(0..Self::COUNT).map(Self::from_index)
}
}

impl VirtualNode {
// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
// be the one that contains RowId, and use a special method to skip the calculation of Hash
// and directly extract the `VirtualNode` from `RowId`.
pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
if let Ok(idx) = keys.iter().exactly_one() &&
let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref() {

return serial_array.iter()
.map(|serial|extract_vnode_id_from_row_id(serial.unwrap().as_row_id()))
.collect();
}

data_chunk
.get_hash_values(keys, Crc32FastBuilder)
.into_iter()
.map(|hash| hash.into())
.collect()
}

// `compute_row` is used to calculate the `VirtualNode` for the corresponding column in a `Row`.
// Similar to `compute_chunk`, it also contains special handling for serial columns.
pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
return extract_vnode_id_from_row_id(s.as_row_id());
}

project.hash(Crc32FastBuilder).into()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::array::DataChunkTestExt;
use crate::row::OwnedRow;
use crate::types::ScalarImpl;
use crate::util::row_id::RowIdGenerator;

#[tokio::test]
async fn test_serial_key_chunk() {
let mut gen = RowIdGenerator::new(100);
let chunk = format!(
"SRL I
{} 1
{} 2",
gen.next().await,
gen.next().await,
);

let chunk = DataChunk::from_pretty(chunk.as_str());
let vnodes = VirtualNode::compute_chunk(&chunk, &[0]);

assert_eq!(
vnodes.as_slice(),
&[VirtualNode::from_index(100), VirtualNode::from_index(100)]
);
}

#[tokio::test]
async fn test_serial_key_row() {
let mut gen = RowIdGenerator::new(100);
let row = OwnedRow::new(vec![
Some(ScalarImpl::Serial(gen.next().await.into())),
Some(ScalarImpl::Int64(12345)),
]);

let vnode = VirtualNode::compute_row(&row, &[0]);

assert_eq!(vnode, VirtualNode::from_index(100));
}
}
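For comparison with the Serial fast path exercised by the tests above, a hypothetical companion test (not part of this commit) could cover the fallback branch, where a non-`Serial` key is CRC32-hashed. It assumes the same imports as the tests module above, and that the chunk-level and row-level hash paths agree on the resulting vnode, which the call sites changed elsewhere in this commit rely on:

```rust
// Hypothetical companion test (not in this diff): a multi-column, non-Serial key
// takes the CRC32 fallback, so the chunk-level and row-level computations of the
// virtual node must agree for the same key values.
#[test]
fn test_non_serial_key_falls_back_to_hash() {
    let chunk = DataChunk::from_pretty(
        "I   I
         114 514
         514 114",
    );
    let vnodes = VirtualNode::compute_chunk(&chunk, &[0, 1]);

    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int64(114)),
        Some(ScalarImpl::Int64(514)),
    ]);

    // The first chunk row and the owned row hold the same key values,
    // so both code paths should land on the same virtual node.
    assert_eq!(vnodes[0], VirtualNode::compute_row(&row, &[0, 1]));
}
```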
5 changes: 0 additions & 5 deletions src/common/src/hash/key.rs
@@ -36,7 +36,6 @@ use crate::array::{
ListRef, StructRef,
};
use crate::collection::estimate_size::EstimateSize;
use crate::hash::VirtualNode;
use crate::row::{OwnedRow, RowDeserializer};
use crate::types::{
DataType, Decimal, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, OrderedF32,
@@ -60,10 +59,6 @@ impl HashCode {
pub fn hash_code(self) -> u64 {
self.0
}

pub fn to_vnode(self) -> VirtualNode {
VirtualNode::from(self)
}
}

pub trait HashKeySerializer {
9 changes: 9 additions & 0 deletions src/common/src/util/row_id.rs
@@ -14,6 +14,8 @@

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::hash::VirtualNode;

const TIMESTAMP_SHIFT_BITS: u8 = 22;
const VNODE_ID_SHIFT_BITS: u8 = 12;
const SEQUENCE_UPPER_BOUND: u16 = 1 << 12;
@@ -40,6 +42,13 @@ pub struct RowIdGenerator {

pub type RowId = i64;

#[inline]
pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode {
let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32;
assert!(vnode_id < VNODE_ID_UPPER_BOUND);
VirtualNode::from_index(vnode_id as usize)
}

impl RowIdGenerator {
pub fn new(vnode_id: u32) -> Self {
assert!(vnode_id < VNODE_ID_UPPER_BOUND);
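The extraction above depends on the row-id layout implied by the shift constants at the top of this file: the low 12 bits hold the sequence number, the next 22 - 12 = 10 bits hold the vnode id, and the remaining high bits hold the timestamp. A minimal standalone sketch of that layout (not part of the diff; `compose_row_id` is a hypothetical helper, the real `RowIdGenerator` is not reproduced here, and `VNODE_ID_UPPER_BOUND` is assumed to be `1 << 10` to match the 10-bit field):

```rust
const TIMESTAMP_SHIFT_BITS: u8 = 22;
const VNODE_ID_SHIFT_BITS: u8 = 12;
// 22 - 12 = 10 bits are reserved for the vnode id.
const VNODE_ID_UPPER_BOUND: u32 = 1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS);

// Hypothetical helper: pack timestamp, vnode id and sequence into a single i64 row id.
fn compose_row_id(timestamp: i64, vnode_id: u32, sequence: u16) -> i64 {
    assert!(vnode_id < VNODE_ID_UPPER_BOUND);
    (timestamp << TIMESTAMP_SHIFT_BITS)
        | ((vnode_id as i64) << VNODE_ID_SHIFT_BITS)
        | sequence as i64
}

// Mirrors extract_vnode_id_from_row_id above, but returns the raw vnode id.
fn extract_vnode_id(row_id: i64) -> u32 {
    ((row_id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32
}

fn main() {
    let row_id = compose_row_id(1_679_000_000, 100, 7);
    // The vnode id round-trips regardless of timestamp and sequence.
    assert_eq!(extract_vnode_id(row_id), 100);
}
```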
29 changes: 12 additions & 17 deletions src/common/src/util/scan_range.rs
@@ -21,9 +21,7 @@ use risingwave_pb::batch_plan::ScanRange as ScanRangePb;
use super::value_encoding::serialize_datum;
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VirtualNode;
use crate::row::{Row, RowExt};
use crate::types::{Datum, ScalarImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum_into;

/// See also [`ScanRangePb`]
@@ -107,11 +105,8 @@ impl ScanRange {
return None;
}

let pk_prefix_value = &self.eq_conds;
let vnode = pk_prefix_value
.project(dist_key_in_pk_indices)
.hash(Crc32FastBuilder)
.to_vnode();
let pk_prefix_value: &[_] = &self.eq_conds;
let vnode = VirtualNode::compute_row(pk_prefix_value, dist_key_in_pk_indices);
Some(vnode)
}
}
@@ -193,13 +188,13 @@ mod tests {
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(Some(ScalarImpl::from(514)));
let vnode = OwnedRow::new(vec![
let row = OwnedRow::new(vec![
Some(ScalarImpl::from(114)),
Some(ScalarImpl::from(514)),
])
.project(&[0, 1])
.hash(Crc32FastBuilder)
.to_vnode();
]);

let vnode = VirtualNode::compute_row(&row, &[0, 1]);

assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
}

@@ -219,14 +214,14 @@
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(Some(ScalarImpl::from(114514)));
let vnode = OwnedRow::new(vec![
let row = OwnedRow::new(vec![
Some(ScalarImpl::from(114)),
Some(ScalarImpl::from(514)),
Some(ScalarImpl::from(114514)),
])
.project(&[2, 1])
.hash(Crc32FastBuilder)
.to_vnode();
]);

let vnode = VirtualNode::compute_row(&row, &[2, 1]);

assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
}
}