Skip to content

Commit

Permalink
feat: ban alter source for shared source
Browse files Browse the repository at this point in the history
also examined all ALTER clauses for shared source

Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan committed Sep 28, 2024
1 parent b4ac5ab commit 6fe0e51
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 60 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub async fn handle_alter_parallelism(
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
sink.id.sink_id()
}
// TODO: support alter parallelism for shared source
_ => bail!(
"invalid statement type for alter parallelism: {:?}",
stmt_type
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::max_column_id;
use risingwave_connector::source::{extract_source_struct, SourceEncode, SourceStruct};
use risingwave_sqlparser::ast::{
Expand Down Expand Up @@ -59,11 +60,15 @@ pub async fn handle_alter_source_column(
};

if catalog.associated_table_id.is_some() {
Err(ErrorCode::NotSupported(
return Err(ErrorCode::NotSupported(
"alter table with connector with ALTER SOURCE statement".to_string(),
"try to use ALTER TABLE instead".to_string(),
))?
)
.into());
};
if catalog.info.is_shared() {
bail_not_implemented!(issue = 123, "alter shared source");
}

// Currently only allow source without schema registry
let SourceStruct { encode, .. } = extract_source_struct(&catalog.info)?;
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ pub async fn handle_alter_source_with_sr(
)
.into());
};
if source.info.is_shared() {
bail_not_implemented!(issue = 123, "alter shared source");
}

check_format_encode(&source, &connector_schema)?;

Expand Down
7 changes: 7 additions & 0 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,10 @@ impl From<PbSource> for ActiveModel {
}
}
}

impl Model {
pub fn is_shared(&self) -> bool {
self.source_info
.is_some_and(|s| s.to_protobuf().is_shared())
}
}
1 change: 1 addition & 0 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl StreamManagerService for StreamServiceImpl {
}
};

// TODO: check whether shared source is correct
let mutation: ThrottleConfig = actor_to_apply
.iter()
.map(|(fragment_id, actors)| {
Expand Down
122 changes: 64 additions & 58 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,16 @@ impl CatalogController {
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));

// Note: For non-shared source, we don't update their state tables, which
// belongs to the MV.
if source.is_shared() {
update_internal_tables(
object_id,
object::Column::OwnerId,
Value::Int(Some(new_owner)),
)
}
}
ObjectType::Sink => {
let sink = Sink::find_by_id(object_id)
Expand All @@ -1678,34 +1688,11 @@ impl CatalogController {
.ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));

// internal tables.
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(&txn)
.await?;

Object::update_many()
.col_expr(
object::Column::OwnerId,
SimpleExpr::Value(Value::Int(Some(new_owner))),
)
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(&txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(&txn)
.await?;
for (table, table_obj) in table_objs {
relations.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
update_internal_tables(
object_id,
object::Column::OwnerId,
Value::Int(Some(new_owner)),
)
}
ObjectType::Subscription => {
let subscription = Subscription::find_by_id(object_id)
Expand Down Expand Up @@ -1888,6 +1875,16 @@ impl CatalogController {
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));

// Note: For non-shared source, we don't update their state tables, which
// belongs to the MV.
if source.is_shared() {
update_internal_tables(
object_id,
object::Column::SchemaId,
Value::Int(Some(new_schema)),
)?;
}
}
ObjectType::Sink => {
let sink = Sink::find_by_id(object_id)
Expand All @@ -1901,36 +1898,11 @@ impl CatalogController {
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));

// internal tables.
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(&txn)
.await?;

if !internal_tables.is_empty() {
Object::update_many()
.col_expr(
object::Column::SchemaId,
SimpleExpr::Value(Value::Int(Some(new_schema))),
)
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(&txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(&txn)
.await?;
for (table, table_obj) in table_objs {
relations.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
}
update_internal_tables(
object_id,
object::Column::SchemaId,
Value::Int(Some(new_schema)),
)?;
}
ObjectType::Subscription => {
let subscription = Subscription::find_by_id(object_id)
Expand Down Expand Up @@ -2452,6 +2424,7 @@ impl CatalogController {
}};
}

// TODO: check is there any thing to change for shared source?
let old_name = match object_type {
ObjectType::Table => rename_relation!(Table, table, table_id, object_id),
ObjectType::Source => rename_relation!(Source, source, source_id, object_id),
Expand Down Expand Up @@ -3395,6 +3368,39 @@ impl CatalogControllerInner {
}
}

async fn update_internal_tables(
object_id: i32,
column: object::Column,
new_value: Value,
) -> MetaResult<()> {
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(&txn)
.await?;

if !internal_tables.is_empty() {
Object::update_many()
.col_expr(column, SimpleExpr::Value(new_value))
.filter(object::Column::Oid.is_in(internal_tables.clone()))
.exec(&txn)
.await?;

let table_objs = Table::find()
.find_also_related(Object)
.filter(table::Column::TableId.is_in(internal_tables))
.all(&txn)
.await?;
for (table, table_obj) in table_objs {
relations.push(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
));
}
}
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
Expand Down

0 comments on commit 6fe0e51

Please sign in to comment.