
Commit

Merge branch 'v0.1.17-rc' into release-v0.1.17

lmatz authored Feb 28, 2023
2 parents 65e1361 + eb2a946 commit 1576866
Showing 40 changed files with 836 additions and 216 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ci/scripts/e2e-iceberg-sink-test.sh
@@ -52,7 +52,7 @@ sleep 1
# prepare minio iceberg sink
echo "--- preparing iceberg"
.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/iceberg
wget https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner
DEPENDENCIES=org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2
spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
14 changes: 14 additions & 0 deletions e2e_test/batch/types/array_ty.slt.part
@@ -84,6 +84,20 @@ select max(ARRAY[1, v1*2]) from t;
----
{1,6}

query T
select CAST(NULL as bool[]) from t;
----
NULL
NULL
NULL

query T
select array[false, false] from t;
----
{f,f}
{f,f}
{f,f}

statement ok
drop table t;

1 change: 0 additions & 1 deletion risedev.yml
@@ -162,7 +162,6 @@ profile:
- use: kafka
persist-data: true


3etcd-3meta:
steps:
- use: etcd
8 changes: 7 additions & 1 deletion src/common/src/array/column_proto_readers.rs
@@ -55,6 +55,8 @@ pub fn read_numeric_array<T: PrimitiveArrayItemType, R: PrimitiveValueReader<T>>
}
}
let arr = builder.finish();
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}

@@ -68,7 +70,7 @@ pub fn read_bool_array(array: &ProstArray, cardinality: usize) -> ArrayResult<Ar
let bitmap: Bitmap = array.get_null_bitmap()?.into();

let arr = BoolArray::new(data, bitmap);
assert_eq!(arr.len(), cardinality);
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}
@@ -133,6 +135,8 @@ macro_rules! read_one_value_array {
}
}
let arr = builder.finish();
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}
)*
@@ -195,5 +199,7 @@ pub fn read_string_array<B: ArrayBuilder, R: VarSizedValueReader<B>>(
}
}
let arr = builder.finish();
ensure_eq!(arr.len(), cardinality);

Ok(arr.into())
}
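
The hunks above replace a hard `assert_eq!` with `ensure_eq!` after each `builder.finish()`, so a mismatch between the decoded array length and the declared `cardinality` comes back as an `ArrayResult` error instead of panicking the process. A minimal sketch of that pattern, with a simplified stand-in error type and macro (not RisingWave's actual definitions):

```rust
// Sketch only: validate a decoded length against the declared cardinality
// and return an error instead of panicking. `ArrayError`, `ArrayResult`, and
// this `ensure_eq!` are simplified stand-ins, not the real definitions.
#[derive(Debug)]
struct ArrayError(String);

type ArrayResult<T> = Result<T, ArrayError>;

macro_rules! ensure_eq {
    ($left:expr, $right:expr) => {
        if $left != $right {
            return Err(ArrayError(format!(
                "ensure_eq failed: {} != {} ({:?} vs {:?})",
                stringify!($left),
                stringify!($right),
                $left,
                $right
            )));
        }
    };
}

// Stand-in for the readers above: decoding elided, only the final
// length check against `cardinality` is shown.
fn read_array(decoded: Vec<i32>, cardinality: usize) -> ArrayResult<Vec<i32>> {
    ensure_eq!(decoded.len(), cardinality);
    Ok(decoded)
}

fn main() {
    assert!(read_array(vec![1, 2, 3], 3).is_ok());
    // A corrupt or truncated message now surfaces as an Err that the
    // caller can reject, rather than aborting the whole process.
    assert!(read_array(vec![1, 2, 3], 4).is_err());
}
```

The practical benefit is that a single malformed message can be rejected by the caller instead of crashing the node.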
2 changes: 1 addition & 1 deletion src/common/src/array/decimal_array.rs
@@ -60,7 +60,7 @@ mod tests {

assert_eq!(prost_array.values.len(), 1);

let decoded_array = ArrayImpl::from_protobuf(&prost_array, 4)
let decoded_array = ArrayImpl::from_protobuf(&prost_array, 8)
.unwrap()
.into_decimal();

19 changes: 14 additions & 5 deletions src/common/src/array/list_array.rs
@@ -32,8 +32,6 @@ use crate::types::{
DatumRef, Scalar, ScalarRefImpl, ToDatumRef,
};

/// This is a naive implementation of list array.
/// We will eventually move to a more efficient flatten implementation.
#[derive(Debug)]
pub struct ListArrayBuilder {
bitmap: BitmapBuilder,
@@ -146,8 +144,15 @@ impl ListArrayBuilder {
}
}

/// This is a naive implementation of list array.
/// We will eventually move to a more efficient flatten implementation.
/// Each item of this `ListArray` is a `List<T>`, or called `T[]` (T array).
///
/// * As other arrays, there is a null bitmap, with `1` meaning nonnull and `0` meaning null.
/// * As [`BytesArray`], there is an offsets `Vec` and a value `Array`. The value `Array` has all
/// items concatenated, and the offsets `Vec` stores start and end indices into it for slicing.
/// Effectively, the inner array is the flattened form, and `offsets.len() == n + 1`.
///
/// For example, `values (array[1]), (array[]::int[]), (null), (array[2, 3]);` stores an inner
/// `I32Array` with `[1, 2, 3]`, along with offsets `[0, 1, 1, 1, 3]` and null bitmap `TTFT`.
#[derive(Debug, Clone, PartialEq)]
pub struct ListArray {
bitmap: Bitmap,
@@ -221,7 +226,11 @@ impl ListArray {
);
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let array_data = array.get_list_array_data()?.to_owned();
let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), bitmap.len())?;
let flatten_len = match array_data.offsets.last() {
Some(&n) => n as usize,
None => bail!("Must have at least one element in offsets"),
};
let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), flatten_len)?;
let arr = ListArray {
bitmap,
offsets: array_data.offsets,
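
The new doc comment spells out the flattened `ListArray` layout: one inner value array holding every element, an `offsets` vector of length `n + 1`, and a null bitmap, with the flattened length given by the last offset, which is what the updated `from_protobuf` now reads instead of `bitmap.len()`. A standalone sketch of that layout, with simplified types and illustrative names rather than RisingWave's actual API:

```rust
// Illustrative model of the flattened list layout described in the doc
// comment above; the struct and method here are not RisingWave's API.
struct ListArray {
    values: Vec<i32>,    // flattened inner array (the "I32Array")
    offsets: Vec<usize>, // offsets.len() == number of lists + 1
    validity: Vec<bool>, // true = non-null list, false = null
}

impl ListArray {
    /// Returns the i-th list, or None if that list is null.
    fn get(&self, i: usize) -> Option<&[i32]> {
        if !self.validity[i] {
            return None;
        }
        Some(&self.values[self.offsets[i]..self.offsets[i + 1]])
    }
}

fn main() {
    // Mirrors the doc comment's example:
    // values (array[1]), (array[]::int[]), (null), (array[2, 3])
    let arr = ListArray {
        values: vec![1, 2, 3],                   // flattened values
        offsets: vec![0, 1, 1, 1, 3],            // 5 offsets for 4 lists
        validity: vec![true, true, false, true], // TTFT
    };
    let empty: &[i32] = &[];
    assert_eq!(arr.get(0), Some(&[1][..]));
    assert_eq!(arr.get(1), Some(empty)); // empty list, distinct from null
    assert_eq!(arr.get(2), None);        // null list
    assert_eq!(arr.get(3), Some(&[2, 3][..]));
    // The flattened length (3) is the last offset, which is why
    // `from_protobuf` can no longer use the bitmap length (4) for the
    // inner array and reads `offsets.last()` instead.
}
```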
11 changes: 7 additions & 4 deletions src/frontend/planner_test/tests/testdata/column_pruning.yaml
@@ -141,15 +141,18 @@
logical_plan: |
LogicalProject { exprs: [t1.a, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_end] }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
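
The updated plans add an `IsNotNull` filter on the hop window's time column (the same change appears in the test files that follow): a row whose time column is NULL cannot be assigned to any window, so it is dropped before window expansion. A rough, framework-free sketch of that idea, assuming a hypothetical `hop_windows` helper and plain integers in place of timestamps and intervals:

```rust
// Illustrative sketch only (not RisingWave's planner or executor code):
// a row whose hop-window time column is NULL cannot fall into any window,
// so it is dropped before window expansion; that is the role of the added
// Filter { predicate: IsNotNull(created_at) } in the plans above.

/// Hypothetical helper: all [start, end) hop windows containing `ts`,
/// for a given slide and size (plain i64 values stand in for the
/// timestamp and interval types).
fn hop_windows(ts: i64, slide: i64, size: i64) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    // Earliest window start that could still cover `ts`.
    let mut start = ts - ts.rem_euclid(slide) - (size - slide);
    while start <= ts {
        if ts < start + size {
            out.push((start, start + size));
        }
        start += slide;
    }
    out
}

fn main() {
    // created_at column with one NULL row.
    let created_at: Vec<Option<i64>> = vec![Some(17), None, Some(31)];

    let expanded: Vec<(i64, (i64, i64))> = created_at
        .into_iter()
        .flatten() // drops NULLs, like the IsNotNull filter in the plans above
        .flat_map(|ts| hop_windows(ts, 15, 30).into_iter().map(move |w| (ts, w)))
        .collect();

    for (ts, (start, end)) in &expanded {
        println!("ts={ts} window=[{start}, {end})");
    }
    assert_eq!(expanded.len(), 4); // 2 non-null rows x (size / slide) windows
}
```

Expressing the drop as an explicit filter also lets the optimizer push the predicate into the scan, as the `predicate: IsNotNull(t1.created_at)` on the `LogicalScan` in the optimized plan above shows.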
18 changes: 11 additions & 7 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml
@@ -815,25 +815,29 @@
logical_plan: |
LogicalProject { exprs: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end] }
materialized table: 4294967294
StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(t1.created_at) }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
Table 4294967294 { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id], primary key: [$6 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] }
26 changes: 16 additions & 10 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
@@ -279,14 +279,16 @@
| └─BatchHashAgg { group_key: [window_start, bid.auction], aggs: [count] }
| └─BatchHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start] }
| └─BatchExchange { order: [], dist: HashShard(bid.auction) }
| └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
| └─BatchFilter { predicate: IsNotNull(bid.date_time) }
| └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
└─BatchProject { exprs: [max(count), window_start] }
└─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
└─BatchExchange { order: [], dist: HashShard(window_start) }
└─BatchHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
└─BatchHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start] }
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(bid.date_time) }
└─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
└─StreamProject { exprs: [bid.auction, count, window_start, window_start] }
@@ -297,15 +299,17 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, bid.auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(bid.auction, window_start) }
| └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
| └─StreamFilter { predicate: IsNotNull(bid.date_time) }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [bid.auction, window_start, count] }
└─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(bid.auction, window_start) }
└─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamFilter { predicate: IsNotNull(bid.date_time) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
@@ -328,9 +332,10 @@
Fragment 2
StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(bid.date_time) }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
Fragment 3
StreamProject { exprs: [bid.auction, window_start, count] }
@@ -340,9 +345,10 @@
Fragment 4
StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(bid.date_time) }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
Table 0 { columns: [bid_auction, count, window_start], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [2] }
Table 1 { columns: [window_start, bid_auction, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
(Diffs for the remaining changed files are not shown.)
