diff --git a/Cargo.lock b/Cargo.lock index 5849cc71a321..f840a4d5bb80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6386,6 +6386,7 @@ dependencies = [ "tracing", "uuid", "workspace-hack", + "xorf", "xxhash-rust", "zstd", ] @@ -8600,6 +8601,16 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +[[package]] +name = "xorf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57901b00e3f8e14f4d20b8955bf8087ecb545cfe2ed8741c2a2dbc89847a1a29" +dependencies = [ + "libm", + "rand 0.8.5", +] + [[package]] name = "xxhash-rust" version = "0.8.6" diff --git a/ci/scripts/e2e-iceberg-sink-test.sh b/ci/scripts/e2e-iceberg-sink-test.sh index a3e54b401a5d..67a34a0e1466 100755 --- a/ci/scripts/e2e-iceberg-sink-test.sh +++ b/ci/scripts/e2e-iceberg-sink-test.sh @@ -52,7 +52,7 @@ sleep 1 # prepare minio iceberg sink echo "--- preparing iceberg" .risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/iceberg -wget https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz +wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner DEPENDENCIES=org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2 spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ diff --git a/e2e_test/batch/types/array_ty.slt.part b/e2e_test/batch/types/array_ty.slt.part index 3839d261dcfd..5f1a58e68531 100644 --- a/e2e_test/batch/types/array_ty.slt.part +++ b/e2e_test/batch/types/array_ty.slt.part @@ -84,6 +84,20 @@ select max(ARRAY[1, v1*2]) from t; ---- {1,6} +query T +select CAST(NULL as bool[]) from t; +---- +NULL +NULL +NULL + +query T +select array[false, false] from t; +---- +{f,f} +{f,f} +{f,f} + statement ok drop table t; diff --git a/risedev.yml b/risedev.yml index 82ac6d5cf236..1a310b322267 100644 --- a/risedev.yml +++ b/risedev.yml @@ -162,7 +162,6 @@ profile: - use: kafka persist-data: true - 3etcd-3meta: steps: - use: etcd diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index 3ee32e744306..f1e85f2e40d2 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -55,6 +55,8 @@ pub fn read_numeric_array> } } let arr = builder.finish(); + ensure_eq!(arr.len(), cardinality); + Ok(arr.into()) } @@ -68,7 +70,7 @@ pub fn read_bool_array(array: &ProstArray, cardinality: usize) -> ArrayResult>( } } let arr = builder.finish(); + ensure_eq!(arr.len(), cardinality); + Ok(arr.into()) } diff --git a/src/common/src/array/decimal_array.rs b/src/common/src/array/decimal_array.rs index 496cb5802cc0..cf64149d9d92 100644 --- a/src/common/src/array/decimal_array.rs +++ b/src/common/src/array/decimal_array.rs @@ -60,7 +60,7 @@ mod tests { assert_eq!(prost_array.values.len(), 1); - let decoded_array = ArrayImpl::from_protobuf(&prost_array, 4) + let decoded_array = ArrayImpl::from_protobuf(&prost_array, 8) .unwrap() .into_decimal(); diff --git a/src/common/src/array/list_array.rs b/src/common/src/array/list_array.rs index 1469548e9d83..1c060b92c8c5 100644 --- a/src/common/src/array/list_array.rs +++ b/src/common/src/array/list_array.rs @@ -32,8 +32,6 @@ use crate::types::{ DatumRef, Scalar, ScalarRefImpl, ToDatumRef, }; -/// This is a naive implementation of list array. -/// We will eventually move to a more efficient flatten implementation. 
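The `ensure_eq!(arr.len(), cardinality)` guards added to the array readers in column_proto_readers.rs above turn a cardinality mismatch into a decode error instead of a silently shorter array, which is also why the decimal-array test now has to pass a cardinality that matches the encoded data. A self-contained sketch of the invariant; the helper below is hypothetical and not code from this diff:

// Hypothetical stand-in for the `ensure_eq!` guard: the cardinality recorded
// for a protobuf column must equal the number of items actually decoded.
fn check_decoded_len<T>(decoded: &[T], cardinality: usize) -> Result<(), String> {
    if decoded.len() != cardinality {
        return Err(format!(
            "column declared cardinality {cardinality} but decoded {} items",
            decoded.len()
        ));
    }
    Ok(())
}

fn main() {
    // Mirrors the spirit of the decimal test fix: decoding 8 values while
    // claiming a cardinality of 4 now fails fast.
    assert!(check_decoded_len(&[1, 2, 3, 4, 5, 6, 7, 8], 4).is_err());
    assert!(check_decoded_len(&[1, 2, 3, 4, 5, 6, 7, 8], 8).is_ok());
}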
#[derive(Debug)] pub struct ListArrayBuilder { bitmap: BitmapBuilder, @@ -146,8 +144,15 @@ impl ListArrayBuilder { } } -/// This is a naive implementation of list array. -/// We will eventually move to a more efficient flatten implementation. +/// Each item of this `ListArray` is a `List`, or called `T[]` (T array). +/// +/// * As other arrays, there is a null bitmap, with `1` meaning nonnull and `0` meaning null. +/// * As [`BytesArray`], there is an offsets `Vec` and a value `Array`. The value `Array` has all +/// items concatenated, and the offsets `Vec` stores start and end indices into it for slicing. +/// Effectively, the inner array is the flattened form, and `offsets.len() == n + 1`. +/// +/// For example, `values (array[1]), (array[]::int[]), (null), (array[2, 3]);` stores an inner +/// `I32Array` with `[1, 2, 3]`, along with offsets `[0, 1, 1, 1, 3]` and null bitmap `TTFT`. #[derive(Debug, Clone, PartialEq)] pub struct ListArray { bitmap: Bitmap, @@ -221,7 +226,11 @@ impl ListArray { ); let bitmap: Bitmap = array.get_null_bitmap()?.into(); let array_data = array.get_list_array_data()?.to_owned(); - let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), bitmap.len())?; + let flatten_len = match array_data.offsets.last() { + Some(&n) => n as usize, + None => bail!("Must have at least one element in offsets"), + }; + let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), flatten_len)?; let arr = ListArray { bitmap, offsets: array_data.offsets, diff --git a/src/frontend/planner_test/tests/testdata/column_pruning.yaml b/src/frontend/planner_test/tests/testdata/column_pruning.yaml index 24f0cc0ab348..8768fa59d7c1 100644 --- a/src/frontend/planner_test/tests/testdata/column_pruning.yaml +++ b/src/frontend/planner_test/tests/testdata/column_pruning.yaml @@ -141,15 +141,18 @@ logical_plan: | LogicalProject { exprs: [t1.a, window_end] } └─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all } - └─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] } optimized_logical_plan: | LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] } - └─LogicalScan { table: t1, columns: [t1.a, t1.created_at] } + └─LogicalScan { table: t1, columns: [t1.a, t1.created_at], predicate: IsNotNull(t1.created_at) } batch_plan: | BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(t1.created_at) } + └─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [a, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_end] } └─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml 
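The new `ListArray` doc comment above describes the flattened layout in prose; here is a tiny, illustrative-only model of that layout for the documented example `values (array[1]), (array[]::int[]), (null), (array[2, 3])` (the real types live in list_array.rs, everything below is a simplification):

// Simplified model: a null bitmap, an offsets vector of length n + 1, and one
// flattened inner array holding every element of every list.
struct MiniListArray {
    bitmap: Vec<bool>,   // true = non-null, i.e. the `TTFT` in the doc comment
    offsets: Vec<usize>, // n + 1 entries; item i spans offsets[i]..offsets[i + 1]
    flat: Vec<i32>,      // flattened inner values
}

impl MiniListArray {
    fn get(&self, i: usize) -> Option<&[i32]> {
        self.bitmap[i].then(|| &self.flat[self.offsets[i]..self.offsets[i + 1]])
    }
}

fn main() {
    let arr = MiniListArray {
        bitmap: vec![true, true, false, true],
        offsets: vec![0, 1, 1, 1, 3],
        flat: vec![1, 2, 3],
    };
    assert_eq!(arr.get(0), Some(&[1][..]));      // array[1]
    assert_eq!(arr.get(1), Some(&[] as &[i32])); // array[]::int[]
    assert_eq!(arr.get(2), None);                // null
    assert_eq!(arr.get(3), Some(&[2, 3][..]));   // array[2, 3]
    // `from_protobuf` now sizes the inner array by `offsets.last()` (3 here),
    // i.e. the flattened length, rather than by the number of lists (4).
}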
b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml index c8c348511352..c47a702581e2 100644 --- a/src/frontend/planner_test/tests/testdata/distribution_derive.yaml +++ b/src/frontend/planner_test/tests/testdata/distribution_derive.yaml @@ -815,25 +815,29 @@ logical_plan: | LogicalProject { exprs: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end] } └─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all } - └─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] } optimized_logical_plan: | LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all } - └─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at] } + └─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], predicate: IsNotNull(t1.created_at) } batch_plan: | BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(t1.created_at) } + └─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] } └─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] } materialized table: 4294967294 StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] } - Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - Upstream - BatchPlanNode + StreamFilter { predicate: IsNotNull(t1.created_at) } + Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + Upstream + BatchPlanNode Table 4294967294 { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id], primary key: [$6 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] } diff --git a/src/frontend/planner_test/tests/testdata/nexmark.yaml b/src/frontend/planner_test/tests/testdata/nexmark.yaml index af7336166d6b..08756481b570 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark.yaml @@ -279,14 +279,16 @@ | └─BatchHashAgg { group_key: [window_start, bid.auction], aggs: [count] } | └─BatchHopWindow { time_col: bid.date_time, slide: 
00:00:02, size: 00:00:10, output: [bid.auction, window_start] } | └─BatchExchange { order: [], dist: HashShard(bid.auction) } - | └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard } + | └─BatchFilter { predicate: IsNotNull(bid.date_time) } + | └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard } └─BatchProject { exprs: [max(count), window_start] } └─BatchHashAgg { group_key: [window_start], aggs: [max(count)] } └─BatchExchange { order: [], dist: HashShard(window_start) } └─BatchHashAgg { group_key: [bid.auction, window_start], aggs: [count] } └─BatchHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start] } └─BatchExchange { order: [], dist: HashShard(bid.auction) } - └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(bid.date_time) } + └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] } └─StreamProject { exprs: [bid.auction, count, window_start, window_start] } @@ -297,7 +299,8 @@ | └─StreamAppendOnlyHashAgg { group_key: [window_start, bid.auction], aggs: [count, count] } | └─StreamExchange { dist: HashShard(bid.auction, window_start) } | └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } - | └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + | └─StreamFilter { predicate: IsNotNull(bid.date_time) } + | └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } └─StreamProject { exprs: [max(count), window_start] } └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } @@ -305,7 +308,8 @@ └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(bid.auction, window_start) } └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } - └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + └─StreamFilter { predicate: IsNotNull(bid.date_time) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] } @@ -328,9 +332,10 @@ Fragment 2 StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } - Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } - Upstream - BatchPlanNode + StreamFilter { predicate: IsNotNull(bid.date_time) } + Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + Upstream + BatchPlanNode Fragment 3 StreamProject { exprs: [bid.auction, window_start, count] } @@ 
-340,9 +345,10 @@ Fragment 4 StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] } - Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } - Upstream - BatchPlanNode + StreamFilter { predicate: IsNotNull(bid.date_time) } + Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + Upstream + BatchPlanNode Table 0 { columns: [bid_auction, count, window_start], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [2] } Table 1 { columns: [window_start, bid_auction, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index 93417b548a57..fb637bba603d 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -329,16 +329,20 @@ | └─BatchHashAgg { group_key: [window_start, auction], aggs: [count] } | └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] } | └─BatchExchange { order: [], dist: HashShard(auction) } - | └─BatchProject { exprs: [auction, date_time] } - | └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } + | └─BatchFilter { predicate: IsNotNull(date_time) } + | └─BatchProject { exprs: [auction, date_time] } + | └─BatchFilter { predicate: IsNotNull(date_time) } + | └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } └─BatchProject { exprs: [max(count), window_start] } └─BatchHashAgg { group_key: [window_start], aggs: [max(count)] } └─BatchExchange { order: [], dist: HashShard(window_start) } └─BatchHashAgg { group_key: [auction, window_start], aggs: [count] } └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] } └─BatchExchange { order: [], dist: HashShard(auction) } - └─BatchProject { exprs: [auction, date_time] } - └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } + └─BatchFilter { predicate: IsNotNull(date_time) } + └─BatchProject { exprs: [auction, date_time] } + └─BatchFilter { predicate: IsNotNull(date_time) } + └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } stream_plan: | StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] } └─StreamProject { exprs: [auction, count, window_start, window_start] } @@ -349,11 +353,13 @@ | └─StreamAppendOnlyHashAgg { group_key: [window_start, auction], aggs: [count, count] } | └─StreamExchange { dist: HashShard(auction, window_start) } | └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - | └─StreamProject { exprs: [auction, date_time, _row_id] } - | └─StreamShare { id = 754 } - | └─StreamProject { exprs: [auction, date_time, _row_id] } - | └─StreamRowIdGen { row_id_index: 7 } - | └─StreamSource { source: "bid", columns: 
["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + | └─StreamFilter { predicate: IsNotNull(date_time) } + | └─StreamProject { exprs: [auction, date_time, _row_id] } + | └─StreamShare { id = 919 } + | └─StreamProject { exprs: [auction, date_time, _row_id] } + | └─StreamFilter { predicate: IsNotNull(date_time) } + | └─StreamRowIdGen { row_id_index: 7 } + | └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } └─StreamProject { exprs: [max(count), window_start] } └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } @@ -361,11 +367,13 @@ └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(auction, window_start) } └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - └─StreamProject { exprs: [auction, date_time, _row_id] } - └─StreamShare { id = 754 } - └─StreamProject { exprs: [auction, date_time, _row_id] } - └─StreamRowIdGen { row_id_index: 7 } - └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + └─StreamFilter { predicate: IsNotNull(date_time) } + └─StreamProject { exprs: [auction, date_time, _row_id] } + └─StreamShare { id = 919 } + └─StreamProject { exprs: [auction, date_time, _row_id] } + └─StreamFilter { predicate: IsNotNull(date_time) } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } stream_dist_plan: | Fragment 0 StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] } @@ -388,14 +396,16 @@ Fragment 2 StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - StreamProject { exprs: [auction, date_time, _row_id] } - StreamExchange Hash([2]) from 3 + StreamFilter { predicate: IsNotNull(date_time) } + StreamProject { exprs: [auction, date_time, _row_id] } + StreamExchange Hash([2]) from 3 Fragment 3 StreamProject { exprs: [auction, date_time, _row_id] } - StreamRowIdGen { row_id_index: 7 } - StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } - source state table: 5 + StreamFilter { predicate: IsNotNull(date_time) } + StreamRowIdGen { row_id_index: 7 } + StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + source state table: 5 Fragment 4 StreamProject { exprs: [auction, window_start, count] } @@ -405,8 +415,9 @@ Fragment 5 StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - StreamProject { exprs: [auction, date_time, _row_id] } - StreamExchange Hash([2]) from 3 + StreamFilter { predicate: IsNotNull(date_time) } + StreamProject { exprs: [auction, date_time, _row_id] } + StreamExchange Hash([2]) from 3 Table 0 { columns: [auction, count, window_start], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [2] } Table 1 { columns: [window_start, auction, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } diff --git a/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml 
b/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml index 59567b31c598..3ae0faf0d7e9 100644 --- a/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml +++ b/src/frontend/planner_test/tests/testdata/predicate_pushdown.yaml @@ -54,14 +54,15 @@ logical_plan: | LogicalProject { exprs: [t.v1, t.v2, t.v3, t.v4, t.ts, window_start, window_end] } └─LogicalFilter { predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Date) AND (window_start >= '1997-07-02':Date) AND (window_end >= '1997-07-03':Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) } - └─LogicalShare { id = 4 } + └─LogicalShare { id = 5 } └─LogicalProject { exprs: [t.v1, t.v2, t.v3, t.v4, t.ts, window_start, window_end] } └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] } + └─LogicalFilter { predicate: IsNotNull(t.ts) } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] } optimized_logical_plan: | LogicalFilter { predicate: (window_start >= '1997-07-02':Date) AND (window_end >= '1997-07-03':Date) AND (window_start >= (t.ts + '1 day':Interval)) AND (window_end > (t.ts + '4 days':Interval)) } └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Date) } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts], predicate: (t.v1 = 10:Int32) AND (t.v2 = 20:Int32) AND (t.v3 = 30:Int32) AND (t.ts >= '1997-07-01':Date) AND IsNotNull(t.ts) } - name: filter hop transpose with non-trivial output-indices sql: | create table t(v1 int, v2 int, v3 int, v4 int, ts date); @@ -70,15 +71,16 @@ logical_plan: | LogicalProject { exprs: [window_end, t.v4, t.v2] } └─LogicalFilter { predicate: (window_end > '2022-01-01':Date) AND (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) } - └─LogicalShare { id = 4 } + └─LogicalShare { id = 5 } └─LogicalProject { exprs: [window_end, t.v4, t.v2] } └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] } + └─LogicalFilter { predicate: IsNotNull(t.ts) } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t.v4, t.ts, t._row_id] } optimized_logical_plan: | LogicalProject { exprs: [window_end, t.v4, t.v2] } └─LogicalFilter { predicate: (window_end > '2022-01-01':Date) } └─LogicalHopWindow { time_col: t.ts, slide: 1 day, size: 3 days, output: [t.v2, t.v4, window_end] } - └─LogicalScan { table: t, columns: [t.v2, t.v4, t.ts], predicate: (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) } + └─LogicalScan { table: t, columns: [t.v2, t.v4, t.ts], predicate: (t.v4 = 10:Int32) AND (t.v2 > 20:Int32) AND IsNotNull(t.ts) } - name: filter union transpose sql: | create table t1 (v1 int, v2 int, v3 int); diff --git a/src/frontend/planner_test/tests/testdata/share.yaml b/src/frontend/planner_test/tests/testdata/share.yaml index 9444c038a90d..1d5e5dfaebe2 100644 --- a/src/frontend/planner_test/tests/testdata/share.yaml +++ b/src/frontend/planner_test/tests/testdata/share.yaml @@ -104,16 +104,20 @@ | └─BatchHashAgg { group_key: [window_start, auction], aggs: [count] } | └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] } | └─BatchExchange { order: 
[], dist: HashShard(auction) } - | └─BatchProject { exprs: [auction, date_time] } - | └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } + | └─BatchFilter { predicate: IsNotNull(date_time) } + | └─BatchProject { exprs: [auction, date_time] } + | └─BatchFilter { predicate: IsNotNull(date_time) } + | └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } └─BatchProject { exprs: [max(count), window_start] } └─BatchHashAgg { group_key: [window_start], aggs: [max(count)] } └─BatchExchange { order: [], dist: HashShard(window_start) } └─BatchHashAgg { group_key: [auction, window_start], aggs: [count] } └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] } └─BatchExchange { order: [], dist: HashShard(auction) } - └─BatchProject { exprs: [auction, date_time] } - └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } + └─BatchFilter { predicate: IsNotNull(date_time) } + └─BatchProject { exprs: [auction, date_time] } + └─BatchFilter { predicate: IsNotNull(date_time) } + └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) } stream_plan: | StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] } └─StreamProject { exprs: [auction, count, window_start, window_start] } @@ -124,11 +128,13 @@ | └─StreamAppendOnlyHashAgg { group_key: [window_start, auction], aggs: [count, count] } | └─StreamExchange { dist: HashShard(auction, window_start) } | └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - | └─StreamProject { exprs: [auction, date_time, _row_id] } - | └─StreamShare { id = 754 } - | └─StreamProject { exprs: [auction, date_time, _row_id] } - | └─StreamRowIdGen { row_id_index: 7 } - | └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + | └─StreamFilter { predicate: IsNotNull(date_time) } + | └─StreamProject { exprs: [auction, date_time, _row_id] } + | └─StreamShare { id = 919 } + | └─StreamProject { exprs: [auction, date_time, _row_id] } + | └─StreamFilter { predicate: IsNotNull(date_time) } + | └─StreamRowIdGen { row_id_index: 7 } + | └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } └─StreamProject { exprs: [max(count), window_start] } └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] } └─StreamExchange { dist: HashShard(window_start) } @@ -136,11 +142,13 @@ └─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] } └─StreamExchange { dist: HashShard(auction, window_start) } └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] } - └─StreamProject { exprs: [auction, date_time, _row_id] } - └─StreamShare { id = 754 } - └─StreamProject { exprs: [auction, date_time, _row_id] } - └─StreamRowIdGen { row_id_index: 7 } - └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } + └─StreamFilter { predicate: 
IsNotNull(date_time) } + └─StreamProject { exprs: [auction, date_time, _row_id] } + └─StreamShare { id = 919 } + └─StreamProject { exprs: [auction, date_time, _row_id] } + └─StreamFilter { predicate: IsNotNull(date_time) } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } - sql: | set rw_enable_share_plan=true; create table t(a int, b int); diff --git a/src/frontend/planner_test/tests/testdata/subquery.yaml b/src/frontend/planner_test/tests/testdata/subquery.yaml index 7044e20acb7f..0bca90173b30 100644 --- a/src/frontend/planner_test/tests/testdata/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/subquery.yaml @@ -210,30 +210,32 @@ LogicalProject { exprs: [auction.date_time, window_start, window_end] } └─LogicalApply { type: LeftSemi, on: true, correlated_id: 1 } ├─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } - | └─LogicalScan { table: auction, columns: [auction.date_time, auction._row_id] } + | └─LogicalFilter { predicate: IsNotNull(auction.date_time) } + | └─LogicalScan { table: auction, columns: [auction.date_time, auction._row_id] } └─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr2] } └─LogicalAgg { group_key: [$expr1], aggs: [] } └─LogicalProject { exprs: [CorrelatedInputRef { index: 0, correlated_id: 1 } as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } optimized_logical_plan: | LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(auction.date_time, auction.date_time), output: all } - ├─LogicalShare { id = 121 } + ├─LogicalShare { id = 125 } | └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } - | └─LogicalScan { table: auction, columns: [auction.date_time] } + | └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } └─LogicalProject { exprs: [auction.date_time] } └─LogicalAgg { group_key: [auction.date_time, auction.date_time], aggs: [] } └─LogicalJoin { type: Inner, on: true, output: [auction.date_time, auction.date_time] } ├─LogicalAgg { group_key: [auction.date_time], aggs: [] } - | └─LogicalShare { id = 121 } + | └─LogicalShare { id = 125 } | └─LogicalHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } - | └─LogicalScan { table: auction, columns: [auction.date_time] } + | └─LogicalScan { table: auction, columns: [auction.date_time], predicate: IsNotNull(auction.date_time) } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: | BatchExchange { order: [], dist: Single } └─BatchHashJoin { type: LeftSemi, predicate: auction.date_time IS NOT DISTINCT FROM auction.date_time, output: all } ├─BatchHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } | └─BatchExchange { order: [], dist: HashShard(auction.date_time) } - | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } + | └─BatchFilter { predicate: IsNotNull(auction.date_time) } + | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(auction.date_time) } └─BatchProject { exprs: [auction.date_time] } └─BatchHashAgg { group_key: [auction.date_time, auction.date_time], aggs: [] } @@ -243,7 +245,8 @@ | └─BatchHashAgg { group_key: [auction.date_time], aggs: [] } | └─BatchHopWindow { time_col: 
auction.date_time, slide: 00:00:01, size: 01:00:00, output: all } | └─BatchExchange { order: [], dist: HashShard(auction.date_time) } - | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } + | └─BatchFilter { predicate: IsNotNull(auction.date_time) } + | └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard } └─BatchValues { rows: [[]] } stream_error: |- Feature is not yet implemented: Stream values executor is unimplemented! diff --git a/src/frontend/planner_test/tests/testdata/time_window.yaml b/src/frontend/planner_test/tests/testdata/time_window.yaml index 678147aeeadd..817d008bd656 100644 --- a/src/frontend/planner_test/tests/testdata/time_window.yaml +++ b/src/frontend/planner_test/tests/testdata/time_window.yaml @@ -62,63 +62,75 @@ logical_plan: | LogicalProject { exprs: [t1.id, t1.created_at, window_start, window_end] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } stream_plan: | StreamMaterialize { columns: [id, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at, window_start, window_end, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | create table t1 (id int, created_at date); select id, created_at, window_start from hop(t1, created_at, interval '1' day, interval '3' day); logical_plan: | LogicalProject { exprs: [t1.id, t1.created_at, window_start] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } stream_plan: | StreamMaterialize { columns: [id, created_at, window_start, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at, window_start, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | create table t1 (id int, created_at date); select id, created_at, window_end from hop(t1, created_at, interval '1' day, interval '3' day); logical_plan: | LogicalProject { exprs: [t1.id, t1.created_at, window_end] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } stream_plan: | StreamMaterialize { columns: [id, created_at, window_end, 
t1._row_id(hidden)], pk_columns: [t1._row_id, window_end] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at, window_end, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | create table t1 (id int, created_at date); select id, created_at from hop(t1, created_at, interval '1' day, interval '3' day); logical_plan: | LogicalProject { exprs: [t1.id, t1.created_at] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } batch_plan: | BatchHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at] } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t1, columns: [t1.id, t1.created_at], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(t1.created_at) } + └─BatchScan { table: t1, columns: [t1.id, t1.created_at], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [id, created_at, window_start(hidden), t1._row_id(hidden)], pk_columns: [t1._row_id, window_start] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at, window_start, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | create table t1 (id int, created_at date); select t_hop.id, t_hop.created_at from hop(t1, created_at, interval '1' day, interval '3' day) as t_hop; logical_plan: | LogicalProject { exprs: [t1.id, t1.created_at] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id] } batch_plan: | BatchHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at] } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t1, columns: [t1.id, t1.created_at], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(t1.created_at) } + └─BatchScan { table: t1, columns: [t1.id, t1.created_at], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [id, created_at, window_start(hidden), t1._row_id(hidden)], pk_columns: [t1._row_id, window_start] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.created_at, window_start, t1._row_id] } - └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) } + └─StreamTableScan { table: t1, columns: [t1.id, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | create table t (v1 varchar, v2 
timestamp, v3 float); select v1, window_end, avg(v3) as avg from hop( t, v2, interval '1' minute, interval '10' minute) group by v1, window_end; @@ -127,21 +139,24 @@ └─LogicalAgg { group_key: [t.v1, window_end], aggs: [sum(t.v3), count(t.v3)] } └─LogicalProject { exprs: [t.v1, window_end, t.v3] } └─LogicalHopWindow { time_col: t.v2, slide: 00:01:00, size: 00:10:00, output: all } - └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id] } + └─LogicalFilter { predicate: IsNotNull(t.v2) } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id] } batch_plan: | BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.v1, window_end, (sum(t.v3) / count(t.v3)::Float64) as $expr23] } └─BatchHashAgg { group_key: [t.v1, window_end], aggs: [sum(t.v3), count(t.v3)] } └─BatchHopWindow { time_col: t.v2, slide: 00:01:00, size: 00:10:00, output: [t.v1, t.v3, window_end] } └─BatchExchange { order: [], dist: HashShard(t.v1) } - └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } + └─BatchFilter { predicate: IsNotNull(t.v2) } + └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [v1, window_end, avg], pk_columns: [v1, window_end] } └─StreamProject { exprs: [t.v1, window_end, (sum(t.v3) / count(t.v3)::Float64) as $expr47] } └─StreamHashAgg { group_key: [t.v1, window_end], aggs: [count, sum(t.v3), count(t.v3)] } └─StreamExchange { dist: HashShard(t.v1, window_end) } └─StreamHopWindow { time_col: t.v2, slide: 00:01:00, size: 00:10:00, output: [t.v1, t.v3, window_end, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamFilter { predicate: IsNotNull(t.v2) } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - sql: | create table t1 (id int, v1 int, created_at date); with t2 as (select * from t1 where v1 >= 10) @@ -170,19 +185,20 @@ logical_plan: | LogicalProject { exprs: [t1.id, t1.v1, t1.created_at, window_start, window_end] } └─LogicalHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } - └─LogicalShare { id = 4 } - └─LogicalProject { exprs: [t1.id, t1.v1, t1.created_at] } - └─LogicalFilter { predicate: (t1.v1 >= 10:Int32) } - └─LogicalScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id] } + └─LogicalFilter { predicate: IsNotNull(t1.created_at) } + └─LogicalShare { id = 4 } + └─LogicalProject { exprs: [t1.id, t1.v1, t1.created_at] } + └─LogicalFilter { predicate: (t1.v1 >= 10:Int32) } + └─LogicalScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id] } batch_plan: | BatchHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: all } └─BatchExchange { order: [], dist: Single } - └─BatchFilter { predicate: (t1.v1 >= 10:Int32) } + └─BatchFilter { predicate: IsNotNull(t1.created_at) AND (t1.v1 >= 10:Int32) } └─BatchScan { table: t1, columns: [t1.id, t1.v1, t1.created_at], distribution: SomeShard } stream_plan: | StreamMaterialize { columns: [id, v1, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] } └─StreamHopWindow { time_col: t1.created_at, slide: 1 day, size: 3 days, output: [t1.id, t1.v1, t1.created_at, window_start, window_end, t1._row_id] } - └─StreamFilter { predicate: (t1.v1 >= 10:Int32) } + └─StreamFilter { predicate: IsNotNull(t1.created_at) AND (t1.v1 >= 10:Int32) } 
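All of the planner-test snapshots above change for the same reason: `hop(...)` now plans an `IsNotNull` filter on the time column beneath the hop window, and predicate pushdown then folds it into the scan predicate or merges it with user filters, as in the plan just above. A toy model of why the extra filter is semantics-preserving; the window alignment below is a simplification, not the planner's exact rule:

// Toy expansion of a hop window over one row. A NULL time value can never
// fall into any window, so pre-filtering `IsNotNull(time_col)` drops exactly
// the rows that would have produced no output anyway.
fn hop_windows(ts: Option<i64>, slide: i64, size: i64) -> Vec<(i64, i64)> {
    match ts {
        None => vec![], // NULL time column: no windows, no output rows
        Some(t) => {
            // In this toy model window starts are multiples of `slide`; emit
            // every window [start, start + size) that covers `t`.
            let first_start = (t - size + slide).div_euclid(slide) * slide;
            (0i64..)
                .map(|i| first_start + i * slide)
                .take_while(|start| *start <= t)
                .map(|start| (start, start + size))
                .collect()
        }
    }
}

fn main() {
    // slide = 15 min, size = 30 min, in seconds.
    assert!(hop_windows(None, 900, 1800).is_empty());
    assert_eq!(
        hop_windows(Some(1000), 900, 1800),
        vec![(0, 1800), (900, 2700)]
    );
}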
└─StreamTableScan { table: t1, columns: [t1.id, t1.v1, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - sql: | with t(ts) as (values ('2020-01-01 12:00:00'::timestamp)) select * from tumble(t, ts, interval '10' second) as z; diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 7c7e838bb045..14fef7889bb4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -22,10 +22,10 @@ use risingwave_common::types::{DataType, IntervalUnit}; use super::generic::GenericPlanNode; use super::{ - gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, PlanBase, - PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, + gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, + PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, }; -use crate::expr::InputRef; +use crate::expr::{ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -40,6 +40,8 @@ pub struct LogicalHopWindow { } impl LogicalHopWindow { + /// just used in optimizer and the function will not check if the `time_col`'s value is NULL + /// compared with `LogicalHopWindow::create` fn new( input: PlanRef, time_col: InputRef, @@ -117,13 +119,20 @@ impl LogicalHopWindow { self.core.into_parts() } - /// the function will check if the cond is bool expression + /// used for binder and planner. The function will add a filter operator to ignore records with + /// NULL time value. pub fn create( input: PlanRef, time_col: InputRef, window_slide: IntervalUnit, window_size: IntervalUnit, ) -> PlanRef { + let input = LogicalFilter::create_with_expr( + input, + FunctionCall::new(ExprType::IsNotNull, vec![time_col.clone().into()]) + .unwrap() + .into(), + ); Self::new(input, time_col, window_slide, window_size, None).into() } diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 93efb008979f..539b770793b9 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -20,13 +20,16 @@ use risingwave_backup::error::BackupResult; use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot}; use risingwave_backup::MetaSnapshotId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt; -use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; use risingwave_pb::hummock::{HummockVersion, HummockVersionDelta, HummockVersionStats}; use risingwave_pb::user::UserInfo; +use crate::manager::model::get_system_params_at_snapshot; use crate::model::MetadataModel; use crate::storage::{MetaStore, Snapshot, DEFAULT_COLUMN_FAMILY}; +const VERSION: u32 = 1; + pub struct MetaSnapshotBuilder { snapshot: MetaSnapshot, meta_store: Arc, @@ -41,6 +44,7 @@ impl MetaSnapshotBuilder { } pub async fn build(&mut self, id: MetaSnapshotId) -> BackupResult<()> { + self.snapshot.format_version = VERSION; self.snapshot.id = id; // Caveat: snapshot impl of etcd meta store doesn't prevent it from expiration. // So expired snapshot read may return error. 
If that happens, @@ -89,6 +93,10 @@ impl MetaSnapshotBuilder { let sink = Sink::list_at_snapshot::(&meta_store_snapshot).await?; let source = Source::list_at_snapshot::(&meta_store_snapshot).await?; let view = View::list_at_snapshot::(&meta_store_snapshot).await?; + let function = Function::list_at_snapshot::(&meta_store_snapshot).await?; + let system_param = get_system_params_at_snapshot::(&meta_store_snapshot) + .await? + .ok_or_else(|| anyhow!("system params not found in meta store"))?; self.snapshot.metadata = ClusterMetadata { default_cf, @@ -104,6 +112,8 @@ impl MetaSnapshotBuilder { view, table_fragments, user_info, + function, + system_param, }; Ok(()) } @@ -137,8 +147,10 @@ mod tests { use risingwave_pb::hummock::{HummockVersion, HummockVersionStats}; use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder; + use crate::manager::model::SystemParamsModel; use crate::model::MetadataModel; use crate::storage::{MemStore, MetaStore, DEFAULT_COLUMN_FAMILY}; + use crate::MetaOpts; #[tokio::test] async fn test_snapshot_builder() { @@ -173,6 +185,15 @@ mod tests { .insert(meta_store.deref()) .await .unwrap(); + let err = builder.build(1).await.unwrap_err(); + let err = assert_matches!(err, BackupError::Other(e) => e); + assert_eq!("system params not found in meta store", err.to_error_str()); + + MetaOpts::test(true) + .init_system_params() + .insert(meta_store.deref()) + .await + .unwrap(); let mut builder = MetaSnapshotBuilder::new(meta_store.clone()); builder.build(1).await.unwrap(); diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index a96cf9299b4e..cf01331c34b4 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -22,6 +22,7 @@ use risingwave_common::config::MetaBackend; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; use crate::dispatch_meta_store; use crate::hummock::model::CompactionGroup; +use crate::manager::model::SystemParamsModel; use crate::model::{MetadataModel, TableFragments}; use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY}; @@ -53,6 +54,9 @@ pub struct RestoreOpts { /// Directory of storage to fetch meta snapshot from. #[clap(long, default_value_t = String::from("backup"))] pub storage_directory: String, + /// Print the target snapshot, but won't restore to meta store. 
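A usage note on the new `--dry-run` flag above: the restore command fetches and logs the target snapshot (and its rewritten form when a newer snapshot exists), then returns before writing anything to the meta store; `test_dry_run` later in this diff verifies that by asserting the store stays empty. A toy model of that control flow, not the real `restore_impl`:

// Illustrative only: with dry_run the snapshot is inspected but the store is
// left untouched; without it the snapshot is applied.
#[derive(Default)]
struct FakeMetaStore {
    entries: Vec<String>,
}

fn restore(dry_run: bool, snapshot: &str, store: &mut FakeMetaStore) {
    println!("target snapshot before restore: {snapshot}");
    if dry_run {
        return; // print and validate only
    }
    store.entries.push(snapshot.to_owned());
}

fn main() {
    let mut store = FakeMetaStore::default();
    restore(true, "snapshot-1", &mut store);
    assert!(store.entries.is_empty()); // the same check `test_dry_run` makes
    restore(false, "snapshot-1", &mut store);
    assert_eq!(store.entries.len(), 1);
}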
+ #[clap(long)] + pub dry_run: bool, } async fn restore_metadata_model( @@ -68,6 +72,19 @@ async fn restore_metadata_model( Ok(()) } +async fn restore_system_param_model( + meta_store: &S, + metadata: &[T], +) -> BackupResult<()> { + if T::get(meta_store).await?.is_some() { + return Err(BackupError::NonemptyMetaStorage); + } + for d in metadata { + d.insert(meta_store).await?; + } + Ok(()) +} + async fn restore_default_cf( meta_store: &S, snapshot: &MetaSnapshot, @@ -115,6 +132,8 @@ async fn restore_metadata(meta_store: S, snapshot: MetaSnapshot) - restore_metadata_model(&meta_store, &snapshot.metadata.sink).await?; restore_metadata_model(&meta_store, &snapshot.metadata.view).await?; restore_metadata_model(&meta_store, &snapshot.metadata.source).await?; + restore_metadata_model(&meta_store, &snapshot.metadata.function).await?; + restore_system_param_model(&meta_store, &[snapshot.metadata.system_param]).await?; Ok(()) } @@ -146,13 +165,18 @@ async fn restore_impl( target_id ))); } + let mut target_snapshot = backup_store.get(target_id).await?; + tracing::info!( + "snapshot {} before rewrite:\n{}", + target_id, + target_snapshot + ); let newest_id = snapshot_list .into_iter() .map(|m| m.id) .max() .expect("should exist"); - let newest_snapshot = backup_store.get(newest_id).await?; - let mut target_snapshot = backup_store.get(target_id).await?; + assert!(newest_id >= target_id); // Always use newest snapshot's `default_cf` during restoring, in order not to corrupt shared // data of snapshots. Otherwise, for example if we restore a older SST id generator, an // existent SST in object store is at risk of being overwrote by the restored cluster. @@ -160,17 +184,27 @@ async fn restore_impl( // - Value is monotonically non-decreasing. // - Value is memcomparable. // - Keys of newest_snapshot is a superset of that of target_snapshot. - assert!(newest_snapshot.id >= target_snapshot.id); - for (k, v) in &target_snapshot.metadata.default_cf { - let newest_v = newest_snapshot - .metadata - .default_cf - .get(k) - .unwrap_or_else(|| panic!("violate superset requirement. key {:x?}", k)); - assert!(newest_v >= v, "violate monotonicity requirement"); + if newest_id > target_id { + let newest_snapshot = backup_store.get(newest_id).await?; + for (k, v) in &target_snapshot.metadata.default_cf { + let newest_v = newest_snapshot + .metadata + .default_cf + .get(k) + .unwrap_or_else(|| panic!("violate superset requirement. 
key {:x?}", k)); + assert!(newest_v >= v, "violate monotonicity requirement"); + } + target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf; + tracing::info!( + "snapshot {} after rewrite by snapshot {}:\n{}", + target_id, + target_snapshot, + newest_id + ); + } + if opts.dry_run { + return Ok(()); } - target_snapshot.metadata.default_cf = newest_snapshot.metadata.default_cf; - dispatch_meta_store!(meta_store.clone(), store, { restore_metadata(store.clone(), target_snapshot.clone()).await?; }); @@ -182,10 +216,10 @@ pub async fn restore(opts: RestoreOpts) -> BackupResult<()> { let result = restore_impl(opts, None, None).await; match &result { Ok(_) => { - tracing::info!("restore succeeded"); + tracing::info!("command succeeded"); } Err(e) => { - tracing::warn!("restore failed: {}", e); + tracing::warn!("command failed: {}", e); } } result @@ -199,11 +233,13 @@ mod tests { use itertools::Itertools; use risingwave_backup::meta_snapshot::{ClusterMetadata, MetaSnapshot}; use risingwave_pb::hummock::HummockVersion; + use risingwave_pb::meta::SystemParams; use crate::backup_restore::restore::restore_impl; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; use crate::backup_restore::RestoreOpts; use crate::dispatch_meta_store; + use crate::manager::model::SystemParamsModel; use crate::model::MetadataModel; use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY}; @@ -219,6 +255,20 @@ mod tests { ]) } + fn get_system_params() -> SystemParams { + SystemParams { + barrier_interval_ms: Some(101), + checkpoint_frequency: Some(102), + sstable_size_mb: Some(103), + block_size_kb: Some(104), + bloom_false_positive: Some(0.1), + state_store: Some("state_store".to_string()), + data_directory: Some("data_directory".to_string()), + backup_storage_url: Some("backup_storage_url".to_string()), + backup_storage_directory: Some("backup_storage_directory".to_string()), + } + } + #[tokio::test] async fn test_restore_basic() { let opts = get_restore_opts(); @@ -229,6 +279,7 @@ mod tests { hummock_version.insert(&store).await.unwrap(); }); let empty_meta_store = get_meta_store(opts.clone()).await.unwrap(); + let system_param = get_system_params(); let snapshot = MetaSnapshot { id: opts.meta_snapshot_id, metadata: ClusterMetadata { @@ -236,8 +287,10 @@ mod tests { id: 123, ..Default::default() }, + system_param: system_param.clone(), ..Default::default() }, + ..Default::default() }; // target snapshot not found @@ -261,11 +314,23 @@ mod tests { restore_impl( opts.clone(), - Some(empty_meta_store), + Some(empty_meta_store.clone()), Some(backup_store.clone()), ) .await .unwrap(); + + dispatch_meta_store!(empty_meta_store, store, { + let restored_hummock_version = HummockVersion::list(&store) + .await + .unwrap() + .into_iter() + .next() + .unwrap(); + assert_eq!(restored_hummock_version.id, 123); + let restored_system_param = SystemParams::get(&store).await.unwrap().unwrap(); + assert_eq!(restored_system_param, system_param); + }); } #[tokio::test] @@ -276,8 +341,10 @@ mod tests { id: opts.meta_snapshot_id, metadata: ClusterMetadata { default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]), + system_param: get_system_params(), ..Default::default() }, + ..Default::default() }; backup_store.create(&snapshot).await.unwrap(); @@ -331,8 +398,10 @@ mod tests { id: opts.meta_snapshot_id, metadata: ClusterMetadata { default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]), + system_param: get_system_params(), 
..Default::default() }, + ..Default::default() }; backup_store.create(&snapshot).await.unwrap(); @@ -360,8 +429,10 @@ mod tests { id: opts.meta_snapshot_id, metadata: ClusterMetadata { default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]), + system_param: get_system_params(), ..Default::default() }, + ..Default::default() }; backup_store.create(&snapshot).await.unwrap(); @@ -379,4 +450,49 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn test_dry_run() { + let mut opts = get_restore_opts(); + assert!(!opts.dry_run); + opts.dry_run = true; + let backup_store = get_backup_store(opts.clone()).await.unwrap(); + let empty_meta_store = get_meta_store(opts.clone()).await.unwrap(); + let system_param = get_system_params(); + let snapshot = MetaSnapshot { + id: opts.meta_snapshot_id, + metadata: ClusterMetadata { + default_cf: HashMap::from([ + ( + "some_key_1".as_bytes().to_vec(), + memcomparable::to_vec(&10).unwrap(), + ), + ( + "some_key_2".as_bytes().to_vec(), + memcomparable::to_vec(&"some_value_2".to_string()).unwrap(), + ), + ]), + hummock_version: HummockVersion { + id: 123, + ..Default::default() + }, + system_param: system_param.clone(), + ..Default::default() + }, + ..Default::default() + }; + backup_store.create(&snapshot).await.unwrap(); + restore_impl( + opts.clone(), + Some(empty_meta_store.clone()), + Some(backup_store.clone()), + ) + .await + .unwrap(); + + dispatch_meta_store!(empty_meta_store, store, { + assert!(HummockVersion::list(&store).await.unwrap().is_empty()); + assert!(SystemParams::get(&store).await.unwrap().is_none()); + }); + } } diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index e29933ef0056..4e463e52d5ed 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -68,7 +68,7 @@ impl TierCompactionPicker { let max_compaction_bytes = std::cmp::min( self.config.max_compaction_bytes, - self.config.max_bytes_for_level_base * 2, + self.config.sub_level_max_compaction_bytes, ); let mut compaction_bytes = level.total_file_size; @@ -77,7 +77,7 @@ impl TierCompactionPicker { let mut waiting_enough_files = true; for other in &l0.sub_levels[idx + 1..] { - if compaction_bytes >= max_compaction_bytes { + if compaction_bytes > max_compaction_bytes { waiting_enough_files = false; break; } @@ -122,12 +122,11 @@ impl TierCompactionPicker { // compact task never be trigger. 
if level.level_type == non_overlapping_type && is_write_amp_large - && waiting_enough_files + && select_level_inputs.len() < self.config.level0_tier_compact_file_number as usize { stats.skip_by_write_amp_limit += 1; continue; } - select_level_inputs.reverse(); return Some(CompactionInput { @@ -217,8 +216,9 @@ impl CompactionPicker for TierCompactionPicker { pub mod tests { use std::sync::Arc; + use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level; use risingwave_pb::hummock::hummock_version::Levels; - use risingwave_pb::hummock::LevelType; + use risingwave_pb::hummock::{LevelType, OverlappingLevel}; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::level_selector::tests::{ @@ -362,10 +362,10 @@ pub mod tests { let l0 = generate_l0_overlapping_sublevels(vec![ vec![ generate_table(4, 1, 10, 90, 1), - generate_table(5, 1, 210, 220, 1), + generate_table(5, 1, 200, 220, 1), ], - vec![generate_table(6, 1, 0, 100, 1)], - vec![generate_table(7, 1, 0, 100, 1)], + vec![generate_table(6, 1, 1, 100, 1)], + vec![generate_table(7, 1, 1, 100, 1)], ]); let mut levels = Levels { l0: Some(l0), @@ -377,7 +377,7 @@ pub mod tests { let config = Arc::new( CompactionConfigBuilder::new() .level0_tier_compact_file_number(2) - .sub_level_max_compaction_bytes(1) + .sub_level_max_compaction_bytes(100) .max_compaction_bytes(500000) .build(), ); @@ -386,24 +386,53 @@ pub mod tests { // sub-level 0 is excluded because it's nonoverlapping and violating // sub_level_max_compaction_bytes. let mut picker = - TierCompactionPicker::new(config.clone(), Arc::new(RangeOverlapStrategy::default())); + TierCompactionPicker::new(config, Arc::new(RangeOverlapStrategy::default())); let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); assert_eq!(ret.input_levels.len(), 2); assert_eq!(ret.target_level, 0); assert_eq!(ret.target_sub_level_id, 1); + } - // sub-level 0 is included because it's overlapping even if violating - // sub_level_max_compaction_bytes. 
- levels.l0.as_mut().unwrap().sub_levels[0].level_type = LevelType::Overlapping as i32; + #[test] + fn test_write_amp_bug_skip() { + let l1 = new_sub_level( + 1, + LevelType::Nonoverlapping, + vec![ + generate_table(3, 1, 1, 50, 1), + generate_table(4, 1, 51, 100, 1), + ], + ); + let l2 = new_sub_level( + 2, + LevelType::Nonoverlapping, + vec![ + generate_table(3, 1, 1, 50, 1), + generate_table(4, 1, 51, 200, 1), + ], + ); + let levels = Levels { + l0: Some(OverlappingLevel { + total_file_size: l1.total_file_size + l2.total_file_size, + sub_levels: vec![l1, l2], + }), + levels: vec![], + ..Default::default() + }; + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(4) + .sub_level_max_compaction_bytes(100) + .max_compaction_bytes(500000) + .build(), + ); + let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + let mut local_stats = LocalPickerStatistic::default(); let mut picker = TierCompactionPicker::new(config, Arc::new(RangeOverlapStrategy::default())); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - assert_eq!(ret.input_levels.len(), 3); - assert_eq!(ret.target_level, 0); - assert_eq!(ret.target_sub_level_id, 0); + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); } } diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index fe3227defadc..af4f59f4f973 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -166,7 +166,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use risingwave_common::config::{load_config, MetaBackend}; +use risingwave_common::config::{load_config, MetaBackend, RwConfig}; use tracing::info; /// Start meta node @@ -201,6 +201,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { MetaBackend::Mem => MetaStoreBackend::Mem, }; + validate_config(&config); + let max_heartbeat_interval = Duration::from_secs(config.meta.max_heartbeat_interval_secs as u64); let barrier_interval = Duration::from_millis(config.streaming.barrier_interval_ms as u64); @@ -267,3 +269,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { } }) } + +fn validate_config(config: &RwConfig) { + if config.meta.meta_leader_lease_secs <= 1 { + let error_msg = "meta leader lease secs should be larger than 1"; + tracing::error!(error_msg); + panic!("{}", error_msg); + } +} diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index bd2ecb69d1ce..119c25d8079e 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
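// Illustrative sketch (not part of the patch) of the fail-fast check added to
// src/meta/src/lib.rs above. The struct and helper below are hypothetical stand-ins for
// `RwConfig` so the snippet stays self-contained.
struct MetaConfigSketch {
    meta_leader_lease_secs: u64,
}

fn validate_meta_config(config: &MetaConfigSketch) {
    // The leader lease must outlive at least one keep-alive tick, so a lease of one
    // second or less is rejected before the meta node starts serving.
    if config.meta_leader_lease_secs <= 1 {
        panic!("meta leader lease secs should be larger than 1");
    }
}

#[cfg(test)]
mod validate_config_sketch_tests {
    use super::*;

    #[test]
    #[should_panic(expected = "meta leader lease secs")]
    fn rejects_tiny_lease() {
        validate_meta_config(&MetaConfigSketch {
            meta_leader_lease_secs: 1,
        });
    }
}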
-mod model; - +pub mod model; use std::ops::DerefMut; use std::sync::Arc; diff --git a/src/meta/src/manager/system_param/model.rs b/src/meta/src/manager/system_param/model.rs index 9ee4c2da4ec9..d53eb0f88a01 100644 --- a/src/meta/src/manager/system_param/model.rs +++ b/src/meta/src/manager/system_param/model.rs @@ -17,7 +17,7 @@ use risingwave_common::system_param::{system_params_from_kv, system_params_to_kv use risingwave_pb::meta::SystemParams; use crate::model::{MetadataModelError, MetadataModelResult, Transactional}; -use crate::storage::{MetaStore, Transaction}; +use crate::storage::{MetaStore, Snapshot, Transaction}; const SYSTEM_PARAMS_CF_NAME: &str = "cf/system_params"; @@ -75,3 +75,19 @@ impl Transactional for SystemParams { unreachable!() } } + +pub async fn get_system_params_at_snapshot( + snapshot: &S::Snapshot, +) -> MetadataModelResult> +where + S: MetaStore, +{ + let kvs = snapshot.list_cf(&SystemParams::cf_name()).await?; + if kvs.is_empty() { + Ok(None) + } else { + Ok(Some( + system_params_from_kv(kvs).map_err(MetadataModelError::internal)?, + )) + } +} diff --git a/src/meta/src/rpc/election_client.rs b/src/meta/src/rpc/election_client.rs index ef62d4239a4c..b54b53abd4e7 100644 --- a/src/meta/src/rpc/election_client.rs +++ b/src/meta/src/rpc/election_client.rs @@ -146,15 +146,25 @@ impl ElectionClient for EtcdElectionClient { let mut ticker = time::interval(Duration::from_secs(1)); + // timeout controller, when keep alive fails for more than a certain period of time + // before it is considered a complete failure + let mut timeout = time::interval(Duration::from_secs((ttl / 2) as u64)); + timeout.reset(); + loop { tokio::select! { biased; + _ = timeout.tick() => { + tracing::warn!("lease {} keep alive timeout", lease_id); + keep_alive_fail_tx.send(()).unwrap(); + break; + } + _ = ticker.tick() => { if let Err(err) = keeper.keep_alive().await { - tracing::error!("keep alive for lease {} failed {}", lease_id, err); - keep_alive_fail_tx.send(()).unwrap(); - break; + tracing::warn!("keep alive for lease {} failed {}", lease_id, err); + continue } match resp_stream.message().await { @@ -164,16 +174,23 @@ impl ElectionClient for EtcdElectionClient { keep_alive_fail_tx.send(()).unwrap(); break; } + + timeout.reset(); }, Ok(None) => { tracing::warn!("lease keeper for lease {} response stream closed unexpected", lease_id); - keep_alive_fail_tx.send(()).unwrap(); - break; + + // try to re-create lease keeper, with timeout as ttl / 2 + if let Ok(Ok((keeper_, resp_stream_))) = time::timeout(Duration::from_secs((ttl / 2) as u64), lease_client.keep_alive(lease_id)).await { + keeper = keeper_; + resp_stream = resp_stream_; + }; + + continue; } Err(e) => { tracing::error!("lease keeper failed {}", e.to_string()); - keep_alive_fail_tx.send(()).unwrap(); - break; + continue; } }; } @@ -224,7 +241,15 @@ impl ElectionClient for EtcdElectionClient { }, resp = observe_stream.next() => { match resp { - None => unreachable!(), + None => { + tracing::warn!("observe stream closed unexpected, recreating"); + + // try to re-create observe stream, with timeout as ttl / 2 + if let Ok(Ok(stream)) = time::timeout(Duration::from_secs((ttl / 2) as u64), election_client.observe(META_ELECTION_KEY)).await { + observe_stream = stream; + tracing::info!("recreating observe stream"); + } + } Some(Ok(leader)) => { if let Some(kv) = leader.kv() && kv.value() != self.id.as_bytes() { tracing::warn!("leader has been changed to {}", String::from_utf8_lossy(kv.value()).to_string()); @@ -232,8 +257,8 @@ impl 
ElectionClient for EtcdElectionClient { } } Some(Err(e)) => { - tracing::error!("error {} received from leader observe stream", e.to_string()); - break; + tracing::warn!("error {} received from leader observe stream", e.to_string()); + continue } } } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index a75516764e18..0ef8842f20a4 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -62,6 +62,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tokio-retry = "0.3" tracing = "0.1" +xorf = "0.8.1" xxhash-rust = { version = "0.8.5", features = ["xxh32", "xxh64"] } zstd = "0.11.2" diff --git a/src/storage/backup/src/meta_snapshot.rs b/src/storage/backup/src/meta_snapshot.rs index 1193cc954f92..3fabbba54128 100644 --- a/src/storage/backup/src/meta_snapshot.rs +++ b/src/storage/backup/src/meta_snapshot.rs @@ -13,13 +13,14 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::{Display, Formatter}; use bytes::{Buf, BufMut}; use itertools::Itertools; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; +use risingwave_pb::catalog::{Database, Function, Index, Schema, Sink, Source, Table, View}; use risingwave_pb::hummock::{CompactionGroup, HummockVersion, HummockVersionStats}; -use risingwave_pb::meta::TableFragments; +use risingwave_pb::meta::{SystemParams, TableFragments}; use risingwave_pb::user::UserInfo; use crate::error::{BackupError, BackupResult}; @@ -27,6 +28,7 @@ use crate::{xxhash64_checksum, xxhash64_verify, MetaSnapshotId}; #[derive(Debug, Default, Clone, PartialEq)] pub struct MetaSnapshot { + pub format_version: u32, pub id: MetaSnapshotId, /// Snapshot of meta store. pub metadata: ClusterMetadata, @@ -35,6 +37,7 @@ pub struct MetaSnapshot { impl MetaSnapshot { pub fn encode(&self) -> Vec { let mut buf = vec![]; + buf.put_u32_le(self.format_version); buf.put_u64_le(self.id); self.metadata.encode_to(&mut buf); let checksum = xxhash64_checksum(&buf); @@ -45,25 +48,68 @@ impl MetaSnapshot { pub fn decode(mut buf: &[u8]) -> BackupResult { let checksum = (&buf[buf.len() - 8..]).get_u64_le(); xxhash64_verify(&buf[..buf.len() - 8], checksum)?; + let format_version = buf.get_u32_le(); let id = buf.get_u64_le(); let metadata = ClusterMetadata::decode(buf)?; - Ok(Self { id, metadata }) + Ok(Self { + format_version, + id, + metadata, + }) } } +impl Display for MetaSnapshot { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "format_version: {}", self.format_version)?; + writeln!(f, "id: {}", self.id)?; + writeln!(f, "default_cf:")?; + for (k, v) in &self.metadata.default_cf { + let key = String::from_utf8(k.clone()).unwrap(); + writeln!(f, "{} {:x?}", key, v)?; + } + writeln!(f, "hummock_version:")?; + writeln!(f, "{:#?}", self.metadata.hummock_version)?; + writeln!(f, "version_stats:")?; + writeln!(f, "{:#?}", self.metadata.version_stats)?; + writeln!(f, "compaction_groups:")?; + writeln!(f, "{:#?}", self.metadata.compaction_groups)?; + writeln!(f, "database:")?; + writeln!(f, "{:#?}", self.metadata.database)?; + writeln!(f, "schema:")?; + writeln!(f, "{:#?}", self.metadata.schema)?; + writeln!(f, "table:")?; + writeln!(f, "{:#?}", self.metadata.table)?; + writeln!(f, "index:")?; + writeln!(f, "{:#?}", self.metadata.index)?; + writeln!(f, "sink:")?; + writeln!(f, "{:#?}", self.metadata.sink)?; + writeln!(f, "source:")?; + writeln!(f, "{:#?}", self.metadata.source)?; + writeln!(f, "view:")?; + writeln!(f, "{:#?}", 
self.metadata.view)?; + writeln!(f, "table_fragments:")?; + writeln!(f, "{:#?}", self.metadata.table_fragments)?; + writeln!(f, "user_info:")?; + writeln!(f, "{:#?}", self.metadata.user_info)?; + writeln!(f, "function:")?; + writeln!(f, "{:#?}", self.metadata.function)?; + writeln!(f, "system_param:")?; + writeln!(f, "{:#?}", self.metadata.system_param)?; + Ok(()) + } +} + +/// For backward compatibility, never remove fields and only append new field. #[derive(Debug, Default, Clone, PartialEq)] pub struct ClusterMetadata { /// Unlike other metadata that has implemented `MetadataModel`, /// DEFAULT_COLUMN_FAMILY stores various single row metadata, e.g. id offset and epoch offset. /// So we use `default_cf` stores raw KVs for them. pub default_cf: HashMap, Vec>, - - /// Hummock metadata pub hummock_version: HummockVersion, pub version_stats: HummockVersionStats, pub compaction_groups: Vec, - - /// Catalog metadata pub database: Vec, pub schema: Vec, pub table: Vec, @@ -71,9 +117,10 @@ pub struct ClusterMetadata { pub sink: Vec, pub source: Vec, pub view: Vec, - pub table_fragments: Vec, pub user_info: Vec, + pub function: Vec, + pub system_param: SystemParams, } impl ClusterMetadata { @@ -94,6 +141,8 @@ impl ClusterMetadata { Self::encode_prost_message_list(&self.sink.iter().collect_vec(), buf); Self::encode_prost_message_list(&self.source.iter().collect_vec(), buf); Self::encode_prost_message_list(&self.view.iter().collect_vec(), buf); + Self::encode_prost_message_list(&self.function.iter().collect_vec(), buf); + Self::encode_prost_message(&self.system_param, buf); } pub fn decode(mut buf: &[u8]) -> BackupResult { @@ -115,6 +164,8 @@ impl ClusterMetadata { let sink: Vec = Self::decode_prost_message_list(&mut buf)?; let source: Vec = Self::decode_prost_message_list(&mut buf)?; let view: Vec = Self::decode_prost_message_list(&mut buf)?; + let function: Vec = Self::decode_prost_message_list(&mut buf)?; + let system_param: SystemParams = Self::decode_prost_message(&mut buf)?; Ok(Self { default_cf, @@ -130,6 +181,8 @@ impl ClusterMetadata { view, table_fragments, user_info, + function, + system_param, }) } @@ -180,7 +233,11 @@ mod tests { fn test_snapshot_encoding_decoding() { let mut metadata = ClusterMetadata::default(); metadata.hummock_version.id = 321; - let raw = MetaSnapshot { id: 123, metadata }; + let raw = MetaSnapshot { + format_version: 0, + id: 123, + metadata, + }; let encoded = raw.encode(); let decoded = MetaSnapshot::decode(&encoded).unwrap(); assert_eq!(raw, decoded); diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index a10ae0d91bfb..b65bb7d188e5 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -28,9 +28,9 @@ use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStor use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ - BatchSstableWriterFactory, BloomFilterBuilder, CachePolicy, CompressionAlgorithm, - HummockResult, MemoryLimiter, SstableBuilder, SstableBuilderOptions, SstableStore, - SstableWriterFactory, SstableWriterOptions, StreamingSstableWriterFactory, TieredCache, + BatchSstableWriterFactory, CachePolicy, CompressionAlgorithm, HummockResult, MemoryLimiter, + SstableBuilder, SstableBuilderOptions, SstableStore, SstableWriterFactory, + SstableWriterOptions, StreamingSstableWriterFactory, TieredCache, 
XorFilterBuilder, }; use risingwave_storage::monitor::ObjectStoreMetrics; @@ -61,7 +61,7 @@ impl LocalTableBuilderFactory { #[async_trait::async_trait] impl TableBuilderFactory for LocalTableBuilderFactory { - type Filter = BloomFilterBuilder; + type Filter = XorFilterBuilder; type Writer = ::Writer; async fn open_builder(&mut self) -> HummockResult> { diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 655d5d5ccea9..966e1bd8a0ef 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -25,6 +25,8 @@ use crate::HummockEpoch; pub const EPOCH_LEN: usize = std::mem::size_of::(); pub const TABLE_PREFIX_LEN: usize = std::mem::size_of::(); +// Max length for key overlap and diff length. See KeyPrefix::encode. +pub const MAX_KEY_LEN: usize = u16::MAX as usize; pub type TableKeyRange = (Bound>>, Bound>>); pub type UserKeyRange = (Bound>>, Bound>>); diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 1b7fb93919a7..79976dc420a6 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -42,6 +42,9 @@ struct SstableStreamIterator { /// Counts the time used for IO. stats_ptr: Arc, + + // For debugging + sstable_info: SstableInfo, } impl SstableStreamIterator { @@ -60,6 +63,7 @@ impl SstableStreamIterator { /// Initialises a new [`SstableStreamIterator`] which iterates over the given [`BlockStream`]. /// The iterator reads at most `max_block_count` from the stream. pub fn new( + sstable_info: &SstableInfo, block_stream: BlockStream, max_block_count: usize, stats: &StoreLocalStatistic, @@ -69,6 +73,7 @@ impl SstableStreamIterator { block_iter: None, remaining_blocks: max_block_count, stats_ptr: stats.remote_io_time.clone(), + sstable_info: sstable_info.clone(), } } @@ -152,18 +157,33 @@ impl SstableStreamIterator { } fn key(&self) -> &[u8] { - self.block_iter.as_ref().expect("no block iter").key() + self.block_iter + .as_ref() + .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info())) + .key() } fn value(&self) -> HummockValue<&[u8]> { - let raw_value = self.block_iter.as_ref().expect("no block iter").value(); - HummockValue::from_slice(raw_value).expect("decode error") + let raw_value = self + .block_iter + .as_ref() + .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info())) + .value(); + HummockValue::from_slice(raw_value) + .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info())) } fn is_valid(&self) -> bool { // True iff block_iter exists and is valid. self.block_iter.as_ref().map_or(false, |i| i.is_valid()) } + + fn sst_debug_info(&self) -> String { + format!( + "sst_id={}, meta_offset={}, table_ids={:?}", + self.sstable_info.id, self.sstable_info.meta_offset, self.sstable_info.table_ids + ) + } } /// Iterates over the KV-pairs of a given list of SSTs. 
The key-ranges of these SSTs are assumed to @@ -264,8 +284,12 @@ impl ConcatSstableIterator { let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed); - let mut sstable_iter = - SstableStreamIterator::new(block_stream, end_index - start_index, &self.stats); + let mut sstable_iter = SstableStreamIterator::new( + table_info, + block_stream, + end_index - start_index, + &self.stats, + ); sstable_iter.seek(seek_key).await?; self.sstable_iter = Some(sstable_iter); diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 7a2d35fc8f8f..44749bc5412a 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -53,7 +53,7 @@ use tokio::task::JoinHandle; pub use self::compaction_utils::{CompactionStatistics, RemoteBuilderFactory, TaskConfig}; use self::task_progress::TaskProgress; use super::multi_builder::CapacitySplitTableBuilder; -use super::{HummockResult, SstableBuilderOptions}; +use super::{HummockResult, SstableBuilderOptions, XorFilterBuilder}; use crate::hummock::compactor::compaction_utils::{ build_multi_compaction_filter, estimate_memory_use_for_compaction, generate_splits, }; @@ -63,8 +63,8 @@ use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::{SplitTableOutput, TableBuilderFactory}; use crate::hummock::vacuum::Vacuum; use crate::hummock::{ - validate_ssts, BatchSstableWriterFactory, BloomFilterBuilder, DeleteRangeAggregator, - HummockError, RangeTombstonesCollector, SstableWriterFactory, StreamingSstableWriterFactory, + validate_ssts, BatchSstableWriterFactory, DeleteRangeAggregator, HummockError, + RangeTombstonesCollector, SstableWriterFactory, StreamingSstableWriterFactory, }; use crate::monitor::{CompactorMetrics, StoreLocalStatistic}; @@ -729,7 +729,7 @@ impl Compactor { filter_key_extractor: Arc, task_progress: Option>, ) -> HummockResult<(Vec, CompactionStatistics)> { - let builder_factory = RemoteBuilderFactory:: { + let builder_factory = RemoteBuilderFactory:: { sstable_id_manager: self.context.sstable_id_manager.clone(), limiter: self.context.read_memory_limiter.clone(), options: self.options.clone(), diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index cb0b6e901970..4b577640f273 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -350,7 +350,7 @@ pub async fn get_from_sstable_info( sstable_info: &SstableInfo, full_key: FullKey<&[u8]>, read_options: &ReadOptions, - dist_key_hash: Option, + dist_key_hash: Option, local_stats: &mut StoreLocalStatistic, ) -> HummockResult>> { let sstable = sstable_store_ref.sstable(sstable_info, local_stats).await?; @@ -413,7 +413,7 @@ pub async fn get_from_sstable_info( pub fn hit_sstable_bloom_filter( sstable_info_ref: &Sstable, - prefix_hash: u32, + prefix_hash: u64, local_stats: &mut StoreLocalStatistic, ) -> bool { local_stats.bloom_filter_check_counts += 1; diff --git a/src/storage/src/hummock/sstable/block.rs b/src/storage/src/hummock/sstable/block.rs index 582e4577498b..a4dc167d83a9 100644 --- a/src/storage/src/hummock/sstable/block.rs +++ b/src/storage/src/hummock/sstable/block.rs @@ -17,10 +17,11 @@ use std::io::{Read, Write}; use std::ops::Range; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use risingwave_hummock_sdk::key::MAX_KEY_LEN; use risingwave_hummock_sdk::KeyComparator; use {lz4, zstd}; -use super::utils::{bytes_diff, xxhash64_verify, CompressionAlgorithm}; +use 
super::utils::{bytes_diff_below_max_key_length, xxhash64_verify, CompressionAlgorithm}; use crate::hummock::sstable::utils::xxhash64_checksum; use crate::hummock::{HummockError, HummockResult}; @@ -150,13 +151,21 @@ pub struct KeyPrefix { impl KeyPrefix { pub fn encode(&self, buf: &mut impl BufMut) { buf.put_u16(self.overlap as u16); - buf.put_u16(self.diff as u16); + if self.diff >= MAX_KEY_LEN { + buf.put_u16(MAX_KEY_LEN as u16); + buf.put_u32(self.diff as u32); + } else { + buf.put_u16(self.diff as u16); + } buf.put_u32(self.value as u32); } pub fn decode(buf: &mut impl Buf, offset: usize) -> Self { let overlap = buf.get_u16() as usize; - let diff = buf.get_u16() as usize; + let mut diff = buf.get_u16() as usize; + if diff == MAX_KEY_LEN { + diff = buf.get_u32() as usize; + } let value = buf.get_u32() as usize; Self { overlap, @@ -168,7 +177,11 @@ impl KeyPrefix { /// Encoded length. fn len(&self) -> usize { - 2 + 2 + 4 + if self.diff >= MAX_KEY_LEN { + 12 // 2 + 2 + 4 + 4 + } else { + 8 // 2 + 2 + 4 + } } /// Gets overlap len. @@ -249,7 +262,10 @@ impl BlockBuilder { /// # Format /// /// ```plain - /// entry (kv pair): | overlap len (2B) | diff len (2B) | value len(4B) | diff key | value | + /// For diff len < MAX_KEY_LEN (65536) + /// entry (kv pair): | overlap len (2B) | diff len (2B) | value len(4B) | diff key | value | + /// For diff len >= MAX_KEY_LEN (65536) + /// entry (kv pair): | overlap len (2B) | MAX_KEY_LEN (2B) | diff len (4B) | value len(4B) | diff key | value | /// ``` /// /// # Panics @@ -268,7 +284,7 @@ impl BlockBuilder { self.restart_points.push(self.buf.len() as u32); key } else { - bytes_diff(&self.last_key, key) + bytes_diff_below_max_key_length(&self.last_key, key) }; let prefix = KeyPrefix { @@ -465,4 +481,39 @@ mod tests { buf.put_u64(!epoch); buf.freeze() } + + #[test] + fn test_block_enc_large_key() { + let options = BlockBuilderOptions::default(); + let mut builder = BlockBuilder::new(options); + let medium_key = vec![b'a'; MAX_KEY_LEN - 500]; + let large_key = vec![b'b'; MAX_KEY_LEN]; + let xlarge_key = vec![b'c'; MAX_KEY_LEN + 500]; + + builder.add(&full_key(&medium_key, 1), b"v1"); + builder.add(&full_key(&large_key, 2), b"v2"); + builder.add(&full_key(&xlarge_key, 3), b"v3"); + let capacity = builder.uncompressed_block_size(); + let buf = builder.build().to_vec(); + let block = Box::new(Block::decode(buf.into(), capacity).unwrap()); + let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block)); + + bi.seek_to_first(); + assert!(bi.is_valid()); + assert_eq!(&full_key(&medium_key, 1)[..], bi.key()); + assert_eq!(b"v1", bi.value()); + + bi.next(); + assert!(bi.is_valid()); + assert_eq!(&full_key(&large_key, 2)[..], bi.key()); + assert_eq!(b"v2", bi.value()); + + bi.next(); + assert!(bi.is_valid()); + assert_eq!(&full_key(&xlarge_key, 3)[..], bi.key()); + assert_eq!(b"v3", bi.value()); + + bi.next(); + assert!(!bi.is_valid()); + } } diff --git a/src/storage/src/hummock/sstable/bloom.rs b/src/storage/src/hummock/sstable/bloom.rs index 8d5045dfdc9c..9f58d48ba8fb 100644 --- a/src/storage/src/hummock/sstable/bloom.rs +++ b/src/storage/src/hummock/sstable/bloom.rs @@ -55,6 +55,7 @@ impl> BitSliceMut for T { } /// Bloom implements Bloom filter functionalities over a bit-slice of data. 
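// Illustrative sketch (not part of the patch) of the escape-based length encoding that
// `KeyPrefix::encode`/`decode` in block.rs above now use: a diff length below the u16 limit
// keeps the legacy 2-byte slot, while a larger one writes the 2-byte sentinel followed by
// the real length as 4 bytes. Helper names here are hypothetical; `bytes` is already a
// dependency of this crate.
use bytes::{Buf, BufMut};

const MAX_KEY_LEN_SKETCH: usize = u16::MAX as usize;

fn encode_len(len: usize, buf: &mut impl BufMut) {
    if len >= MAX_KEY_LEN_SKETCH {
        buf.put_u16(MAX_KEY_LEN_SKETCH as u16); // sentinel: the real length follows
        buf.put_u32(len as u32);
    } else {
        buf.put_u16(len as u16);
    }
}

fn decode_len(buf: &mut impl Buf) -> usize {
    let len = buf.get_u16() as usize;
    if len == MAX_KEY_LEN_SKETCH {
        buf.get_u32() as usize
    } else {
        len
    }
}

#[cfg(test)]
mod key_len_encoding_sketch_tests {
    use super::*;

    #[test]
    fn round_trip() {
        for len in [
            0usize,
            100,
            MAX_KEY_LEN_SKETCH - 1,
            MAX_KEY_LEN_SKETCH,
            MAX_KEY_LEN_SKETCH + 500,
        ] {
            let mut buf: Vec<u8> = vec![];
            encode_len(len, &mut buf);
            assert_eq!(decode_len(&mut &buf[..]), len);
        }
    }
}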
+#[allow(dead_code)] #[derive(Clone)] pub struct BloomFilterReader { /// data of filter in bits @@ -65,6 +66,7 @@ pub struct BloomFilterReader { impl BloomFilterReader { /// Creates a Bloom filter from a byte slice + #[allow(dead_code)] pub fn new(mut buf: Vec) -> Self { if buf.len() <= 1 { return Self { data: vec![], k: 0 }; @@ -74,10 +76,12 @@ impl BloomFilterReader { Self { data: buf, k } } + #[allow(dead_code)] pub fn is_empty(&self) -> bool { self.data.is_empty() } + #[allow(dead_code)] pub fn get_raw_data(&self) -> &[u8] { &self.data } @@ -89,6 +93,7 @@ impl BloomFilterReader { /// the hash; /// - if the return value is true, then the table may or may not have the user key that has /// the hash actually, a.k.a. we don't know the answer. + #[allow(dead_code)] pub fn may_match(&self, mut h: u32) -> bool { if self.k > 30 || self.k == 00 { // potential new encoding for short Bloom filters @@ -138,7 +143,7 @@ pub fn bloom_bits_per_key(entries: usize, false_positive_rate: f64) -> usize { impl FilterBuilder for BloomFilterBuilder { fn add_key(&mut self, key: &[u8], table_id: u32) { self.key_hash_entries - .push(Sstable::hash_for_bloom_filter(key, table_id)); + .push(Sstable::hash_for_bloom_filter_u32(key, table_id)); } fn approximate_len(&self) -> usize { diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index d298166011fd..313c7fee698c 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::filter_key_extractor::{ FilterKeyExtractorImpl, FullKeyFilterKeyExtractor, }; -use risingwave_hummock_sdk::key::{user_key, FullKey}; +use risingwave_hummock_sdk::key::{user_key, FullKey, MAX_KEY_LEN}; use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap}; use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo}; use risingwave_pb::hummock::SstableInfo; @@ -30,7 +30,7 @@ use super::{ BlockBuilder, BlockBuilderOptions, BlockMeta, SstableMeta, SstableWriter, DEFAULT_BLOCK_SIZE, DEFAULT_ENTRY_SIZE, DEFAULT_RESTART_INTERVAL, VERSION, }; -use crate::hummock::sstable::{BloomFilterBuilder, FilterBuilder}; +use crate::hummock::sstable::{FilterBuilder, XorFilterBuilder}; use crate::hummock::value::HummockValue; use crate::hummock::{DeleteRangeTombstone, HummockResult}; use crate::opts::StorageOpts; @@ -118,15 +118,12 @@ pub struct SstableBuilder { filter_builder: F, } -impl SstableBuilder { +impl SstableBuilder { pub fn for_test(sstable_id: u64, writer: W, options: SstableBuilderOptions) -> Self { Self::new( sstable_id, writer, - BloomFilterBuilder::new( - options.bloom_false_positive, - options.capacity / DEFAULT_ENTRY_SIZE + 1, - ), + XorFilterBuilder::new(options.capacity / DEFAULT_ENTRY_SIZE + 1), options, Arc::new(FilterKeyExtractorImpl::FullKey( FullKeyFilterKeyExtractor::default(), @@ -189,8 +186,21 @@ impl SstableBuilder { value: HummockValue<&[u8]>, is_new_user_key: bool, ) -> HummockResult<()> { + const LARGE_KEY_LEN: usize = MAX_KEY_LEN >> 1; + let mut is_new_table = false; + let table_key_len = full_key.user_key.table_key.as_ref().len(); + if table_key_len >= LARGE_KEY_LEN { + let table_id = full_key.user_key.table_id.table_id(); + tracing::warn!( + "A large key (table_id={}, len={}, epoch={}) is added to block", + table_id, + table_key_len, + full_key.epoch + ); + } + // TODO: refine me full_key.encode_into(&mut self.raw_key); value.encode(&mut self.raw_value); diff --git 
a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index 418ece1e6a25..14a7f472f2c0 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -24,8 +24,10 @@ pub use block::*; mod block_iterator; pub use block_iterator::*; mod bloom; +mod xor_filter; pub use bloom::BloomFilterBuilder; -use bloom::BloomFilterReader; +pub use xor_filter::XorFilterBuilder; +use xor_filter::XorFilterReader; pub mod builder; pub use builder::*; pub mod writer; @@ -56,7 +58,7 @@ pub use filter::FilterBuilder; pub use sstable_id_manager::*; pub use utils::CompressionAlgorithm; use utils::{get_length_prefixed_slice, put_length_prefixed_slice}; -use xxhash_rust::xxh32; +use xxhash_rust::{xxh32, xxh64}; use self::utils::{xxhash64_checksum, xxhash64_verify}; use super::{HummockError, HummockResult}; @@ -125,7 +127,7 @@ impl DeleteRangeTombstone { pub struct Sstable { pub id: HummockSstableId, pub meta: SstableMeta, - pub filter_reader: BloomFilterReader, + pub filter_reader: XorFilterReader, } impl Debug for Sstable { @@ -140,7 +142,7 @@ impl Debug for Sstable { impl Sstable { pub fn new(id: HummockSstableId, mut meta: SstableMeta) -> Self { let filter_data = std::mem::take(&mut meta.bloom_filter); - let filter_reader = BloomFilterReader::new(filter_data); + let filter_reader = XorFilterReader::new(filter_data); Self { id, meta, @@ -148,6 +150,7 @@ impl Sstable { } } + #[inline(always)] pub fn has_bloom_filter(&self) -> bool { !self.filter_reader.is_empty() } @@ -158,7 +161,7 @@ impl Sstable { true }; if enable_bloom_filter() && self.has_bloom_filter() { - let hash = xxh32::xxh32(dist_key, 0); + let hash = xxh64::xxh64(dist_key, 0); self.may_match_hash(hash) } else { true @@ -166,13 +169,21 @@ impl Sstable { } #[inline(always)] - pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u32 { + pub fn hash_for_bloom_filter_u32(dist_key: &[u8], table_id: u32) -> u32 { let dist_key_hash = xxh32::xxh32(dist_key, 0); + // Mix the table id into the key hash so that identical keys in different tables map to different filter entries. table_id.bitxor(dist_key_hash) } #[inline(always)] - pub fn may_match_hash(&self, hash: u32) -> bool { + pub fn hash_for_bloom_filter(dist_key: &[u8], table_id: u32) -> u64 { + let dist_key_hash = xxh64::xxh64(dist_key, 0); + // Mix the table id into the key hash so that identical keys in different tables map to different filter entries. + (table_id as u64).bitxor(dist_key_hash) + } + + #[inline(always)] + pub fn may_match_hash(&self, hash: u64) -> bool { self.filter_reader.may_match(hash) } @@ -182,7 +193,7 @@ impl Sstable { #[inline] pub fn estimate_size(&self) -> usize { - 8 /* id */ + self.meta.encoded_size() + 8 /* id */ + self.filter_reader.estimate_size() + self.meta.encoded_size() } #[cfg(test)] diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 75c3144f0c30..6d6e4900e8c1 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -26,9 +26,9 @@ use crate::hummock::sstable::filter::FilterBuilder; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::value::HummockValue; use crate::hummock::{ - BatchUploadWriter, BloomFilterBuilder, CachePolicy, DeleteRangeTombstone, HummockResult, - MemoryLimiter, RangeTombstonesCollector, SstableBuilder, SstableBuilderOptions, SstableWriter, - SstableWriterOptions, + BatchUploadWriter, CachePolicy, DeleteRangeTombstone, HummockResult, MemoryLimiter, + RangeTombstonesCollector, SstableBuilder, SstableBuilderOptions,
SstableWriter, + SstableWriterOptions, XorFilterBuilder, }; use crate::monitor::CompactorMetrics; @@ -253,12 +253,12 @@ impl LocalTableBuilderFactory { #[async_trait::async_trait] impl TableBuilderFactory for LocalTableBuilderFactory { - type Filter = BloomFilterBuilder; + type Filter = XorFilterBuilder; type Writer = BatchUploadWriter; async fn open_builder( &mut self, - ) -> HummockResult> { + ) -> HummockResult> { let id = self.next_id.fetch_add(1, SeqCst); let tracker = self.limiter.require_memory(1).await; let writer_options = SstableWriterOptions { diff --git a/src/storage/src/hummock/sstable/utils.rs b/src/storage/src/hummock/sstable/utils.rs index 8d42bcb4f70c..5abd76dbd075 100644 --- a/src/storage/src/hummock/sstable/utils.rs +++ b/src/storage/src/hummock/sstable/utils.rs @@ -14,9 +14,9 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. -use std::cmp::{self}; use std::ptr; +use risingwave_hummock_sdk::key::MAX_KEY_LEN; use xxhash_rust::xxh64; use super::{HummockError, HummockResult}; @@ -30,8 +30,8 @@ unsafe fn u32(ptr: *const u8) -> u32 { } #[inline] -pub fn bytes_diff<'a>(base: &[u8], target: &'a [u8]) -> &'a [u8] { - let end = cmp::min(base.len(), target.len()); +pub fn bytes_diff_below_max_key_length<'a>(base: &[u8], target: &'a [u8]) -> &'a [u8] { + let end = base.len().min(target.len()).min(MAX_KEY_LEN); let mut i = 0; unsafe { while i + 8 <= end { diff --git a/src/storage/src/hummock/sstable/writer.rs b/src/storage/src/hummock/sstable/writer.rs index b650ea474886..95eedba42dee 100644 --- a/src/storage/src/hummock/sstable/writer.rs +++ b/src/storage/src/hummock/sstable/writer.rs @@ -100,7 +100,7 @@ mod tests { } let meta = SstableMeta { block_metas, - bloom_filter: Vec::new(), + bloom_filter: vec![], estimated_size: 0, key_count: 0, smallest_key: Vec::new(), diff --git a/src/storage/src/hummock/sstable/xor_filter.rs b/src/storage/src/hummock/sstable/xor_filter.rs new file mode 100644 index 000000000000..ea6a678b2710 --- /dev/null +++ b/src/storage/src/hummock/sstable/xor_filter.rs @@ -0,0 +1,140 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
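// Illustrative sketch (not part of the patch): a safe, simplified version of what
// `bytes_diff_below_max_key_length` in utils.rs above computes. It returns the suffix of
// `target` that follows its common prefix with `base`, with the shared prefix capped so
// the overlap always fits into the u16 written by `KeyPrefix::encode`. The real function
// compares 8 bytes at a time through raw pointers for speed.
fn bytes_diff_capped<'a>(base: &[u8], target: &'a [u8], max_overlap: usize) -> &'a [u8] {
    let end = base.len().min(target.len()).min(max_overlap);
    let mut i = 0;
    while i < end && base[i] == target[i] {
        i += 1;
    }
    &target[i..]
}

#[cfg(test)]
mod bytes_diff_sketch_tests {
    #[test]
    fn shared_prefix_is_stripped_and_capped() {
        assert_eq!(super::bytes_diff_capped(b"abcdef", b"abcxyz", 1024), b"xyz");
        // With a cap of 2, only two prefix bytes may be elided.
        assert_eq!(super::bytes_diff_capped(b"abcdef", b"abcxyz", 2), b"cxyz");
    }
}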
+ +use std::collections::HashSet; + +use bytes::{Buf, BufMut}; +use itertools::Itertools; +use xorf::{Filter, Xor16}; + +use super::{FilterBuilder, Sstable}; + +pub struct XorFilterBuilder { + key_hash_entries: Vec, +} + +impl XorFilterBuilder { + pub fn new(capacity: usize) -> Self { + let key_hash_entries = if capacity > 0 { + Vec::with_capacity(capacity) + } else { + vec![] + }; + Self { key_hash_entries } + } +} + +impl FilterBuilder for XorFilterBuilder { + fn add_key(&mut self, key: &[u8], table_id: u32) { + self.key_hash_entries + .push(Sstable::hash_for_bloom_filter(key, table_id)); + } + + fn approximate_len(&self) -> usize { + self.key_hash_entries.len() * 4 + } + + fn finish(&mut self) -> Vec { + let xor_filter = Xor16::from( + &HashSet::::from_iter(std::mem::take(&mut self.key_hash_entries).into_iter()) + .into_iter() + .collect_vec(), + ); + let mut buf = Vec::with_capacity(8 + 4 + xor_filter.fingerprints.len() * 2 + 1); + buf.put_u64_le(xor_filter.seed); + buf.put_u32_le(xor_filter.block_length as u32); + xor_filter + .fingerprints + .iter() + .for_each(|x| buf.put_u16_le(*x)); + // We add an extra byte so we can distinguish bloom filter and xor filter by the last + // byte(255 indicates a xor filter and others indicate a bloom filter). + buf.put_u8(255); + buf + } + + fn create(_fpr: f64, capacity: usize) -> Self { + XorFilterBuilder::new(capacity) + } +} + +pub struct XorFilterReader { + filter: Xor16, +} + +impl XorFilterReader { + /// Creates an xor filter from a byte slice + pub fn new(buf: Vec) -> Self { + if buf.len() <= 1 { + return Self { + filter: Xor16 { + seed: 0, + block_length: 0, + fingerprints: vec![].into_boxed_slice(), + }, + }; + } + let buf = &mut &buf[..]; + let xor_filter_seed = buf.get_u64_le(); + let xor_filter_block_length = buf.get_u32_le(); + // is correct even when there is an extra 0xff byte in the end of buf + let len = buf.len() / 2; + let xor_filter_fingerprints = (0..len) + .map(|_| buf.get_u16_le()) + .collect_vec() + .into_boxed_slice(); + Self { + filter: Xor16 { + seed: xor_filter_seed, + block_length: xor_filter_block_length as usize, + fingerprints: xor_filter_fingerprints, + }, + } + } + + pub fn estimate_size(&self) -> usize { + self.filter.fingerprints.len() * std::mem::size_of::() + } + + pub fn is_empty(&self) -> bool { + self.filter.block_length == 0 + } + + /// Judges whether the hash value is in the table with the given false positive rate. + /// + /// Note: + /// - if the return value is false, then the table surely does not have the user key that has + /// the hash; + /// - if the return value is true, then the table may or may not have the user key that has + /// the hash actually, a.k.a. we don't know the answer. 
+ pub fn may_match(&self, h: u64) -> bool { + if self.is_empty() { + true + } else { + self.filter.contains(&h) + } + } +} + +impl Clone for XorFilterReader { + fn clone(&self) -> Self { + Self { + filter: Xor16 { + seed: self.filter.seed, + block_length: self.filter.block_length, + fingerprints: self.filter.fingerprints.clone(), + }, + } + } +} diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 98c1d3085d15..272764a83f4f 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -346,7 +346,7 @@ impl SstableStore { .map_err(HummockError::object_io_error)?; let meta = SstableMeta::decode(&mut &buf[..])?; let sst = Sstable::new(sst_id, meta); - let charge = sst.meta.encoded_size(); + let charge = sst.estimate_size(); let add = (now.elapsed().as_secs_f64() * 1000.0).ceil(); stats_ptr.fetch_add(add as u64, Ordering::Relaxed); Ok((Box::new(sst), charge)) @@ -871,17 +871,10 @@ mod tests { ) { let mut stats = StoreLocalStatistic::default(); let holder = sstable_store.sstable(info, &mut stats).await.unwrap(); - let mut filter_data = std::mem::take(&mut meta.bloom_filter); - if !filter_data.is_empty() { - filter_data.pop(); - } + std::mem::take(&mut meta.bloom_filter); assert_eq!(holder.value().meta, meta); let holder = sstable_store.sstable(info, &mut stats).await.unwrap(); assert_eq!(holder.value().meta, meta); - assert_eq!( - filter_data.as_slice(), - holder.value().filter_reader.get_raw_data() - ); let mut iter = SstableIterator::new( holder, sstable_store, diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 52b18d3015fd..13c970654dc7 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -313,6 +313,15 @@ where // `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to // scan. Otherwise, use Excluded. let range_bounds = if let Some(current_pos) = current_pos { + // If `current_pos` is an empty row which means upstream mv contains only one row and it + // has been consumed. The iter interface doesn't support + // `Excluded(empty_row)` range bound, so we can simply return `None`. + if current_pos.is_empty() { + assert!(table.pk_indices().is_empty()); + yield None; + return Ok(()); + } + (Bound::Excluded(current_pos), Bound::Unbounded) } else { (Bound::Unbounded, Bound::Unbounded)
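// Illustrative sketch (not part of the patch) of the resume-point rule the backfill hunk
// above adds: when the upstream table has no pk columns, `current_pos` becomes an empty row
// once its single row has been emitted, and because the storage iterator cannot express
// `Excluded(empty_row)` the scan simply reports completion. `Vec<u8>` below is a
// hypothetical stand-in for the encoded pk row.
use std::ops::Bound;

fn resume_scan_range(current_pos: Option<Vec<u8>>) -> Option<(Bound<Vec<u8>>, Bound<Vec<u8>>)> {
    match current_pos {
        // No progress recorded yet: scan the upstream table from the beginning.
        None => Some((Bound::Unbounded, Bound::Unbounded)),
        // Empty pk row: the pk-less (single row) table is already fully consumed.
        Some(pos) if pos.is_empty() => None,
        // Otherwise resume strictly after the last position handed downstream.
        Some(pos) => Some((Bound::Excluded(pos), Bound::Unbounded)),
    }
}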