Commit

Merge branch 'main' into deterministic-fuzz-stability

mergify[bot] authored Feb 24, 2023
2 parents 9a292c3 + 1e0c0d2 commit 646de18
Showing 145 changed files with 1,356 additions and 782 deletions.
12 changes: 7 additions & 5 deletions e2e_test/batch/types/jsonb.slt.part
@@ -83,19 +83,21 @@ select null::jsonb::bool;
----
NULL

# Example of accessing the boolean nested inside object and array
query T
select jsonb_array_element(jsonb_object_field('{"k2":[2,true,4]}', 'k2'), -2)::bool;
select ('{"k2":[2,true,4]}'::jsonb -> 'k2' -> -2)::bool;
----
t

# But for text, avoid cast and use `->>` as the last access operator.
# Note the difference between access text directly vs access jsonb then cast to text.
query TTT
with t(v1) as (values (null::jsonb), ('null'), ('true'), ('1'), ('"a"'), ('[]'), ('{}')),
j(v1) as (select ('{"k":' || v1::varchar || '}')::jsonb from t)
select
jsonb_object_field_text(v1, 'k'),
jsonb_object_field(v1, 'k')::varchar,
jsonb_typeof(jsonb_object_field(v1, 'k'))
v1 ->> 'k',
(v1 -> 'k')::varchar,
jsonb_typeof(v1 -> 'k')
from j order by 2;
----
a "a" string
@@ -107,7 +109,7 @@ true true boolean
NULL NULL NULL

query T
select jsonb_array_element_text('true'::jsonb, 2);
select 'true'::jsonb ->> 2;
----
NULL

4 changes: 2 additions & 2 deletions e2e_test/streaming/nexmark/views/q5.slt.part
@@ -11,8 +11,8 @@ FROM (
FROM
HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY
window_start,
bid.auction
bid.auction,
window_start
) AS AuctionBids
JOIN (
SELECT
4 changes: 2 additions & 2 deletions src/common/src/catalog/column.rs
@@ -83,7 +83,7 @@ impl std::fmt::Display for ColumnId {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
pub data_type: DataType,
pub column_id: ColumnId,
@@ -234,7 +234,7 @@ impl From<&ColumnDesc> for ProstColumnDesc {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnCatalog {
pub column_desc: ColumnDesc,
pub is_hidden: bool,
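
A note on the derive changes in this and the following catalog files: adding `Eq` and `Hash` on top of `PartialEq` lets these descriptors serve as keys in hash-based collections, for example for deduplication or memoized lookups keyed on a column description. Below is a minimal sketch using a simplified, hypothetical stand-in struct rather than the real `ColumnDesc`:

```rust
use std::collections::HashSet;

// Hypothetical, trimmed-down stand-in for a catalog descriptor;
// the real ColumnDesc also carries a data type, nested field
// descriptors, and a type name.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ColumnDescLite {
    column_id: i32,
    name: String,
}

fn main() {
    let a = ColumnDescLite { column_id: 0, name: "v1".into() };
    let b = ColumnDescLite { column_id: 0, name: "v1".into() };

    // With Eq + Hash derived, structurally equal descriptors collapse
    // into a single entry, so the type can act as a dedup or cache key.
    let mut seen = HashSet::new();
    seen.insert(a);
    seen.insert(b);
    assert_eq!(seen.len(), 1);
}
```
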
2 changes: 1 addition & 1 deletion src/common/src/catalog/physical_table.rs
@@ -23,7 +23,7 @@ use crate::util::sort_util::OrderPair;
/// Includes necessary information for compute node to access data of the table.
///
/// It's a subset of `TableCatalog` in frontend. Refer to `TableCatalog` for more details.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TableDesc {
/// Id of the table, to find in storage.
pub table_id: TableId,
4 changes: 2 additions & 2 deletions src/common/src/catalog/schema.rs
@@ -22,7 +22,7 @@ use crate::array::ArrayBuilderImpl;
use crate::types::DataType;

/// The field in the schema of the executor's return data
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Field {
pub data_type: DataType,
pub name: String,
@@ -112,7 +112,7 @@ macro_rules! schema_unnamed {
}

/// the schema of the executor's return data
#[derive(Clone, Debug, Default, PartialEq)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Schema {
pub fields: Vec<Field>,
}
2 changes: 1 addition & 1 deletion src/common/src/types/interval.rs
@@ -545,7 +545,7 @@ impl PartialOrd for IntervalUnit {
Some(Ordering::Equal)
} else {
let diff = *self - *other;
let days = (diff.months * 30 + diff.days) as i64;
let days = diff.months as i64 * 30 + diff.days as i64;
Some((days * DAY_MS + diff.ms).cmp(&0))
}
}
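
The one-line change above guards against `i32` overflow: in the old form `(diff.months * 30 + diff.days) as i64`, the multiplication and addition are evaluated in `i32` and can wrap before the widening cast, while the new form casts each operand to `i64` first so the whole computation runs in 64 bits. A standalone sketch of the difference, using deliberately large, hypothetical values:

```rust
fn main() {
    // Hypothetical, deliberately large values chosen to expose the overflow.
    let months: i32 = 100_000_000;
    let days: i32 = 0;

    // Old shape: multiply in i32, then cast. 100_000_000 * 30 =
    // 3_000_000_000 exceeds i32::MAX (2_147_483_647), so the product
    // overflows before the `as i64` cast ever runs. checked_mul makes
    // the failure visible instead of panicking or wrapping.
    let old = months.checked_mul(30).map(|m| (m + days) as i64);
    assert_eq!(old, None);

    // New shape: widen each operand first, then do the arithmetic in i64.
    let new = months as i64 * 30 + days as i64;
    assert_eq!(new, 3_000_000_000);
}
```
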
2 changes: 1 addition & 1 deletion src/common/src/util/column_index_mapping.rs
@@ -21,7 +21,7 @@ use itertools::Itertools;
/// `ColIndexMapping` is a partial mapping from usize to usize.
///
/// It is used in optimizer for transformation of column index.
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ColIndexMapping {
/// The size of the target space, i.e. target index is in the range `(0..target_size)`.
target_size: usize,
2 changes: 1 addition & 1 deletion src/common/src/util/scan_range.rs
@@ -27,7 +27,7 @@ use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum_into;

/// See also [`ScanRangeProst`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScanRange {
pub eq_conds: Vec<Datum>,
pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
4 changes: 2 additions & 2 deletions src/common/src/util/sort_util.rs
@@ -21,7 +21,7 @@ use crate::array::{Array, ArrayImpl, DataChunk};
use crate::error::ErrorCode::InternalError;
use crate::error::Result;

#[derive(PartialEq, Eq, Copy, Clone, Debug)]
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
pub enum OrderType {
Ascending,
Descending,
@@ -47,7 +47,7 @@ impl OrderType {
/// Column index with an order type (ASC or DESC). Used to represent a sort key (`Vec<OrderPair>`).
///
/// Corresponds to protobuf [`ColumnOrder`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderPair {
pub column_idx: usize,
pub order_type: OrderType,
10 changes: 5 additions & 5 deletions src/connector/src/sink/catalog/desc.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::BTreeMap;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
@@ -22,7 +22,7 @@ use risingwave_pb::stream_plan::SinkDesc as ProstSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SinkDesc {
/// Id of the sink. For debug now.
pub id: SinkId,
@@ -48,7 +48,7 @@ pub struct SinkDesc {
pub distribution_key: Vec<usize>,

/// The properties of the sink.
pub properties: HashMap<String, String>,
pub properties: BTreeMap<String, String>,

// The append-only behavior of the physical sink connector. Frontend will determine `sink_type`
// based on both its own derivation on the append-only attribute and other user-specified
@@ -76,7 +76,7 @@ impl SinkDesc {
distribution_key: self.distribution_key,
owner,
dependent_relations,
properties: self.properties,
properties: self.properties.into_iter().collect(),
sink_type: self.sink_type,
}
}
@@ -94,7 +94,7 @@ impl SinkDesc {
pk: self.pk.iter().map(|k| k.to_protobuf()).collect_vec(),
stream_key: self.stream_key.iter().map(|idx| *idx as _).collect_vec(),
distribution_key: self.distribution_key.iter().map(|k| *k as _).collect_vec(),
properties: self.properties.clone(),
properties: self.properties.clone().into_iter().collect(),
sink_type: self.sink_type.to_proto() as i32,
}
}
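
Swapping `HashMap` for `BTreeMap` in `SinkDesc::properties` gives the sink properties a stable, key-sorted iteration order; `HashMap` iteration order can differ from run to run, and removing that nondeterminism is presumably what the deterministic-fuzz branch is after. A small illustrative sketch (made-up keys and values, not real sink properties), ending with the same kind of `into_iter().collect()` conversion the diff uses on the protobuf-facing side:

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // Insert in arbitrary order; a BTreeMap always iterates in sorted
    // key order, so anything serialized from it is reproducible.
    let mut properties: BTreeMap<String, String> = BTreeMap::new();
    properties.insert("topic".into(), "events".into());
    properties.insert("connector".into(), "kafka".into());
    properties.insert("retry.max".into(), "3".into());

    let keys: Vec<&str> = properties.keys().map(String::as_str).collect();
    assert_eq!(keys, ["connector", "retry.max", "topic"]);

    // A plain collect converts the ordered map back into a HashMap
    // where an unordered map type is still required.
    let proto_props: HashMap<String, String> = properties.into_iter().collect();
    assert_eq!(proto_props.len(), 3);
}
```
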
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/mod.rs
@@ -54,7 +54,7 @@ impl From<SinkId> for u32 {
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
/// The data written into the sink connector can only be INSERT. No UPDATE or DELETE is
/// allowed.
11 changes: 7 additions & 4 deletions src/frontend/planner_test/tests/testdata/column_pruning.yaml
@@ -141,15 +141,18 @@
logical_plan: |
LogicalProject { exprs: [t1.a, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_end], pk_conflict: "no check" }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
18 changes: 11 additions & 7 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml
@@ -815,25 +815,29 @@
logical_plan: |
LogicalProject { exprs: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end], pk_conflict: "no check" }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end], pk_conflict: "no check" }
materialized table: 4294967294
StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(t1.created_at) }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
Table 4294967294 { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id], primary key: [$6 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] }
14 changes: 1 addition & 13 deletions src/frontend/planner_test/tests/testdata/explain.yaml
@@ -6,11 +6,6 @@
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Share Source:
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Predicate Push Down:
LogicalProject { exprs: [1:Int32] }
@@ -63,7 +58,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 34,
"plan_node_id": 33,
"plan_node_type": "BatchValues",
"schema": [
{
@@ -105,13 +100,6 @@
explain_output: |+
Begin:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
└─LogicalScan { table: t2, columns: [v2, _row_id] }
Share Source:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
26 changes: 15 additions & 11 deletions src/frontend/planner_test/tests/testdata/join.yaml
@@ -199,16 +199,20 @@
└─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x) }
└─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.x] }
└─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
├─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
| └─StreamShare { id = 513 }
| └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
| ├─StreamExchange { dist: HashShard(i.x) }
| | └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
| └─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamProject { exprs: [i.x, i.t._row_id, i.t._row_id, i.x] }
└─StreamShare { id = 513 }
└─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id, i.x] }
├─StreamExchange { dist: HashShard(i.x) }
| └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
└─StreamExchange { dist: HashShard(i.x) }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
- name: Use lookup join
sql: |
create table t1 (v1 int, v2 int);
@@ -499,7 +503,7 @@
└─BatchExchange { order: [], dist: HashShard(b.x) }
└─BatchScan { table: b, columns: [b.x], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [y, z, $expr156(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr156, a._row_id, b._row_id, a.x, b.x], pk_conflict: "no check" }
StreamMaterialize { columns: [y, z, $expr159(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], pk_columns: [a._row_id, b._row_id, a.x, b.x], order_descs: [$expr159, a._row_id, b._row_id, a.x, b.x], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamProject { exprs: [(2:Int32 * Coalesce(a.x, b.x)) as $expr1, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr2, (Coalesce(a.x, b.x) + Coalesce(a.x, b.x)) as $expr3, a._row_id, b._row_id, a.x, b.x] }
└─StreamFilter { predicate: ((2:Int32 * Coalesce(a.x, b.x)) < 10:Int32) }