diff --git a/cli/commands/migrate_apply.go b/cli/commands/migrate_apply.go index b46aad68fd515..9fc6e642ef1db 100644 --- a/cli/commands/migrate_apply.go +++ b/cli/commands/migrate_apply.go @@ -41,6 +41,7 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { f.StringVar(&opts.downMigration, "down", "", "apply all or N down migration steps") f.StringVar(&opts.versionMigration, "version", "", "only apply this particular migration") f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag") + f.StringVar(&opts.goTo, "goto", "", "migrate to a particular version") f.BoolVar(&opts.skipExecution, "skip-execution", false, "skip executing the migration action, but mark them as applied") f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine") @@ -63,10 +64,11 @@ type migrateApplyOptions struct { versionMigration string migrationType string skipExecution bool + goTo string } func (o *migrateApplyOptions) run() error { - migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.skipExecution) + migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.goTo, o.skipExecution) if err != nil { return errors.Wrap(err, "error validating flags") } @@ -97,7 +99,7 @@ func (o *migrateApplyOptions) run() error { // Only one flag out of up, down and version can be set at a time. This function // checks whether that is the case and returns an error is not -func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType string, skipExecution bool) (string, int64, error) { +func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType, goTo string, skipExecution bool) (string, int64, error) { var flagCount = 0 var stepString = "all" var migrationName = "up" @@ -119,6 +121,11 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra flagCount++ } + if goTo != "" { + migrationName = "goto" + stepString = goTo + } + if flagCount > 1 { return "", 0, errors.New("Only one migration type can be applied at a time (--up, --down or --goto)") } diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 60b512ba2cd07..59b9a3092e8ad 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -484,6 +484,27 @@ func (m *Migrate) Migrate(version uint64, direction string) error { return m.unlockErr(m.runMigrations(ret)) } +func (m *Migrate) MigrateTo(version uint64) error { + mode, err := m.databaseDrv.GetSetting("migration_mode") + if err != nil { + return err + } + + if mode != "true" { + return ErrNoMigrationMode + } + + if err := m.lock(); err != nil { + return err + } + + ret := make(chan interface{}, m.PrefetchMigrations) + go m.readTo(version, ret) + + return m.unlockErr(m.runMigrations(ret)) + +} + // Steps looks at the currently active migration version. // It will migrate up if n > 0, and down if n < 0. func (m *Migrate) Steps(n int64) error { @@ -833,6 +854,7 @@ func (m *Migrate) read(version uint64, direction string, ret chan<- interface{}) } } + // readUp reads up migrations from `from` limitted by `limit`. // limit can be -1, implying no limit and reading until there are no more migrations. // Each migration is then written to the ret channel. @@ -960,6 +982,65 @@ func (m *Migrate) readUp(limit int64, ret chan<- interface{}) { } } +func (m *Migrate) readTo(to uint64, ret chan<- interface{}) { + defer close(ret) + + directions := m.sourceDrv.GetDirections(to) + + from, _, err := m.databaseDrv.Version() + if err != nil { + ret <- err + return + } + + curr := uint64(from) + + for curr != to { + if m.stop() { + return + } + + err = m.versionDownExists(curr) + if err != nil { + ret <- err + return + } + + next, ok := curr, true + if directions[source.Up] { + next, err = m.sourceDrv.Next(curr) + if err != nil { + ret <- err + return + } + } else { + next, ok = m.databaseDrv.Prev(curr) + if !ok { + return + } + } + + migr, err := m.metanewMigration(uint64(int64(curr)), int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(curr, int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + curr = next + } +} + // readDown reads down migrations from `from` limitted by `limit`. // limit can be -1, implying no limit and reading until there are no more migrations. // Each migration is then written to the ret channel.