Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql-backend): change json column type to blob to allow proto field rename (#16090) #16275

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ message Subscription {

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
}

message Connection {
Expand Down
13 changes: 7 additions & 6 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_meta::stream::TableRevision;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_model_v2::catalog_version::VersionCategory;
use risingwave_meta_model_v2::compaction_status::LevelHandlers;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::hummock_sequence::{
COMPACTION_GROUP_ID, COMPACTION_TASK_ID, META_BACKUP_ID, SSTABLE_OBJECT_ID,
};
Expand Down Expand Up @@ -435,7 +434,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
table.schema_id = *schema_rewrite.get(&table.schema_id).unwrap();
});
let mut fragment = fragment.into_active_model();
fragment.stream_node = Set(StreamNode::from_protobuf(&stream_node));
fragment.stream_node = Set((&stream_node).into());
Fragment::insert(fragment)
.exec(&meta_store_sql.conn)
.await?;
Expand Down Expand Up @@ -683,7 +682,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
max_committed_epoch: Set(vd.max_committed_epoch as _),
safe_epoch: Set(vd.safe_epoch as _),
trivial_move: Set(vd.trivial_move),
full_version_delta: Set(vd.to_protobuf().into()),
full_version_delta: Set((&vd.to_protobuf()).into()),
})
.collect_vec(),
)
Expand Down Expand Up @@ -716,7 +715,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cg| compaction_config::ActiveModel {
compaction_group_id: Set(cg.group_id as _),
config: Set((*cg.compaction_config).clone().into()),
config: Set((&*cg.compaction_config).into()),
})
.collect_vec(),
)
Expand All @@ -733,7 +732,9 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.into_iter()
.map(|cs| compaction_status::ActiveModel {
compaction_group_id: Set(cs.compaction_group_id as _),
status: Set(LevelHandlers(cs.level_handlers.iter().map_into().collect())),
status: Set(LevelHandlers::from(
cs.level_handlers.iter().map_into().collect_vec(),
)),
})
.collect_vec(),
)
Expand All @@ -751,7 +752,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
compaction_task::ActiveModel {
id: Set(task.task_id as _),
context_id: Set(context_id as _),
task: Set(task.into()),
task: Set((&task).into()),
}
}))
.exec(&meta_store_sql.conn)
Expand Down
11 changes: 10 additions & 1 deletion src/meta/model_v2/migration/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# Running Migrator CLI

> **WARNING:** Migration files are used to define schema changes for the database. Each migration file contains an up and down function,
> which are used to define upgrade and downgrade operations for the schema.
>
> When you need to make schema changes to the system catalog, you need to generate a new migration file and then apply it to the database.
> Note that each migration file can only be applied once and will be recorded in a system table, so for new schema changes, you need to
> generate a new migration file. Unless you are sure the modification of the migration file has not been included in any released version yet,
> **DO NOT** modify already published migration files.

## How to run the migrator CLI
- Generate a new migration file
```sh
cargo run -- generate MIGRATION_NAME
```
- Apply all pending migrations
- Apply all pending migrations for test purposes, `DATABASE_URL` required.
```sh
cargo run
```
Expand Down
57 changes: 23 additions & 34 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
.col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
.col(ColumnDef::new(User::CanLogin).boolean().not_null())
.col(ColumnDef::new(User::AuthInfo).json_binary())
.col(ColumnDef::new(User::AuthInfo).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -381,11 +381,7 @@ impl MigrationTrait for Migration {
.blob(BlobSize::Long)
.not_null(),
)
.col(
ColumnDef::new(Fragment::VnodeMapping)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Fragment::VnodeMapping).binary().not_null())
.col(ColumnDef::new(Fragment::StateTableIds).json_binary())
.col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
.foreign_key(
Expand All @@ -411,12 +407,12 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Actor::FragmentId).integer().not_null())
.col(ColumnDef::new(Actor::Status).string().not_null())
.col(ColumnDef::new(Actor::Splits).json_binary())
.col(ColumnDef::new(Actor::Splits).binary())
.col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
.col(ColumnDef::new(Actor::WorkerId).integer().not_null())
.col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
.col(ColumnDef::new(Actor::VnodeBitmap).json_binary())
.col(ColumnDef::new(Actor::ExprContext).json_binary().not_null())
.col(ColumnDef::new(Actor::VnodeBitmap).binary())
.col(ColumnDef::new(Actor::ExprContext).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_actor_fragment_id")
Expand Down Expand Up @@ -458,7 +454,7 @@ impl MigrationTrait for Migration {
.json_binary()
.not_null(),
)
.col(ColumnDef::new(ActorDispatcher::HashMapping).json_binary())
.col(ColumnDef::new(ActorDispatcher::HashMapping).binary())
.col(
ColumnDef::new(ActorDispatcher::DispatcherId)
.integer()
Expand Down Expand Up @@ -499,7 +495,7 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Connection::Name).string().not_null())
.col(ColumnDef::new(Connection::Info).json_binary().not_null())
.col(ColumnDef::new(Connection::Info).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_connection_object_id")
Expand All @@ -518,20 +514,16 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Source::SourceId).integer().primary_key())
.col(ColumnDef::new(Source::Name).string().not_null())
.col(ColumnDef::new(Source::RowIdIndex).integer())
.col(ColumnDef::new(Source::Columns).json_binary().not_null())
.col(ColumnDef::new(Source::Columns).binary().not_null())
.col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
.col(
ColumnDef::new(Source::WithProperties)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::Definition).text().not_null())
.col(ColumnDef::new(Source::SourceInfo).json_binary())
.col(
ColumnDef::new(Source::WatermarkDescs)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Source::SourceInfo).binary())
.col(ColumnDef::new(Source::WatermarkDescs).binary().not_null())
.col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
.col(ColumnDef::new(Source::ConnectionId).integer())
.col(ColumnDef::new(Source::Version).big_integer().not_null())
Expand Down Expand Up @@ -570,8 +562,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
.col(ColumnDef::new(Table::TableType).string().not_null())
.col(ColumnDef::new(Table::BelongsToJobId).integer())
.col(ColumnDef::new(Table::Columns).json_binary().not_null())
.col(ColumnDef::new(Table::Pk).json_binary().not_null())
.col(ColumnDef::new(Table::Columns).binary().not_null())
.col(ColumnDef::new(Table::Pk).binary().not_null())
.col(
ColumnDef::new(Table::DistributionKey)
.json_binary()
Expand Down Expand Up @@ -601,14 +593,14 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
.col(ColumnDef::new(Table::DmlFragmentId).integer())
.col(ColumnDef::new(Table::Cardinality).json_binary())
.col(ColumnDef::new(Table::Cardinality).binary())
.col(
ColumnDef::new(Table::CleanedByWatermark)
.boolean()
.not_null(),
)
.col(ColumnDef::new(Table::Description).string())
.col(ColumnDef::new(Table::Version).json_binary())
.col(ColumnDef::new(Table::Version).binary())
.col(ColumnDef::new(Table::RetentionSeconds).integer())
.col(
ColumnDef::new(Table::IncomingSinks)
Expand Down Expand Up @@ -650,7 +642,8 @@ impl MigrationTrait for Migration {
&mut ForeignKey::create()
.name("FK_table_optional_associated_source_id")
.from(Table::Table, Table::OptionalAssociatedSourceId)
.to(Source::Table, Source::SourceId)
.to(Object::Table, Object::Oid)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.to_owned(),
Expand All @@ -662,8 +655,8 @@ impl MigrationTrait for Migration {
.table(Sink::Table)
.col(ColumnDef::new(Sink::SinkId).integer().primary_key())
.col(ColumnDef::new(Sink::Name).string().not_null())
.col(ColumnDef::new(Sink::Columns).json_binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).json_binary().not_null())
.col(ColumnDef::new(Sink::Columns).binary().not_null())
.col(ColumnDef::new(Sink::PlanPk).binary().not_null())
.col(
ColumnDef::new(Sink::DistributionKey)
.json_binary()
Expand All @@ -676,7 +669,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Sink::ConnectionId).integer())
.col(ColumnDef::new(Sink::DbName).string().not_null())
.col(ColumnDef::new(Sink::SinkFromName).string().not_null())
.col(ColumnDef::new(Sink::SinkFormatDesc).json_binary())
.col(ColumnDef::new(Sink::SinkFormatDesc).binary())
.col(ColumnDef::new(Sink::TargetTable).integer())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -711,7 +704,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(View::Name).string().not_null())
.col(ColumnDef::new(View::Properties).json_binary().not_null())
.col(ColumnDef::new(View::Definition).text().not_null())
.col(ColumnDef::new(View::Columns).json_binary().not_null())
.col(ColumnDef::new(View::Columns).binary().not_null())
.foreign_key(
&mut ForeignKey::create()
.name("FK_view_object_id")
Expand All @@ -731,7 +724,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Index::Name).string().not_null())
.col(ColumnDef::new(Index::IndexTableId).integer().not_null())
.col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
.col(ColumnDef::new(Index::IndexItems).json_binary().not_null())
.col(ColumnDef::new(Index::IndexItems).binary().not_null())
.col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
.foreign_key(
&mut ForeignKey::create()
Expand Down Expand Up @@ -767,12 +760,8 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::FunctionId).integer().primary_key())
.col(ColumnDef::new(Function::Name).string().not_null())
.col(ColumnDef::new(Function::ArgNames).string().not_null())
.col(ColumnDef::new(Function::ArgTypes).json_binary().not_null())
.col(
ColumnDef::new(Function::ReturnType)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Function::ArgTypes).binary().not_null())
.col(ColumnDef::new(Function::ReturnType).binary().not_null())
.col(ColumnDef::new(Function::Language).string().not_null())
.col(ColumnDef::new(Function::Link).string())
.col(ColumnDef::new(Function::Identifier).string())
Expand Down
12 changes: 4 additions & 8 deletions src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(CompactionTask::Task)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(CompactionTask::Task).binary().not_null())
.col(
ColumnDef::new(CompactionTask::ContextId)
.integer()
Expand All @@ -54,7 +50,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionConfig::Config).json_binary())
.col(ColumnDef::new(CompactionConfig::Config).binary())
.to_owned(),
)
.await?;
Expand All @@ -69,7 +65,7 @@ impl MigrationTrait for Migration {
.not_null()
.primary_key(),
)
.col(ColumnDef::new(CompactionStatus::Status).json_binary())
.col(ColumnDef::new(CompactionStatus::Status).binary())
.to_owned(),
)
.await?;
Expand Down Expand Up @@ -142,7 +138,7 @@ impl MigrationTrait for Migration {
.boolean()
.not_null(),
)
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).json_binary())
.col(ColumnDef::new(HummockVersionDelta::FullVersionDelta).binary())
.to_owned(),
)
.await?;
Expand Down
18 changes: 8 additions & 10 deletions src/meta/model_v2/migration/src/m20240304_074901_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@ impl MigrationTrait for Migration {
.primary_key(),
)
.col(ColumnDef::new(Subscription::Name).string().not_null())
.col(
ColumnDef::new(Subscription::Columns)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(Subscription::PlanPk)
.json_binary()
.not_null(),
)
.col(ColumnDef::new(Subscription::Columns).binary().not_null())
.col(ColumnDef::new(Subscription::PlanPk).binary().not_null())
.col(
ColumnDef::new(Subscription::DistributionKey)
.json_binary()
Expand All @@ -40,6 +32,11 @@ impl MigrationTrait for Migration {
.not_null(),
)
.col(ColumnDef::new(Subscription::Definition).string().not_null())
.col(
ColumnDef::new(Subscription::SubscriptionFromName)
.string()
.not_null(),
)
.to_owned(),
)
.await?;
Expand All @@ -63,4 +60,5 @@ enum Subscription {
DistributionKey,
Properties,
Definition,
SubscriptionFromName,
}
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/actor_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl From<(u32, PbDispatcher)> for Model {
dispatcher_type: dispatcher.r#type().into(),
dist_key_indices: dispatcher.dist_key_indices.into(),
output_indices: dispatcher.output_indices.into(),
hash_mapping: dispatcher.hash_mapping.map(ActorMapping),
hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from),
dispatcher_id: dispatcher.dispatcher_id as _,
downstream_actor_ids: dispatcher.downstream_actor_id.into(),
}
Expand All @@ -74,7 +74,7 @@ impl From<Model> for PbDispatcher {
r#type: PbDispatcherType::from(model.dispatcher_type) as _,
dist_key_indices: model.dist_key_indices.into_u32_array(),
output_indices: model.output_indices.into_u32_array(),
hash_mapping: model.hash_mapping.map(|mapping| mapping.into_inner()),
hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()),
dispatcher_id: model.dispatcher_id as _,
downstream_actor_id: model.downstream_actor_ids.into_u32_array(),
}
Expand Down
3 changes: 1 addition & 2 deletions src/meta/model_v2/src/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_pb::hummock::CompactionConfig as PbCompactionConfig;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;
Expand All @@ -32,4 +31,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(CompactionConfig, PbCompactionConfig);
crate::derive_from_blob!(CompactionConfig, PbCompactionConfig);
4 changes: 1 addition & 3 deletions src/meta/model_v2/src/compaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use risingwave_pb::hummock::LevelHandler as PbLevelHandler;
use sea_orm::entity::prelude::*;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};

use crate::CompactionGroupId;

Expand All @@ -32,4 +30,4 @@ pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

crate::derive_from_json_struct!(LevelHandlers, Vec<PbLevelHandler>);
crate::derive_array_from_blob!(LevelHandlers, PbLevelHandler, PbLevelHandlerArray);
Loading
Loading