diff --git a/e2e_test/streaming/temporal_join.slt b/e2e_test/streaming/temporal_join.slt new file mode 100644 index 000000000000..a3d594985757 --- /dev/null +++ b/e2e_test/streaming/temporal_join.slt @@ -0,0 +1,64 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +statement ok +delete from version; + +query IIII rowsort +select * from v; +---- +1 11 1 11 +1 11 NULL NULL + +statement ok +insert into version values(2, 22, 222); + +statement ok +insert into stream values(2, 22, 222); + +query IIII rowsort +select * from v; +---- +1 11 1 11 +1 11 NULL NULL +2 22 2 22 + +statement ok +drop materialized view v; + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 + +query IIII rowsort +select * from v; +---- +1 11 NULL NULL +1 11 NULL NULL +2 22 2 22 + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/src/frontend/planner_test/tests/testdata/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/temporal_join.yaml new file mode 100644 index 000000000000..e788cff2789c --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/temporal_join.yaml @@ -0,0 +1,154 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. 
+- name: Left join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1= id2 + stream_plan: | + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } + └─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1) } + | └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } + batch_error: |- + Not supported: do not support temporal join for batch queries + HINT: please use temporal join in streaming queries +- name: Inner join type for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; + stream_plan: | + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1) } + | └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } +- name: implicit join with temporal tables + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF NOW() where id1 = id2 AND a2 < 10; + stream_plan: | + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1) } + | └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } +- name: Multi join key for temporal join + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2, a2)); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on a1 = a2 and id1 = id2 where b2 != a2; + stream_plan: | + StreamMaterialize { columns: [id1, a1, id2, 
a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, a2, id1, a1], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1, stream.a1) } + | └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2, version.a2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2], pk: [version.id2, version.a2], dist: UpstreamHashShard(version.id2, version.a2) } +- name: Temporal join with Aggregation + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select count(*) from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; + stream_plan: | + StreamMaterialize { columns: [count], pk_columns: [], pk_conflict: "no check" } + └─StreamProject { exprs: [sum0(count)] } + └─StreamAppendOnlyGlobalSimpleAgg { aggs: [sum0(count), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessLocalSimpleAgg { aggs: [count] } + └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } + ├─StreamExchange { dist: HashShard(stream.id1) } + | └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } +- name: Temporal join join keys requirement test + sql: | + create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table version(id2 int, a2 int, b2 int, primary key (id2, a2)); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; + stream_error: |- + Not supported: Temporal join requires the lookup table's primary key contained exactly in the equivalence condition + HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions +- name: Temporal join append only test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; + stream_error: |- + Not supported: Temporal join requires an append-only left input + HINT: Please ensure your left input is append-only +- name: Temporal join type test + sql: | + create table stream(id1 int, a1 int, b1 int); + create table version(id2 int, a2 int, b2 int, primary key (id2)); + select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; + stream_error: |- + Not supported: exist dangling temporal scan + HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used.
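+# The three negative cases above spell out the planner's requirements for a temporal join: the lookup
+# side must be a table scan marked FOR SYSTEM_TIME AS OF NOW() whose primary key is contained exactly
+# in the equi-join condition, the probe side must be append-only, and only inner or left-outer joins
+# are planned. A minimal query shape that satisfies all three (table and column names below are
+# illustrative only, not taken from this generated file):
+#   create table stream(id1 int, a1 int) APPEND ONLY;
+#   create table version(id2 int, a2 int, primary key (id2));
+#   select id1, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2;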
+- name: multi-way temporal join with the same key + sql: | + create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table version1(k int, x1 int, y2 int, primary key (k)); + create table version2(k int, x2 int, y2 int, primary key (k)); + select stream.k, x1, x2, a1, b1 + from stream + join version1 FOR SYSTEM_TIME AS OF NOW() on stream.k = version1.k + join version2 FOR SYSTEM_TIME AS OF NOW() on stream.k = version2.k where a1 < 10; + stream_plan: | + StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], pk_columns: [stream._row_id, version1.k, k, version2.k], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] } + ├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + | ├─StreamExchange { dist: HashShard(stream.k) } + | | └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + | | └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + | └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.k) } + | └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.k) } + └─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) } +- name: multi-way temporal join with different keys + sql: | + create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table version1(id1 int, x1 int, y2 int, primary key (id1)); + create table version2(id2 int, x2 int, y2 int, primary key (id2)); + select stream.id1, x1, stream.id2, x2, a1, b1 + from stream + join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1 + join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10; + stream_plan: | + StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] } + ├─StreamExchange { dist: HashShard(stream.id2) } + | └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + | ├─StreamExchange { dist: HashShard(stream.id1) } + | | └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + | | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + | └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) } + | └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) } + └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) } +- 
name: multi-way temporal join with different keys + sql: | + create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table version1(id1 int, x1 int, y2 int, primary key (id1)); + create table version2(id2 int, x2 int, y2 int, primary key (id2)); + select stream.id1, x1, stream.id2, x2, a1, b1 + from stream + join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1 + join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10; + stream_plan: | + StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" } + └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] } + ├─StreamExchange { dist: HashShard(stream.id2) } + | └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + | ├─StreamExchange { dist: HashShard(stream.id1) } + | | └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + | | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + | └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) } + | └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) } + └─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) } + └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) } diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index a9ef690f6358..c4a4d427e693 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -258,6 +258,7 @@ impl Binder { &mut self, name: ObjectName, alias: Option, + for_system_time_as_of_now: bool, ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; if schema_name.is_none() && let Some(item) = self.context.cte_to_relation.get(&table_name) { @@ -293,7 +294,7 @@ impl Binder { Ok(share_relation) } else { - self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias) + self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_now) } } @@ -313,7 +314,7 @@ impl Binder { }?; Ok(( - self.bind_relation_by_name(table_name.clone(), None)?, + self.bind_relation_by_name(table_name.clone(), None, false)?, table_name, )) } @@ -358,12 +359,16 @@ impl Binder { .map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string()); let table_name = self.catalog.get_table_name_by_id(table_id)?; - self.bind_relation_by_name_inner(Some(&schema), &table_name, alias) + self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false) } pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result { match table_factor { - TableFactor::Table { name, alias } => self.bind_relation_by_name(name, alias), + TableFactor::Table { + name, + alias, + for_system_time_as_of_now, + } => self.bind_relation_by_name(name, alias, for_system_time_as_of_now), TableFactor::TableFunction { name, alias, args } => { let func_name = 
&name.0[0].real_value(); if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) { @@ -378,6 +383,7 @@ impl Binder { Some(PG_CATALOG_SCHEMA_NAME), PG_KEYWORDS_TABLE_NAME, alias, + false, ) } else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) { let args: Vec = args diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 2e46b3725c99..0066f8a4a025 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -37,6 +37,7 @@ pub struct BoundBaseTable { pub table_id: TableId, pub table_catalog: TableCatalog, pub table_indexes: Vec>, + pub for_system_time_as_of_now: bool, } #[derive(Debug, Clone)] @@ -63,6 +64,7 @@ impl Binder { schema_name: Option<&str>, table_name: &str, alias: Option, + for_system_time_as_of_now: bool, ) -> Result { fn is_system_schema(schema_name: &str) -> bool { SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name) @@ -126,7 +128,11 @@ impl Binder { self.catalog .get_table_by_name(&self.db_name, schema_path, table_name) { - self.resolve_table_relation(table_catalog, schema_name)? + self.resolve_table_relation( + table_catalog, + schema_name, + for_system_time_as_of_now, + )? } else if let Ok((source_catalog, _)) = self.catalog .get_source_by_name(&self.db_name, schema_path, table_name) @@ -167,7 +173,11 @@ impl Binder { self.catalog.get_schema_by_name(&self.db_name, schema_name) { if let Some(table_catalog) = schema.get_table_by_name(table_name) { - return self.resolve_table_relation(table_catalog, schema_name); + return self.resolve_table_relation( + table_catalog, + schema_name, + for_system_time_as_of_now, + ); } else if let Some(source_catalog) = schema.get_source_by_name(table_name) { @@ -194,6 +204,7 @@ impl Binder { &self, table_catalog: &TableCatalog, schema_name: &str, + for_system_time_as_of_now: bool, ) -> Result<(Relation, Vec<(bool, Field)>)> { let table_id = table_catalog.id(); let table_catalog = table_catalog.clone(); @@ -208,6 +219,7 @@ impl Binder { table_id, table_catalog, table_indexes, + for_system_time_as_of_now, }; Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns)) @@ -291,6 +303,7 @@ impl Binder { table_id, table_catalog, table_indexes, + for_system_time_as_of_now: false, }) } diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs index 3ccdf9abff6c..d0e0f7808ea9 100644 --- a/src/frontend/src/binder/select.rs +++ b/src/frontend/src/binder/select.rs @@ -309,6 +309,7 @@ impl Binder { Some(PG_CATALOG_SCHEMA_NAME), PG_USER_TABLE_NAME, None, + false, )?); let where_clause = Some( FunctionCall::new( diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index b1b29904157f..e50ed33a2793 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -73,7 +73,7 @@ impl Binder { let owner = table_catalog.owner; let table_version_id = table_catalog.version_id().expect("table must be versioned"); - let table = self.bind_relation_by_name(name, None)?; + let table = self.bind_relation_by_name(name, None, false)?; let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index 55143647ab25..a2db235c6d40 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -286,6 +286,7 @@ fn assemble_materialize( // Index table has no indexes. 
vec![], context, + false, ); let exprs = index_columns diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 23ba0ea97026..b7aad90ac95b 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -37,6 +37,7 @@ pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { name: from_name, alias: None, + for_system_time_as_of_now: false, }; let from = vec![TableWithJoins { relation: table_factor, diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index 741716fddb75..b14159e3a816 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -34,7 +34,7 @@ use crate::handler::HandlerArgs; pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Result { let session = handler_args.session; let mut binder = Binder::new(&session); - let relation = binder.bind_relation_by_name(table_name.clone(), None)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; // For Source, it doesn't have table catalog so use get source to get column descs. let (columns, pk_columns, indices): (Vec, Vec, Vec>) = { let (column_catalogs, pk_column_catalogs, indices) = match relation { diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index e36680e2d6d7..f03513f294fb 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -33,7 +33,7 @@ pub fn get_columns_from_table( table_name: ObjectName, ) -> Result> { let mut binder = Binder::new(session); - let relation = binder.bind_relation_by_name(table_name.clone(), None)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; let catalogs = match relation { Relation::Source(s) => s.catalog.columns, Relation::BaseTable(t) => t.table_catalog.columns, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 32334b50bc73..91dac433c14c 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -54,6 +54,7 @@ use crate::expr::InputRef; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, }; +use crate::optimizer::plan_visitor::TemporalJoinValidator; use crate::optimizer::property::Distribution; use crate::utils::ColIndexMappingRewriteExt; use crate::WithOptions; @@ -160,6 +161,14 @@ impl PlanRoot { // Logical optimization let mut plan = self.gen_optimized_logical_plan_for_batch()?; + if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) { + return Err(ErrorCode::NotSupported( + "do not support temporal join for batch queries".to_string(), + "please use temporal join in streaming queries".to_string(), + ) + .into()); + } + // Convert to physical plan node plan = plan.to_batch_with_order_required(&self.required_order)?; @@ -349,6 +358,13 @@ impl PlanRoot { #[cfg(debug_assertions)] InputRefValidator.validate(plan.clone()); + if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) { + return Err(ErrorCode::NotSupported( + "exist dangling temporal scan".to_string(), + "please check your temporal join syntax e.g. 
consider removing the right outer join if it is being used.".to_string(), ).into()); } Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/generic/scan.rs b/src/frontend/src/optimizer/plan_node/generic/scan.rs index a1e3b5ab96f8..ac7cdf4b3f88 100644 --- a/src/frontend/src/optimizer/plan_node/generic/scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/scan.rs @@ -39,6 +39,7 @@ pub struct Scan { pub predicate: Condition, /// Help RowSeqScan executor use a better chunk size pub chunk_size: Option, + pub for_system_time_as_of_now: bool, } impl Scan { diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index ee1c7a255f5b..d3eeca9a0ef8 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1054,7 +1054,15 @@ impl LogicalJoin { } } - #[allow(dead_code)] + fn should_be_temporal_join(&self) -> bool { + let right = self.right(); + if let Some(logical_scan) = right.as_logical_scan() { + logical_scan.for_system_time_as_of_now() + } else { + false + } + } + fn to_stream_temporal_join( &self, predicate: EqJoinPredicate, @@ -1069,7 +1077,7 @@ impl LogicalJoin { if !left.append_only() { return Err(RwError::from(ErrorCode::NotSupported( - "Temporal join needs a append-only left input".into(), + "Temporal join requires an append-only left input".into(), "Please ensure your left input is append-only".into(), ))); } @@ -1077,11 +1085,18 @@ impl LogicalJoin { let right = self.right(); let Some(logical_scan) = right.as_logical_scan() else { return Err(RwError::from(ErrorCode::NotSupported( - "Temporal join needs a table scan as its lookup table".into(), + "Temporal join requires a table scan as its lookup table".into(), "Please provide a table scan".into(), ))); }; + if !logical_scan.for_system_time_as_of_now() { + return Err(RwError::from(ErrorCode::NotSupported( + "Temporal join requires a table defined as a temporal table".into(), + "Please use FOR SYSTEM_TIME AS OF NOW() syntax".into(), + ))); + } + let table_desc = logical_scan.table_desc(); // Verify that right join key columns are the primary key of the lookup table. @@ -1364,7 +1379,12 @@ impl ToStream for LogicalJoin { )) .into()); } - self.to_stream_hash_join(predicate, ctx) + + if self.should_be_temporal_join() { + self.to_stream_temporal_join(predicate, ctx) + } else { + self.to_stream_hash_join(predicate, ctx) + } } else if let Some(dynamic_filter) = self.to_stream_dynamic_filter(self.on().clone(), ctx)? { diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 999449397d1a..ad19273b3c04 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -49,6 +49,7 @@ pub struct LogicalScan { impl LogicalScan { /// Create a `LogicalScan` node. Used internally by optimizer. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( table_name: String, // explain-only is_sys_table: bool, @@ -57,6 +58,7 @@ impl LogicalScan { indexes: Vec>, ctx: OptimizerContextRef, predicate: Condition, // refers to column indexes of the table + for_system_time_as_of_now: bool, ) -> Self { // here we have 3 concepts // 1. column_id: ColumnId, stored in catalog and a ID to access data from storage.
@@ -86,6 +88,7 @@ impl LogicalScan { indexes, predicate, chunk_size: None, + for_system_time_as_of_now, }; let schema = core.schema(); @@ -112,6 +115,7 @@ impl LogicalScan { table_desc: Rc, indexes: Vec>, ctx: OptimizerContextRef, + for_system_time_as_of_now: bool, ) -> Self { Self::new( table_name, @@ -121,6 +125,7 @@ impl LogicalScan { indexes, ctx, Condition::true_cond(), + for_system_time_as_of_now, ) } @@ -174,6 +179,10 @@ impl LogicalScan { self.core.is_sys_table } + pub fn for_system_time_as_of_now(&self) -> bool { + self.core.for_system_time_as_of_now + } + /// Get a reference to the logical scan's table desc. pub fn table_desc(&self) -> &TableDesc { self.core.table_desc.as_ref() @@ -299,6 +308,7 @@ impl LogicalScan { vec![], self.ctx(), new_predicate, + self.for_system_time_as_of_now(), ) } @@ -349,6 +359,7 @@ impl LogicalScan { self.indexes().to_vec(), self.ctx(), Condition::true_cond(), + self.for_system_time_as_of_now(), ); let project_expr = if self.required_col_idx() != self.output_col_idx() { Some(self.output_idx_to_input_ref()) @@ -367,6 +378,7 @@ impl LogicalScan { self.indexes().to_vec(), self.base.ctx.clone(), predicate, + self.for_system_time_as_of_now(), ) } @@ -379,6 +391,7 @@ impl LogicalScan { self.indexes().to_vec(), self.base.ctx.clone(), self.predicate().clone(), + self.for_system_time_as_of_now(), ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 9404bcabf556..0561bcc88714 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -41,14 +41,23 @@ pub struct StreamTemporalJoin { impl StreamTemporalJoin { pub fn new(logical: LogicalJoin, eq_join_predicate: EqJoinPredicate) -> Self { - let ctx = logical.base.ctx.clone(); - assert!( logical.join_type() == JoinType::Inner || logical.join_type() == JoinType::LeftOuter ); assert!(logical.left().append_only()); assert!(logical.right().logical_pk() == eq_join_predicate.right_eq_indexes()); + let right = logical.right(); + let exchange: &StreamExchange = right + .as_stream_exchange() + .expect("should be a no shuffle stream exchange"); + assert!(exchange.no_shuffle()); + let exchange_input = exchange.input(); + let scan: &StreamTableScan = exchange_input + .as_stream_table_scan() + .expect("should be a stream table scan"); + assert!(scan.logical().for_system_time_as_of_now()); + let ctx = logical.base.ctx.clone(); let l2o = logical .l2i_col_mapping() .composite(&logical.i2o_col_mapping()); diff --git a/src/frontend/src/optimizer/plan_visitor/mod.rs b/src/frontend/src/optimizer/plan_visitor/mod.rs index 0ee19fa0bb41..ff715322a486 100644 --- a/src/frontend/src/optimizer/plan_visitor/mod.rs +++ b/src/frontend/src/optimizer/plan_visitor/mod.rs @@ -27,6 +27,8 @@ pub use input_ref_validator::*; mod execution_mode_decider; pub use execution_mode_decider::*; +mod temporal_join_validator; +pub use temporal_join_validator::*; use crate::for_all_plan_nodes; use crate::optimizer::plan_node::*; diff --git a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs new file mode 100644 index 000000000000..93cfe5858420 --- /dev/null +++ b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with 
the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::optimizer::plan_node::{ + BatchSeqScan, LogicalScan, PlanTreeNodeBinary, StreamTableScan, StreamTemporalJoin, +}; +use crate::optimizer::plan_visitor::PlanVisitor; +use crate::PlanRef; + +#[derive(Debug, Clone, Default)] +pub struct TemporalJoinValidator {} + +impl TemporalJoinValidator { + pub fn exist_dangling_temporal_scan(plan: PlanRef) -> bool { + let mut decider = TemporalJoinValidator {}; + decider.visit(plan) + } +} + +impl PlanVisitor for TemporalJoinValidator { + fn merge(a: bool, b: bool) -> bool { + a | b + } + + fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool { + stream_table_scan.logical().for_system_time_as_of_now() + } + + fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool { + batch_seq_scan.logical().for_system_time_as_of_now() + } + + fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool { + logical_scan.for_system_time_as_of_now() + } + + fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool { + self.visit(stream_temporal_join.left()) + } +} diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index 61c277c06679..a351c1934318 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -94,7 +94,9 @@ impl Rule for IndexSelectionRule { if indexes.is_empty() { return None; } - + if logical_scan.for_system_time_as_of_now() { + return None; + } let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan); let primary_cost = min( self.estimate_table_scan_cost(logical_scan, primary_table_row_size), @@ -191,6 +193,7 @@ impl IndexSelectionRule { index.index_table.table_desc().into(), vec![], logical_scan.ctx(), + false, ); let primary_table_scan = LogicalScan::create( @@ -199,6 +202,7 @@ impl IndexSelectionRule { index.primary_table.table_desc().into(), vec![], logical_scan.ctx(), + false, ); let conjunctions = index @@ -297,6 +301,7 @@ impl IndexSelectionRule { primary_table_desc.clone().into(), vec![], logical_scan.ctx(), + false, ); let conjunctions = primary_table_desc @@ -525,6 +530,7 @@ impl IndexSelectionRule { Condition { conjunctions: conjunctions.to_vec(), }, + false, ); result.push(primary_access.into()); @@ -571,6 +577,7 @@ impl IndexSelectionRule { vec![], ctx, new_predicate, + false, ) .into(), ) diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 8d9c3e9ee41a..f3f1946d1f51 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -55,6 +55,7 @@ impl Planner { Rc::new(sys_table.sys_table_catalog.table_desc()), vec![], self.ctx(), + false, ) .into()) } @@ -70,6 +71,7 @@ impl Planner { .map(|x| x.as_ref().clone().into()) .collect(), self.ctx(), + base_table.for_system_time_as_of_now, ) .into()) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index ea98efb24ace..4689f2bdd874 100644 --- 
a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -551,6 +551,7 @@ pub(crate) mod tests { }), vec![], ctx, + false, ) .to_batch() .unwrap() diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index 17df0434b1d5..31b7074b6e67 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -339,6 +339,7 @@ pub enum TableFactor { Table { name: ObjectName, alias: Option, + for_system_time_as_of_now: bool, }, Derived { lateral: bool, @@ -363,8 +364,15 @@ pub enum TableFactor { impl fmt::Display for TableFactor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TableFactor::Table { name, alias } => { + TableFactor::Table { + name, + alias, + for_system_time_as_of_now, + } => { write!(f, "{}", name)?; + if *for_system_time_as_of_now { + write!(f, " FOR SYSTEM_TIME AS OF NOW()")?; + } if let Some(alias) = alias { write!(f, " AS {}", alias)?; } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index a3c63c4b2b35..ed9893166585 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2765,6 +2765,22 @@ impl Parser { } } + pub fn parse_for_system_time_as_of_now(&mut self) -> Result { + let after_for = self.parse_keyword(Keyword::FOR); + if after_for { + self.expect_keywords(&[Keyword::SYSTEM_TIME, Keyword::AS, Keyword::OF])?; + let ident = self.parse_identifier()?; + if ident.real_value() != "now" { + return parser_err!(format!("Expected now, found: {}", ident.real_value())); + } + self.expect_token(&Token::LParen)?; + self.expect_token(&Token::RParen)?; + Ok(true) + } else { + Ok(false) + } + } + /// Parse a possibly qualified, possibly quoted identifier, e.g. /// `foo` or `myschema."table" pub fn parse_object_name(&mut self) -> Result { @@ -3544,8 +3560,13 @@ impl Parser { let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; Ok(TableFactor::TableFunction { name, alias, args }) } else { + let for_system_time_as_of_now = self.parse_for_system_time_as_of_now()?; let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; - Ok(TableFactor::Table { name, alias }) + Ok(TableFactor::Table { + name, + alias, + for_system_time_as_of_now, + }) } } } diff --git a/src/sqlparser/src/test_utils.rs b/src/sqlparser/src/test_utils.rs index 2a6ea6438a32..92c79f1e256a 100644 --- a/src/sqlparser/src/test_utils.rs +++ b/src/sqlparser/src/test_utils.rs @@ -138,6 +138,7 @@ pub fn table_alias(name: impl Into) -> Option { pub fn table(name: impl Into) -> TableFactor { TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(name.into())]), + for_system_time_as_of_now: false, alias: None, } } diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index b4c263252e0d..1d3d280d3e56 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -20,6 +20,7 @@ #[macro_use] mod test_utils; use matches::assert_matches; +use risingwave_sqlparser::ast::JoinOperator::Inner; use risingwave_sqlparser::ast::*; use risingwave_sqlparser::keywords::ALL_KEYWORDS; use risingwave_sqlparser::parser::ParserError; @@ -2149,12 +2150,17 @@ fn parse_delimited_identifiers() { ); // check FROM match only(select.from).relation { - TableFactor::Table { name, alias } => { + TableFactor::Table { + name, + alias, + for_system_time_as_of_now, + } => { assert_eq!(vec![Ident::with_quote_unchecked('"', "a table")], name.0); assert_eq!( 
Ident::with_quote_unchecked('"', "alias"), alias.unwrap().name ); + assert!(!for_system_time_as_of_now); } _ => panic!("Expecting TableFactor::Table"), } @@ -2281,6 +2287,7 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t1".into()]), alias: None, + for_system_time_as_of_now: false, }, joins: vec![], }, @@ -2288,6 +2295,7 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t2".into()]), alias: None, + for_system_time_as_of_now: false, }, joins: vec![], } @@ -2303,11 +2311,13 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t1a".into()]), alias: None, + for_system_time_as_of_now: false, }, joins: vec![Join { relation: TableFactor::Table { name: ObjectName(vec!["t1b".into()]), alias: None, + for_system_time_as_of_now: false, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }] @@ -2316,11 +2326,13 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t2a".into()]), alias: None, + for_system_time_as_of_now: false, }, joins: vec![Join { relation: TableFactor::Table { name: ObjectName(vec!["t2b".into()]), alias: None, + for_system_time_as_of_now: false, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }] @@ -2339,6 +2351,7 @@ fn parse_cross_join() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked("t2")]), alias: None, + for_system_time_as_of_now: false, }, join_operator: JoinOperator::CrossJoin }, @@ -2346,6 +2359,27 @@ fn parse_cross_join() { ); } +#[test] +fn parse_temporal_join() { + let sql = "SELECT * FROM t1 JOIN t2 FOR SYSTEM_TIME AS OF NOW() ON c1 = c2"; + let select = verified_only_select(sql); + assert_eq!( + Join { + relation: TableFactor::Table { + name: ObjectName(vec![Ident::new_unchecked("t2")]), + alias: None, + for_system_time_as_of_now: true, + }, + join_operator: Inner(JoinConstraint::On(Expr::BinaryOp { + left: Box::new(Expr::Identifier("c1".into())), + op: BinaryOperator::Eq, + right: Box::new(Expr::Identifier("c2".into())), + })) + }, + only(only(select.from).joins), + ); +} + #[test] fn parse_joins_on() { fn join_with_constraint( @@ -2357,6 +2391,7 @@ fn parse_joins_on() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(relation.into())]), alias, + for_system_time_as_of_now: false, }, join_operator: f(JoinConstraint::On(Expr::BinaryOp { left: Box::new(Expr::Identifier("c1".into())), @@ -2408,6 +2443,7 @@ fn parse_joins_using() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(relation.into())]), alias, + for_system_time_as_of_now: false, }, join_operator: f(JoinConstraint::Using(vec!["c1".into()])), } @@ -2451,6 +2487,7 @@ fn parse_natural_join() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked("t2")]), alias: None, + for_system_time_as_of_now: false, }, join_operator: f(JoinConstraint::Natural), } @@ -2677,6 +2714,7 @@ fn parse_derived_tables() { relation: TableFactor::Table { name: ObjectName(vec!["t2".into()]), alias: None, + for_system_time_as_of_now: false, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }], diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index 0c551cd4d7d7..b891b97c6edf 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -1,14 +1,14 @@ # This file is automatically generated. See `src/sqlparser/test_runner/src/bin/apply.rs` for more information. 
- input: SELECT sqrt(id) FROM foo formatted_sql: SELECT sqrt(id) FROM foo - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "sqrt", quote_style: None }]), args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], over: None, distinct: false, order_by: [], filter: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "sqrt", quote_style: None }]), args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], over: None, distinct: false, order_by: [], filter: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_now: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT INT '1' formatted_sql: SELECT INT '1' - input: SELECT (foo).v1.v2 FROM foo formatted_sql: SELECT (foo).v1.v2 FROM foo - input: SELECT ((((foo).v1)).v2) FROM foo formatted_sql: SELECT (((foo).v1).v2) FROM foo - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Nested(FieldIdentifier(FieldIdentifier(Identifier(Ident { value: "foo", quote_style: None }), [Ident { value: "v1", quote_style: None }]), [Ident { value: "v2", quote_style: None }])))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Nested(FieldIdentifier(FieldIdentifier(Identifier(Ident { value: "foo", quote_style: None }), [Ident { value: "v1", quote_style: None }]), [Ident { value: "v2", quote_style: None }])))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_now: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT (foo.v1).v2 FROM foo formatted_sql: SELECT (foo.v1).v2 FROM foo - input: SELECT (v1).v2 FROM foo @@ -76,3 +76,6 @@ error_msg: |- sql parser error: Expected end of statement, found: ( Near "SELECT 1::int" +- input: select id1, a1, id2, a2 from stream as S join version FOR SYSTEM_TIME AS OF NOW() AS V on id1= id2 + formatted_sql: SELECT id1, a1, id2, a2 FROM stream AS S JOIN version FOR SYSTEM_TIME AS OF NOW() AS V ON id1 = id2 + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: 
None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_now: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_now: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' \ No newline at end of file diff --git a/src/tests/sqlsmith/src/sql_gen/relation.rs b/src/tests/sqlsmith/src/sql_gen/relation.rs index 7d103e3d6e47..86e57497bf53 100644 --- a/src/tests/sqlsmith/src/sql_gen/relation.rs +++ b/src/tests/sqlsmith/src/sql_gen/relation.rs @@ -66,6 +66,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { name: alias.as_str().into(), columns: vec![], }), + for_system_time_as_of_now: false, }; table.name = alias; // Rename the table. let columns = table.get_qualified_columns(); diff --git a/src/tests/sqlsmith/src/sql_gen/utils.rs b/src/tests/sqlsmith/src/sql_gen/utils.rs index 389c4fe05a54..565bd2638cf8 100644 --- a/src/tests/sqlsmith/src/sql_gen/utils.rs +++ b/src/tests/sqlsmith/src/sql_gen/utils.rs @@ -74,6 +74,7 @@ pub(crate) fn create_table_factor_from_table(table: &Table) -> TableFactor { TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(&table.name)]), alias: None, + for_system_time_as_of_now: false, } }
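
Taken together, the parser, binder, and planner changes above are exercised by streaming queries of the following shape. This is a minimal usage sketch; the table and column names (orders, products) are illustrative and do not come from this patch:

    -- illustrative schema, not part of the patch
    create table orders (order_id int, product_id int, amount int) APPEND ONLY;
    create table products (product_id int, price int, primary key (product_id));
    -- the probe side (orders) is append-only and the equi-join key covers products' primary key,
    -- so the streaming plan can use StreamTemporalJoin instead of a regular hash join
    create materialized view order_prices as
    select o.order_id, o.amount, p.price
    from orders AS o
    join products FOR SYSTEM_TIME AS OF NOW() AS p on o.product_id = p.product_id;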