feat(streaming): support temporal join part 3 #8480

Merged: 9 commits, Mar 13, 2023
Changes from 5 commits
64 changes: 64 additions & 0 deletions e2e_test/streaming/temporal_join.slt
@@ -0,0 +1,64 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2

statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 11, 111);

statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL

statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 22, 222);

query IIII rowsort
select * from v;
----
1 11 1 11
1 11 NULL NULL
2 22 2 22

statement ok
drop materialized view v;

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 11 NULL NULL
2 22 2 22

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
80 changes: 80 additions & 0 deletions src/frontend/planner_test/tests/testdata/temporal_join.yaml
@@ -0,0 +1,80 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: Left join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
batch_error: |-
Not supported: do not support temporal join for batch queries
HINT: please use temporal join in streaming queries
- name: Inner join type for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: Multi join key for temporal join
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on a1 = a2 and id1 = id2 where b2 != a2;
stream_plan: |
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, a2, id1, a1], pk_conflict: "no check" }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] }
├─StreamExchange { dist: HashShard(stream.id1, stream.a1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2, version.a2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2], pk: [version.id2, version.a2], dist: UpstreamHashShard(version.id2, version.a2) }
- name: Temporal join with Aggregation
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select count(*) from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_plan: |
StreamMaterialize { columns: [count], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum0(count)] }
└─StreamAppendOnlyGlobalSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count] }
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] }
├─StreamExchange { dist: HashShard(stream.id1) }
| └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) }
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) }
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
- name: Temporal join join keys requirement test
sql: |
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2, a2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: Temporal join requires the lookup table's primary key contained exactly in the equivalence condition
HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions
- name: Temporal join append only test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: Temporal join requires a append-only left input
HINT: Please ensure your left input is append-only
- name: Temporal join type test
sql: |
create table stream(id1 int, a1 int, b1 int);
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10;
stream_error: |-
Not supported: exist dangling temporal scan
HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used.
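
The error cases above come from planner-side checks: the lookup table's primary key must be covered by the equi-join keys, the left input must be append-only, and the temporal scan must end up on the lookup side of a supported join type. A minimal sketch of the pk-coverage part of that check, written against plain column-index slices rather than the real planner structures (all names below are illustrative, not RisingWave APIs):

```rust
use std::collections::HashSet;

/// Returns true when every primary-key column of the lookup table appears
/// among the lookup-side equi-join keys, so each stream row matches at most
/// one version row and the join can be executed as a point lookup.
fn pk_covered_by_join_keys(lookup_pk: &[usize], lookup_join_keys: &[usize]) -> bool {
    let keys: HashSet<usize> = lookup_join_keys.iter().copied().collect();
    lookup_pk.iter().all(|col| keys.contains(col))
}

fn main() {
    // `version(id2, a2, b2, primary key (id2, a2))`: joining on both id2
    // and a2 is accepted, joining on id2 alone is rejected.
    assert!(pk_covered_by_join_keys(&[0, 1], &[0, 1]));
    assert!(!pk_covered_by_join_keys(&[0, 1], &[0]));
}
```
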
14 changes: 10 additions & 4 deletions src/frontend/src/binder/relation/mod.rs
@@ -258,6 +258,7 @@ impl Binder {
&mut self,
name: ObjectName,
alias: Option<TableAlias>,
for_system_time_as_of_now: bool,
) -> Result<Relation> {
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
if schema_name.is_none() && let Some(item) = self.context.cte_to_relation.get(&table_name) {
@@ -293,7 +294,7 @@ impl Binder {
Ok(share_relation)
} else {

self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias)
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_now)
}
}

@@ -313,7 +314,7 @@ impl Binder {
}?;

Ok((
self.bind_relation_by_name(table_name.clone(), None)?,
self.bind_relation_by_name(table_name.clone(), None, false)?,
table_name,
))
}
@@ -358,12 +359,16 @@ impl Binder {
.map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string());

let table_name = self.catalog.get_table_name_by_id(table_id)?;
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias)
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false)
}

pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
match table_factor {
TableFactor::Table { name, alias } => self.bind_relation_by_name(name, alias),
TableFactor::Table {
name,
alias,
for_system_time_as_of_now,
} => self.bind_relation_by_name(name, alias, for_system_time_as_of_now),
TableFactor::TableFunction { name, alias, args } => {
let func_name = &name.0[0].real_value();
if func_name.eq_ignore_ascii_case(RW_INTERNAL_TABLE_FUNCTION_NAME) {
@@ -378,6 +383,7 @@ impl Binder {
Some(PG_CATALOG_SCHEMA_NAME),
PG_KEYWORDS_TABLE_NAME,
alias,
false,
)
} else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) {
let args: Vec<ExprImpl> = args
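
The change in this file is mostly mechanical: the `for_system_time_as_of_now` flag parsed from `FOR SYSTEM_TIME AS OF NOW()` is threaded from the bound table factor down to `bind_relation_by_name_inner`, with `false` passed at every call site that can never be a temporal lookup (table functions, system tables, DML targets). A self-contained sketch of that propagation pattern, using simplified stand-in types rather than the actual binder:

```rust
// Simplified stand-in types; the real binder carries catalogs, aliases,
// schema resolution, and error handling that are omitted here.
#[derive(Debug)]
enum TableFactor {
    Table { name: String, for_system_time_as_of_now: bool },
    TableFunction { name: String },
}

#[derive(Debug)]
struct BoundBaseTable {
    name: String,
    // Recorded on the bound relation so the planner can later treat the
    // scan as a temporal lookup side instead of a regular stream scan.
    for_system_time_as_of_now: bool,
}

#[derive(Debug)]
enum Relation {
    BaseTable(BoundBaseTable),
    TableFunction(String),
}

struct Binder;

impl Binder {
    fn bind_table_factor(&mut self, factor: TableFactor) -> Relation {
        match factor {
            TableFactor::Table { name, for_system_time_as_of_now } => {
                self.bind_relation_by_name(name, for_system_time_as_of_now)
            }
            // Table functions can never be the temporal lookup side.
            TableFactor::TableFunction { name } => Relation::TableFunction(name),
        }
    }

    fn bind_relation_by_name(&mut self, name: String, for_system_time_as_of_now: bool) -> Relation {
        Relation::BaseTable(BoundBaseTable { name, for_system_time_as_of_now })
    }
}

fn main() {
    let mut binder = Binder;
    let rel = binder.bind_table_factor(TableFactor::Table {
        name: "version".into(),
        for_system_time_as_of_now: true,
    });
    println!("{rel:?}");
}
```
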
17 changes: 15 additions & 2 deletions src/frontend/src/binder/relation/table_or_source.rs
@@ -37,6 +37,7 @@ pub struct BoundBaseTable {
pub table_id: TableId,
pub table_catalog: TableCatalog,
pub table_indexes: Vec<Arc<IndexCatalog>>,
pub for_system_time_as_of_now: bool,
}

#[derive(Debug, Clone)]
@@ -63,6 +64,7 @@ impl Binder {
schema_name: Option<&str>,
table_name: &str,
alias: Option<TableAlias>,
for_system_time_as_of_now: bool,
) -> Result<Relation> {
fn is_system_schema(schema_name: &str) -> bool {
SYSTEM_SCHEMAS.iter().any(|s| *s == schema_name)
@@ -126,7 +128,11 @@ impl Binder {
self.catalog
.get_table_by_name(&self.db_name, schema_path, table_name)
{
self.resolve_table_relation(table_catalog, schema_name)?
self.resolve_table_relation(
table_catalog,
schema_name,
for_system_time_as_of_now,
)?
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
@@ -167,7 +173,11 @@ impl Binder {
self.catalog.get_schema_by_name(&self.db_name, schema_name)
{
if let Some(table_catalog) = schema.get_table_by_name(table_name) {
return self.resolve_table_relation(table_catalog, schema_name);
return self.resolve_table_relation(
table_catalog,
schema_name,
for_system_time_as_of_now,
);
} else if let Some(source_catalog) =
schema.get_source_by_name(table_name)
{
@@ -194,6 +204,7 @@ impl Binder {
&self,
table_catalog: &TableCatalog,
schema_name: &str,
for_system_time_as_of_now: bool,
) -> Result<(Relation, Vec<(bool, Field)>)> {
let table_id = table_catalog.id();
let table_catalog = table_catalog.clone();
@@ -208,6 +219,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_now,
};

Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
Expand Down Expand Up @@ -291,6 +303,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_now: false,
})
}

1 change: 1 addition & 0 deletions src/frontend/src/binder/select.rs
@@ -309,6 +309,7 @@ impl Binder {
Some(PG_CATALOG_SCHEMA_NAME),
PG_USER_TABLE_NAME,
None,
false,
)?);
let where_clause = Some(
FunctionCall::new(
2 changes: 1 addition & 1 deletion src/frontend/src/binder/update.rs
@@ -73,7 +73,7 @@ impl Binder {
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");

let table = self.bind_relation_by_name(name, None)?;
let table = self.bind_relation_by_name(name, None, false)?;

let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?;

1 change: 1 addition & 0 deletions src/frontend/src/handler/create_index.rs
@@ -286,6 +286,7 @@ fn assemble_materialize(
// Index table has no indexes.
vec![],
context,
false,
);

let exprs = index_columns
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_sink.rs
@@ -37,6 +37,7 @@ pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result<Query> {
let table_factor = TableFactor::Table {
name: from_name,
alias: None,
for_system_time_as_of_now: false,
};
let from = vec![TableWithJoins {
relation: table_factor,
2 changes: 1 addition & 1 deletion src/frontend/src/handler/describe.rs
@@ -34,7 +34,7 @@ use crate::handler::HandlerArgs;
pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Result<RwPgResponse> {
let session = handler_args.session;
let mut binder = Binder::new(&session);
let relation = binder.bind_relation_by_name(table_name.clone(), None)?;
let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?;
// For Source, it doesn't have table catalog so use get source to get column descs.
let (columns, pk_columns, indices): (Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>) = {
let (column_catalogs, pk_column_catalogs, indices) = match relation {
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
@@ -33,7 +33,7 @@ pub fn get_columns_from_table(
table_name: ObjectName,
) -> Result<Vec<ColumnDesc>> {
let mut binder = Binder::new(session);
let relation = binder.bind_relation_by_name(table_name.clone(), None)?;
let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?;
let catalogs = match relation {
Relation::Source(s) => s.catalog.columns,
Relation::BaseTable(t) => t.table_catalog.columns,
16 changes: 16 additions & 0 deletions src/frontend/src/optimizer/mod.rs
@@ -54,6 +54,7 @@ use crate::expr::InputRef;
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive,
};
use crate::optimizer::plan_visitor::TemporalJoinValidator;
use crate::optimizer::property::Distribution;
use crate::utils::ColIndexMappingRewriteExt;
use crate::WithOptions;
@@ -160,6 +161,14 @@ impl PlanRoot {
// Logical optimization
let mut plan = self.gen_optimized_logical_plan_for_batch()?;

if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
return Err(ErrorCode::NotSupported(
"do not support temporal join for batch queries".to_string(),
"please use temporal join in streaming queries".to_string(),
)
.into());
}

// Convert to physical plan node
plan = plan.to_batch_with_order_required(&self.required_order)?;

@@ -349,6 +358,13 @@ impl PlanRoot {
#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
return Err(ErrorCode::NotSupported(
"exist dangling temporal scan".to_string(),
"please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_string(),
).into());
}

Ok(plan)
}

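
Both new checks delegate to `TemporalJoinValidator::exist_dangling_temporal_scan`: a plan is rejected if a `FOR SYSTEM_TIME AS OF NOW()` scan survives optimization without being absorbed into a temporal join, and batch plans reject temporal scans outright. A rough sketch of how such a recursive check can look, over a toy plan tree rather than the real `PlanRef`/plan-visitor machinery (everything below is illustrative):

```rust
// Toy plan representation; the real validator walks RisingWave's plan
// nodes through the plan-visitor infrastructure.
enum PlanNode {
    // A scan marked FOR SYSTEM_TIME AS OF NOW().
    TemporalScan,
    TemporalJoin { lookup: Box<PlanNode>, stream: Box<PlanNode> },
    Other { inputs: Vec<PlanNode> },
}

/// Returns true if a temporal scan appears anywhere except the lookup
/// side of a temporal join, i.e. it was never matched by the join rule.
fn exist_dangling_temporal_scan(plan: &PlanNode) -> bool {
    match plan {
        PlanNode::TemporalScan => true,
        // The lookup side is allowed to be a temporal scan; only the
        // stream side still has to be free of them.
        PlanNode::TemporalJoin { stream, .. } => exist_dangling_temporal_scan(stream),
        PlanNode::Other { inputs } => inputs.iter().any(exist_dangling_temporal_scan),
    }
}

fn main() {
    // A temporal scan left as an ordinary input (e.g. after a right outer
    // join that could not be converted) is flagged as dangling.
    let dangling = PlanNode::Other { inputs: vec![PlanNode::TemporalScan] };
    assert!(exist_dangling_temporal_scan(&dangling));

    let converted = PlanNode::TemporalJoin {
        lookup: Box::new(PlanNode::TemporalScan),
        stream: Box::new(PlanNode::Other { inputs: vec![] }),
    };
    assert!(!exist_dangling_temporal_scan(&converted));
}
```
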
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/generic/scan.rs
@@ -39,6 +39,7 @@ pub struct Scan {
pub predicate: Condition,
/// Help RowSeqScan executor use a better chunk size
pub chunk_size: Option<u32>,
pub for_system_time_as_of_now: bool,
}

impl Scan {
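
Carrying the flag on the generic `Scan` lets later planner rules tell a temporal lookup apart from an ordinary table scan when deciding how to plan a join. A speculative sketch of that decision point with placeholder types; the actual conversion rule is not part of this excerpt and also checks the join type, the append-only left input, and the lookup table's primary key:

```rust
// Placeholder plan types; the real rule operates on RisingWave's logical
// join and scan nodes, not these structs.
struct Scan {
    table_name: String,
    for_system_time_as_of_now: bool,
}

enum Join {
    Regular { left: Scan, right: Scan },
    Temporal { stream: Scan, lookup: Scan },
}

/// If the right input is marked FOR SYSTEM_TIME AS OF NOW(), plan a
/// temporal join; otherwise keep an ordinary join.
fn plan_join(left: Scan, right: Scan) -> Join {
    if right.for_system_time_as_of_now {
        Join::Temporal { stream: left, lookup: right }
    } else {
        Join::Regular { left, right }
    }
}

fn main() {
    let stream = Scan { table_name: "stream".into(), for_system_time_as_of_now: false };
    let version = Scan { table_name: "version".into(), for_system_time_as_of_now: true };
    match plan_join(stream, version) {
        Join::Temporal { lookup, .. } => println!("temporal lookup side: {}", lookup.table_name),
        Join::Regular { .. } => println!("regular join"),
    }
}
```
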