Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support system column _rw_timestamp for tables #19232

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions e2e_test/batch/basic/rw_timestamp.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a int, id int primary key);

statement ok
create index idx on t(a);

statement ok
insert into t values (1, 1), (2, 2);

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
t 1 1
t 2 2

sleep 3s

statement ok
update t set a = 11 where id = 1;

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
f 2 2
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where id = 1;
----
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where a = 11;
----
t 11 1

statement ok
delete from t;

statement ok
drop table t;
3 changes: 3 additions & 0 deletions e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description t3 NULL volutpat vitae
Expand All @@ -37,6 +38,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL

statement ok
create index idx1 on t3 (v1,v2);
Expand Down Expand Up @@ -65,6 +67,7 @@ v1 integer false Nemo enim ipsam
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1) NULL NULL
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/extended_mode/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ v1 integer false NULL
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description t3 NULL NULL
Expand All @@ -57,6 +58,7 @@ v1 integer false NULL
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL

statement ok
drop table t3;
Expand Down
1 change: 1 addition & 0 deletions e2e_test/source_legacy/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ productId bigint false NULL
productName character varying false NULL
tags character varying[] false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description kafka_json_schema_plain NULL NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ describe my_products;
id integer false NULL
name character varying false NULL
description character varying false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description my_products NULL NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ id bigint false NULL
modified timestamp without time zone false NULL
name character varying false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand All @@ -66,6 +67,7 @@ name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -96,6 +98,7 @@ name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -126,6 +129,7 @@ describe rw_customers;
id bigint false NULL
name character varying false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_change_pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ v7 timestamp without time zone false NULL
v8 timestamp with time zone false NULL
v9 interval false NULL
v10 jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down Expand Up @@ -100,6 +101,7 @@ v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down Expand Up @@ -145,6 +147,7 @@ v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ describe rw_customers;
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -153,6 +154,7 @@ c_datetime timestamp without time zone false NULL
c_timestamp timestamp with time zone false NULL
c_enum character varying false NULL
c_json jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_mysql_types_test NULL NULL
Expand Down
1 change: 1 addition & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ c_interval_array interval[] false NULL
c_jsonb_array jsonb[] false NULL
c_uuid_array character varying[] false NULL
c_enum_array character varying[] false NULL
_rw_timestamp timestamp with time zone true NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_postgres_types_test NULL NULL
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {

let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() {
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
Expand Down
31 changes: 27 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,40 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());

let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
let res = if table.has_epoch_idx() {
// has epoch_idx means we need to select `_rw_timestamp` column which is unsupported by `get_row` interface, so use iterator interface instead.
let range_bounds = (Bound::<OwnedRow>::Unbounded, Bound::Unbounded);
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
range_bounds,
false,
1,
PrefetchOptions::new(false, false),
)
.await?;
pin_mut!(iter);
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
if let Some(chunk) = chunk {
let row = chunk.row_at(0).0.to_owned_row();
Ok(Some(row))
} else {
Ok(None)
}
Comment on lines +405 to +421
Copy link
Contributor

@kwannoel kwannoel Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring this into a separate method like get_row_with_rw_timestamp, and calling it here seems more readable.

} else {
// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
Ok(row)
};

if let Some(timer) = timer {
timer.observe_duration()
}

Ok(row)
res
}

#[try_stream(ok = DataChunk, error = BatchError)]
Expand Down
66 changes: 65 additions & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
};

use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
use super::{
row_id_column_desc, rw_timestamp_column_desc, RW_TIMESTAMP_COLUMN_ID, RW_TIMESTAMP_COLUMN_NAME,
USER_COLUMN_ID_OFFSET,
};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::DataType;
use crate::util::value_encoding::DatumToProtoExt;
Expand Down Expand Up @@ -101,6 +104,11 @@ impl std::fmt::Display for ColumnId {
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SystemColumn {
RwTimestamp,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
pub data_type: DataType,
Expand All @@ -112,6 +120,9 @@ pub struct ColumnDesc {
pub description: Option<String>,
pub additional_column: AdditionalColumn,
pub version: ColumnDescVersion,
/// Currently the system column is used for `_rw_timestamp` only and is generated at runtime,
/// so this field is not persisted.
pub system_column: Option<SystemColumn>,
}

impl ColumnDesc {
Expand All @@ -126,6 +137,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -140,6 +152,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand Down Expand Up @@ -180,6 +193,27 @@ impl ColumnDesc {
description: None,
additional_column: additional_column_type,
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

pub fn named_with_system_column(
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
system_column: SystemColumn,
) -> ColumnDesc {
ColumnDesc {
data_type,
column_id,
name: name.into(),
field_descs: vec![],
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: Some(system_column),
}
}

Expand Down Expand Up @@ -229,6 +263,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -252,6 +287,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -270,6 +306,7 @@ impl ColumnDesc {
generated_or_default_column: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand Down Expand Up @@ -304,6 +341,14 @@ impl From<PbColumnDesc> for ColumnDesc {
.into_iter()
.map(ColumnDesc::from)
.collect();
// In case we convert a system column into proto and then back to ColumnDesc, we need to restore the system column meta.
let system_column = if prost.column_id == RW_TIMESTAMP_COLUMN_ID.get_id()
&& prost.name == RW_TIMESTAMP_COLUMN_NAME
{
Some(SystemColumn::RwTimestamp)
} else {
None
};
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
Self {
data_type: DataType::from(prost.column_type.as_ref().unwrap()),
column_id: ColumnId::new(prost.column_id),
Expand All @@ -314,6 +359,7 @@ impl From<PbColumnDesc> for ColumnDesc {
description: prost.description.clone(),
additional_column,
version,
system_column,
}
}
}
Expand Down Expand Up @@ -372,6 +418,10 @@ impl ColumnCatalog {
self.column_desc.is_generated()
}

pub fn can_dml(&self) -> bool {
!self.is_generated() && !self.is_rw_timestamp_column()
}

/// If the column is a generated column
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
Expand Down Expand Up @@ -424,6 +474,20 @@ impl ColumnCatalog {
}
}

pub fn rw_timestamp_column() -> Self {
Self {
column_desc: rw_timestamp_column_desc(),
is_hidden: true,
}
}

pub fn is_rw_timestamp_column(&self) -> bool {
matches!(
self.column_desc.system_column,
Some(SystemColumn::RwTimestamp)
)
}

pub fn offset_column() -> Self {
Self {
column_desc: offset_column_desc(),
Expand Down
Loading
Loading