From 6afdfa8b6981580494b1c3039e73b39b6e5e372c Mon Sep 17 00:00:00 2001 From: PeiYu Cui Date: Thu, 19 Dec 2024 14:43:29 +0800 Subject: [PATCH] feat: migration with group name --- sea-orm-cli/src/cli.rs | 11 +++ sea-orm-cli/src/commands/migrate.rs | 4 + sea-orm-migration/src/cli.rs | 27 +++++-- sea-orm-migration/src/migrator.rs | 99 +++++++++++++++-------- sea-orm-migration/src/seaql_migrations.rs | 1 + sea-orm-migration/tests/main.rs | 45 ++++++----- 6 files changed, 127 insertions(+), 60 deletions(-) diff --git a/sea-orm-cli/src/cli.rs b/sea-orm-cli/src/cli.rs index 4c771da00..aef894410 100644 --- a/sea-orm-cli/src/cli.rs +++ b/sea-orm-cli/src/cli.rs @@ -98,6 +98,15 @@ you should provide the directory of that submodule.", )] database_url: Option, + #[arg( + global = true, + short = 'g', + long, + env = "MIGRATION_GROUP", + help = "Migration group name, defaults to the 'default' group." + )] + group: Option, + #[command(subcommand)] command: Option, }, @@ -333,12 +342,14 @@ pub async fn main() { migration_dir, database_schema, database_url, + group, command, } => run_migrate_command( command, &migration_dir, database_schema, database_url, + group, verbose, ) .unwrap_or_else(handle_error), diff --git a/sea-orm-cli/src/commands/migrate.rs b/sea-orm-cli/src/commands/migrate.rs index ce16ebb8b..1997a9ef6 100644 --- a/sea-orm-cli/src/commands/migrate.rs +++ b/sea-orm-cli/src/commands/migrate.rs @@ -18,6 +18,7 @@ pub fn run_migrate_command( migration_dir: &str, database_schema: Option, database_url: Option, + group: Option, verbose: bool, ) -> Result<(), Box> { match command { @@ -62,6 +63,9 @@ pub fn run_migrate_command( if let Some(database_schema) = &database_schema { args.extend(["-s", database_schema]); } + if let Some(group) = &group { + args.extend(["-g", group]); + } if verbose { args.push("-v"); } diff --git a/sea-orm-migration/src/cli.rs b/sea-orm-migration/src/cli.rs index aa0d3668b..ef2120315 100644 --- a/sea-orm-migration/src/cli.rs +++ b/sea-orm-migration/src/cli.rs @@ -21,6 +21,7 @@ where .database_url .expect("Environment variable 'DATABASE_URL' not set"); let schema = cli.database_schema.unwrap_or_else(|| "public".to_owned()); + let group = cli.group.unwrap_or_else(|| "default".to_owned()); let connect_options = ConnectOptions::new(url) .set_schema_search_path(schema) @@ -29,7 +30,7 @@ where .await .expect("Fail to acquire database connection"); - run_migrate(migrator, db, cli.command, cli.verbose) + run_migrate(migrator, db, &group, cli.command, cli.verbose) .await .unwrap_or_else(handle_error); } @@ -37,6 +38,7 @@ where pub async fn run_migrate( _: M, db: &DbConn, + group: &str, command: Option, verbose: bool, ) -> Result<(), Box> @@ -68,19 +70,19 @@ where }; match command { - Some(MigrateSubcommands::Fresh) => M::fresh(db).await?, - Some(MigrateSubcommands::Refresh) => M::refresh(db).await?, - Some(MigrateSubcommands::Reset) => M::reset(db).await?, - Some(MigrateSubcommands::Status) => M::status(db).await?, - Some(MigrateSubcommands::Up { num }) => M::up(db, num).await?, - Some(MigrateSubcommands::Down { num }) => M::down(db, Some(num)).await?, + Some(MigrateSubcommands::Fresh) => M::fresh(db, group).await?, + Some(MigrateSubcommands::Refresh) => M::refresh(db, group).await?, + Some(MigrateSubcommands::Reset) => M::reset(db, group).await?, + Some(MigrateSubcommands::Status) => M::status(db, group).await?, + Some(MigrateSubcommands::Up { num }) => M::up(db, group, num).await?, + Some(MigrateSubcommands::Down { num }) => M::down(db, group, Some(num)).await?, Some(MigrateSubcommands::Init) => run_migrate_init(MIGRATION_DIR)?, Some(MigrateSubcommands::Generate { migration_name, universal_time: _, local_time, }) => run_migrate_generate(MIGRATION_DIR, &migration_name, !local_time)?, - _ => M::up(db, None).await?, + _ => M::up(db, group, None).await?, }; Ok(()) @@ -112,6 +114,15 @@ pub struct Cli { )] database_url: Option, + #[arg( + global = true, + short = 'g', + long, + env = "MIGRATION_GROUP", + help = "Migration group name, defaults to the 'default' group." + )] + group: Option, + #[command(subcommand)] command: Option, } diff --git a/sea-orm-migration/src/migrator.rs b/sea-orm-migration/src/migrator.rs index 05329e5fe..33a6ebc73 100644 --- a/sea-orm-migration/src/migrator.rs +++ b/sea-orm-migration/src/migrator.rs @@ -9,11 +9,7 @@ use sea_orm::sea_query::{ self, extension::postgres::Type, Alias, Expr, ForeignKey, IntoIden, JoinType, Order, Query, SelectStatement, SimpleExpr, Table, }; -use sea_orm::{ - ActiveModelTrait, ActiveValue, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden, - DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement, - TransactionTrait, -}; +use sea_orm::{ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden, DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement, TransactionTrait}; use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite}; use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager}; @@ -77,13 +73,17 @@ pub trait MigratorTrait: Send { } /// Get list of applied migrations from database - async fn get_migration_models(db: &C) -> Result, DbErr> + async fn get_migration_models( + db: &C, + group: &str, + ) -> Result, DbErr> where C: ConnectionTrait, { Self::install(db).await?; let stmt = Query::select() .table_name(Self::migration_table_name()) + .cond_where(seaql_migrations::Column::Group.eq(group)) .columns(seaql_migrations::Column::iter().map(IntoIden::into_iden)) .order_by(seaql_migrations::Column::Version, Order::Asc) .to_owned(); @@ -94,13 +94,13 @@ pub trait MigratorTrait: Send { } /// Get list of migrations with status - async fn get_migration_with_status(db: &C) -> Result, DbErr> + async fn get_migration_with_status(db: &C, group: &str) -> Result, DbErr> where C: ConnectionTrait, { Self::install(db).await?; let mut migration_files = Self::get_migration_files(); - let migration_models = Self::get_migration_models(db).await?; + let migration_models = Self::get_migration_models(db, group).await?; let migration_in_db: HashSet = migration_models .into_iter() @@ -133,12 +133,12 @@ pub trait MigratorTrait: Send { } /// Get list of pending migrations - async fn get_pending_migrations(db: &C) -> Result, DbErr> + async fn get_pending_migrations(db: &C, group: &str) -> Result, DbErr> where C: ConnectionTrait, { Self::install(db).await?; - Ok(Self::get_migration_with_status(db) + Ok(Self::get_migration_with_status(db, group) .await? .into_iter() .filter(|file| file.status == MigrationStatus::Pending) @@ -146,12 +146,12 @@ pub trait MigratorTrait: Send { } /// Get list of applied migrations - async fn get_applied_migrations(db: &C) -> Result, DbErr> + async fn get_applied_migrations(db: &C, group: &str) -> Result, DbErr> where C: ConnectionTrait, { Self::install(db).await?; - Ok(Self::get_migration_with_status(db) + Ok(Self::get_migration_with_status(db, group) .await? .into_iter() .filter(|file| file.status == MigrationStatus::Applied) @@ -174,7 +174,7 @@ pub trait MigratorTrait: Send { } /// Check the status of all migrations - async fn status(db: &C) -> Result<(), DbErr> + async fn status(db: &C, group: &str) -> Result<(), DbErr> where C: ConnectionTrait, { @@ -182,67 +182,90 @@ pub trait MigratorTrait: Send { info!("Checking migration status"); - for Migration { migration, status } in Self::get_migration_with_status(db).await? { - info!("Migration '{}'... {}", migration.name(), status); + for Migration { migration, status } in Self::get_migration_with_status(db, group).await? { + info!( + "Migration '{}' => '{}'... {}", + group, + migration.name(), + status + ); } Ok(()) } /// Drop all tables from the database, then reapply all migrations - async fn fresh<'c, C>(db: C) -> Result<(), DbErr> + async fn fresh<'c, C, G: Into + Send>(db: C, group: G) -> Result<(), DbErr> where C: IntoSchemaManagerConnection<'c>, { + let group = group.into(); exec_with_connection::<'_, _, _>(db, move |manager| { - Box::pin(async move { exec_fresh::(manager).await }) + let group = group.clone(); + Box::pin(async move { exec_fresh::(manager, &group).await }) }) .await } /// Rollback all applied migrations, then reapply all migrations - async fn refresh<'c, C>(db: C) -> Result<(), DbErr> + async fn refresh<'c, C, G: Into + Send>(db: C, group: G) -> Result<(), DbErr> where C: IntoSchemaManagerConnection<'c>, { + let group = group.into(); exec_with_connection::<'_, _, _>(db, move |manager| { + let group = group.clone(); Box::pin(async move { - exec_down::(manager, None).await?; - exec_up::(manager, None).await + exec_down::(manager, &group, None).await?; + exec_up::(manager, &group, None).await }) }) .await } /// Rollback all applied migrations - async fn reset<'c, C>(db: C) -> Result<(), DbErr> + async fn reset<'c, C, G: Into + Send>(db: C, group: G) -> Result<(), DbErr> where C: IntoSchemaManagerConnection<'c>, { + let group = group.into(); exec_with_connection::<'_, _, _>(db, move |manager| { - Box::pin(async move { exec_down::(manager, None).await }) + let group = group.clone(); + Box::pin(async move { exec_down::(manager, &group, None).await }) }) .await } /// Apply pending migrations - async fn up<'c, C>(db: C, steps: Option) -> Result<(), DbErr> + async fn up<'c, C, G: Into + Send>( + db: C, + group: G, + steps: Option, + ) -> Result<(), DbErr> where C: IntoSchemaManagerConnection<'c>, { + let group = group.into(); exec_with_connection::<'_, _, _>(db, move |manager| { - Box::pin(async move { exec_up::(manager, steps).await }) + let group = group.clone(); + Box::pin(async move { exec_up::(manager, &group, steps).await }) }) .await } /// Rollback applied migrations - async fn down<'c, C>(db: C, steps: Option) -> Result<(), DbErr> + async fn down<'c, C, G: Into + Send>( + db: C, + group: G, + steps: Option, + ) -> Result<(), DbErr> where C: IntoSchemaManagerConnection<'c>, { + let group = group.into(); exec_with_connection::<'_, _, _>(db, move |manager| { - Box::pin(async move { exec_down::(manager, steps).await }) + let group = group.clone(); + Box::pin(async move { exec_down::(manager, &group, steps).await }) }) .await } @@ -271,7 +294,7 @@ where } } -async fn exec_fresh(manager: &SchemaManager<'_>) -> Result<(), DbErr> +async fn exec_fresh(manager: &SchemaManager<'_>, group: &str) -> Result<(), DbErr> where M: MigratorTrait + ?Sized, { @@ -353,10 +376,14 @@ where } // Reapply all migrations - exec_up::(manager, None).await + exec_up::(manager, group, None).await } -async fn exec_up(manager: &SchemaManager<'_>, mut steps: Option) -> Result<(), DbErr> +async fn exec_up( + manager: &SchemaManager<'_>, + group: &str, + mut steps: Option, +) -> Result<(), DbErr> where M: MigratorTrait + ?Sized, { @@ -370,7 +397,7 @@ where info!("Applying all pending migrations"); } - let migrations = M::get_pending_migrations(db).await?.into_iter(); + let migrations = M::get_pending_migrations(db, group).await?.into_iter(); if migrations.len() == 0 { info!("No pending migrations"); } @@ -388,6 +415,7 @@ where .duration_since(SystemTime::UNIX_EPOCH) .expect("SystemTime before UNIX EPOCH!"); seaql_migrations::Entity::insert(seaql_migrations::ActiveModel { + group: ActiveValue::Set(group.to_owned()), version: ActiveValue::Set(migration.name().to_owned()), applied_at: ActiveValue::Set(now.as_secs() as i64), }) @@ -399,7 +427,11 @@ where Ok(()) } -async fn exec_down(manager: &SchemaManager<'_>, mut steps: Option) -> Result<(), DbErr> +async fn exec_down( + manager: &SchemaManager<'_>, + group: &str, + mut steps: Option, +) -> Result<(), DbErr> where M: MigratorTrait + ?Sized, { @@ -413,7 +445,10 @@ where info!("Rolling back all applied migrations"); } - let migrations = M::get_applied_migrations(db).await?.into_iter().rev(); + let migrations = M::get_applied_migrations(db, group) + .await? + .into_iter() + .rev(); if migrations.len() == 0 { info!("No applied migrations"); } diff --git a/sea-orm-migration/src/seaql_migrations.rs b/sea-orm-migration/src/seaql_migrations.rs index 51da93009..9fcb869f7 100644 --- a/sea-orm-migration/src/seaql_migrations.rs +++ b/sea-orm-migration/src/seaql_migrations.rs @@ -4,6 +4,7 @@ use sea_orm::entity::prelude::*; // One should override the name of migration table via `MigratorTrait::migration_table_name` method #[sea_orm(table_name = "seaql_migrations")] pub struct Model { + pub group: String, #[sea_orm(primary_key, auto_increment = false)] pub version: String, pub applied_at: i64, diff --git a/sea-orm-migration/tests/main.rs b/sea-orm-migration/tests/main.rs index 26d579f79..73ac038ef 100644 --- a/sea-orm-migration/tests/main.rs +++ b/sea-orm-migration/tests/main.rs @@ -12,13 +12,15 @@ async fn main() -> Result<(), DbErr> { .init(); let url = &std::env::var("DATABASE_URL").expect("Environment variable 'DATABASE_URL' not set"); + let group = "default"; - run_migration(url, default::Migrator, "sea_orm_migration", "public").await?; + run_migration(url, default::Migrator, "sea_orm_migration", "public", group).await?; run_migration( url, default::Migrator, "sea_orm_migration_schema", "my_schema", + group, ) .await?; @@ -27,6 +29,7 @@ async fn main() -> Result<(), DbErr> { override_migration_table_name::Migrator, "sea_orm_migration_table_name", "public", + group, ) .await?; run_migration( @@ -34,6 +37,7 @@ async fn main() -> Result<(), DbErr> { override_migration_table_name::Migrator, "sea_orm_migration_table_name_schema", "my_schema", + group, ) .await?; @@ -45,6 +49,7 @@ async fn run_migration( _: Migrator, db_name: &str, schema: &str, + group: &str, ) -> Result<(), DbErr> where Migrator: MigratorTrait, @@ -98,7 +103,7 @@ where let manager = SchemaManager::new(db); println!("\nMigrator::status"); - Migrator::status(db).await?; + Migrator::status(db, group).await?; println!("\nMigrator::install"); Migrator::install(db).await?; @@ -111,22 +116,22 @@ where } println!("\nMigrator::reset"); - Migrator::reset(db).await?; + Migrator::reset(db, group).await?; assert!(!manager.has_table("cake").await?); assert!(!manager.has_table("fruit").await?); println!("\nMigrator::up"); - Migrator::up(db, Some(0)).await?; + Migrator::up(db, group, Some(0)).await?; assert!(!manager.has_table("cake").await?); assert!(!manager.has_table("fruit").await?); println!("\nMigrator::up"); - Migrator::up(db, Some(1)).await?; + Migrator::up(db, group, Some(1)).await?; println!("\nMigrator::get_pending_migrations"); - let migrations = Migrator::get_pending_migrations(db).await?; + let migrations = Migrator::get_pending_migrations(db, group).await?; assert_eq!(migrations.len(), 5); let migration = migrations.get(0).unwrap(); @@ -137,13 +142,13 @@ where assert!(!manager.has_table("fruit").await?); println!("\nMigrator::down"); - Migrator::down(db, Some(0)).await?; + Migrator::down(db, group, Some(0)).await?; assert!(manager.has_table("cake").await?); assert!(!manager.has_table("fruit").await?); println!("\nMigrator::down"); - Migrator::down(db, Some(1)).await?; + Migrator::down(db, group, Some(1)).await?; assert!(!manager.has_table("cake").await?); assert!(!manager.has_table("fruit").await?); @@ -158,14 +163,14 @@ where // Should throw an error println!("\nMigrator::up"); assert_eq!( - Migrator::up(db, None).await, + Migrator::up(db, group, None).await, Err(DbErr::Migration( "Abort migration and rollback changes".into() )) ); println!("\nMigrator::status"); - Migrator::status(db).await?; + Migrator::status(db, group).await?; // Check migrations have been rolled back assert!(!manager.has_table("cake").await?); @@ -176,10 +181,10 @@ where } println!("\nMigrator::up"); - Migrator::up(db, None).await?; + Migrator::up(db, group, None).await?; println!("\nMigrator::get_applied_migrations"); - let migrations = Migrator::get_applied_migrations(db).await?; + let migrations = Migrator::get_applied_migrations(db, group).await?; assert_eq!(migrations.len(), 6); assert!(!manager.has_index("cake", "non_existent_index").await?); @@ -190,7 +195,7 @@ where assert_eq!(migration.status(), MigrationStatus::Applied); println!("\nMigrator::status"); - Migrator::status(db).await?; + Migrator::status(db, group).await?; assert!(manager.has_table("cake").await?); assert!(manager.has_table("fruit").await?); @@ -208,14 +213,14 @@ where // Should throw an error println!("\nMigrator::down"); assert_eq!( - Migrator::down(db, None).await, + Migrator::down(db, group, None).await, Err(DbErr::Migration( "Abort migration and rollback changes".into() )) ); println!("\nMigrator::status"); - Migrator::status(db).await?; + Migrator::status(db, group).await?; // Check migrations have been rolled back assert!(manager.has_table("cake").await?); @@ -226,7 +231,7 @@ where } println!("\nMigrator::down"); - Migrator::down(db, None).await?; + Migrator::down(db, group, None).await?; assert!(manager.has_table(migration_table_name).await?); if migration_table_name != "seaql_migrations" { @@ -237,25 +242,25 @@ where assert!(!manager.has_table("fruit").await?); println!("\nMigrator::fresh"); - Migrator::fresh(db).await?; + Migrator::fresh(db, group).await?; assert!(manager.has_table("cake").await?); assert!(manager.has_table("fruit").await?); println!("\nMigrator::refresh"); - Migrator::refresh(db).await?; + Migrator::refresh(db, group).await?; assert!(manager.has_table("cake").await?); assert!(manager.has_table("fruit").await?); println!("\nMigrator::reset"); - Migrator::reset(db).await?; + Migrator::reset(db, group).await?; assert!(!manager.has_table("cake").await?); assert!(!manager.has_table("fruit").await?); println!("\nMigrator::status"); - Migrator::status(db).await?; + Migrator::status(db, group).await?; Ok(()) }