Skip to content

Commit

Permalink
feat(streaming): support temporal join part 3 (#8480)
Browse files Browse the repository at this point in the history
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: st1page <1245835950@qq.com>
  • Loading branch information
3 people authored Mar 13, 2023
1 parent f36bf0b commit 2db01f9
Show file tree
Hide file tree
Showing 27 changed files with 456 additions and 21 deletions.
64 changes: 64 additions & 0 deletions e2e_test/streaming/temporal_join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2

statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL

statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 22, 222);

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL
2 22 2 22

statement ok
drop materialized view v;

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 11 NULL NULL
2 22 2 22

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
154 changes: 154 additions & 0 deletions src/frontend/planner_test/tests/testdata/temporal_join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1= id2
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
batch_error: |-
Not supported: do not support temporal join for batch queries
HINT: please use temporal join in streaming queries
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: implicit join with temporal tables
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF NOW() where id1 = id2 AND a2 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: Multi join key for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on a1 = a2 and id1 = id2 where b2 != a2;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, a2, id1, a1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1, stream.a1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2, version.a2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2], pk: [version.id2, version.a2], dist: UpstreamHashShard(version.id2, version.a2) }
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_plan: |
StreamMaterialize { columns: [count], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum0(count)] }
└─StreamAppendOnlyGlobalSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count] }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join join keys requirement test
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: Temporal join requires the lookup table's primary key contained exactly in the equivalence condition
HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: Temporal join requires a append-only left input
HINT: Please ensure your left input is append-only
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: exist dangling temporal scan
HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used.
- name: multi-way temporal join with the same key
sql: |
create table stream(k int, a1 int, b1 int) APPEND ONLY;
create table version1(k int, x1 int, y2 int, primary key (k));
create table version2(k int, x2 int, y2 int, primary key (k));
select stream.k, x1, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.k = version1.k
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.k = version2.k where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], pk_columns: [stream._row_id, version1.k, k, version2.k], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] }
├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
| ├─StreamExchange { dist: HashShard(stream.k) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.k) }
| └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.k) }
└─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) }
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
| ├─StreamExchange { dist: HashShard(stream.id1) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }
- name: multi-way temporal join with different keys
sql: |
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY;
create table version1(id1 int, x1 int, y2 int, primary key (id1));
create table version2(id2 int, x2 int, y2 int, primary key (id2));
select stream.id1, x1, stream.id2, x2, a1, b1
from stream
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
| ├─StreamExchange { dist: HashShard(stream.id1) }
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) }
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) }
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) }
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) }
14 changes: 10 additions & 4 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl Binder {
&mut self,
name: ObjectName,
alias: Option<TableAlias>,
for_system_time_as_of_now: bool,
) -> Result<Relation> {
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
if schema_name.is_none() && let Some(item) = self.context.cte_to_relation.get(&table_name) {
Expand Down Expand Up @@ -293,7 +294,7 @@ impl Binder {
Ok(share_relation)
} else {

self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias)
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_now)
}
}

Expand All @@ -313,7 +314,7 @@ impl Binder {
}?;

Ok((
self.bind_relation_by_name(table_name.clone(), None)?,
self.bind_relation_by_name(table_name.clone(), None, false)?,
table_name,
))
}
Expand Down Expand Up @@ -358,12 +359,16 @@ impl Binder {
.map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string());

let table_name = self.catalog.get_table_name_by_id(table_id)?;
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias)
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false)
}

pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
match table_factor {
TableFactor::Table { name, alias } => self.bind_relation_by_name(name, alias),
TableFactor::Table {
name,
alias,
for_system_time_as_of_now,
} => self.bind_relation_by_name(name, alias, for_system_time_as_of_now),
TableFactor::TableFunction { name, alias, args } => {
let func_name = &name.0[0].real_value();
if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) {
Expand All @@ -378,6 +383,7 @@ impl Binder {
Some(PG_CATALOG_SCHEMA_NAME),
PG_KEYWORDS_TABLE_NAME,
alias,
false,
)
} else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) {
let args: Vec<ExprImpl> = args
Expand Down
17 changes: 15 additions & 2 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct BoundBaseTable {
pub table_id: TableId,
pub table_catalog: TableCatalog,
pub table_indexes: Vec<Arc<IndexCatalog>>,
pub for_system_time_as_of_now: bool,
}

#[derive(Debug, Clone)]
Expand All @@ -63,6 +64,7 @@ impl Binder {
schema_name: Option<&str>,
table_name: &str,
alias: Option<TableAlias>,
for_system_time_as_of_now: bool,
) -> Result<Relation> {
fn is_system_schema(schema_name: &str) -> bool {
SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)
Expand Down Expand Up @@ -126,7 +128,11 @@ impl Binder {
self.catalog
.get_table_by_name(&self.db_name, schema_path, table_name)
{
self.resolve_table_relation(table_catalog, schema_name)?
self.resolve_table_relation(
table_catalog,
schema_name,
for_system_time_as_of_now,
)?
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
Expand Down Expand Up @@ -167,7 +173,11 @@ impl Binder {
self.catalog.get_schema_by_name(&self.db_name, schema_name)
{
if let Some(table_catalog) = schema.get_table_by_name(table_name) {
return self.resolve_table_relation(table_catalog, schema_name);
return self.resolve_table_relation(
table_catalog,
schema_name,
for_system_time_as_of_now,
);
} else if let Some(source_catalog) =
schema.get_source_by_name(table_name)
{
Expand All @@ -194,6 +204,7 @@ impl Binder {
&self,
table_catalog: &TableCatalog,
schema_name: &str,
for_system_time_as_of_now: bool,
) -> Result<(Relation, Vec<(bool, Field)>)> {
let table_id = table_catalog.id();
let table_catalog = table_catalog.clone();
Expand All @@ -208,6 +219,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_now,
};

Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
Expand Down Expand Up @@ -291,6 +303,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_now: false,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl Binder {
Some(PG_CATALOG_SCHEMA_NAME),
PG_USER_TABLE_NAME,
None,
false,
)?);
let where_clause = Some(
FunctionCall::new(
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Binder {
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");

let table = self.bind_relation_by_name(name, None)?;
let table = self.bind_relation_by_name(name, None, false)?;

let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?;

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ fn assemble_materialize(
// Index table has no indexes.
vec![],
context,
false,
);

let exprs = index_columns
Expand Down
Loading

0 comments on commit 2db01f9

Please sign in to comment.