Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): support alter source refresh schema for shared source #19740

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

# `age` is a new field that the current schema does not contain yet, so selecting it is expected to fail
statement error
SELECT age FROM mv_user;

103 changes: 103 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
control substitution on

system ok
rpk topic delete pb_alter_source_shared_test || true; \
(rpk sr subject delete 'pb_alter_source_shared_test-value' && rpk sr subject delete 'pb_alter_source_shared_test-value' --permanent) || true;

system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 20 user

statement ok
CREATE SOURCE src_user
INCLUDE timestamp -- include explicitly here to test a bug found in https://github.com/risingwavelabs/risingwave/pull/17293
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_alter_source_shared_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

statement ok
CREATE MATERIALIZED VIEW mv_user_2 AS SELECT * FROM src_user;

statement ok
CREATE TABLE t_user WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'pb_alter_source_shared_test',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
message = 'test.User'
);

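# `age` is a new field that the current schema does not contain yet, so selecting it from the MV and the table is expected to fail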
statement error
SELECT age FROM mv_user;

statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

statement ok
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

statement ok
flush;


query ????
SELECT COUNT(*) FROM mv_user;
----
25

query ????
SELECT COUNT(*) FROM mv_user_2;
----
25

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 104 0 510

# Push another batch of events with extended fields, this time after the schema refresh
system ok
python3 e2e_test/source_inline/kafka/protobuf/pb.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" "pb_alter_source_shared_test" 5 user_with_more_fields

sleep 5s


query ????
SELECT COUNT(*) FROM mv_user;
----
30

query ????
SELECT COUNT(*) FROM mv_user_2;
----
30

query ????
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
30 104 0 1020


# statement ok
# DROP SOURCE src_user CASCADE;
39 changes: 30 additions & 9 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ use itertools::Itertools;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{max_column_id, ColumnCatalog};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
@@ -30,7 +31,9 @@ use risingwave_sqlparser::ast::{
use risingwave_sqlparser::parser::Parser;

use super::alter_table_column::schema_has_schema_registry;
use super::create_source::{bind_columns_from_source, validate_compatibility};
use super::create_source::{
bind_columns_from_source, generate_stream_graph_for_source, validate_compatibility,
};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
@@ -214,9 +217,10 @@ pub async fn handle_alter_source_with_sr(
name: ObjectName,
format_encode: FormatEncodeOptions,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let session = handler_args.session.clone();
let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?;
let mut source = source.as_ref().clone();
let old_columns = source.columns.clone();

if source.associated_table_id.is_some() {
return Err(ErrorCode::NotSupported(
@@ -225,9 +229,6 @@ pub async fn handle_alter_source_with_sr(
)
.into());
};
if source.info.is_shared() {
bail_not_implemented!(issue = 16003, "alter shared source");
}

check_format_encode(&source, &format_encode)?;

@@ -272,14 +273,34 @@ pub async fn handle_alter_source_with_sr(
.format_encode_secret_refs
.extend(format_encode_secret_ref);

let mut pb_source = source.to_prost(schema_id, database_id);

// update version
pb_source.version += 1;
source.version += 1;

let pb_source = source.to_prost(schema_id, database_id);

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_source(pb_source).await?;
if source.info.is_shared() {
let graph = generate_stream_graph_for_source(handler_args, source.clone())?;

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
old_columns
.iter()
.map(|old_c| {
source
.columns
.iter()
.position(|new_c| new_c.column_id() == old_c.column_id())
})
.collect(),
source.columns.len(),
);
catalog_writer
.replace_source(pb_source, graph, col_index_mapping)
.await?
} else {
catalog_writer.alter_source(pb_source).await?;
}
Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}