这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/commands/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func ExecuteMigration(cmd string, t *migrate.Migrate, stepOrVersion int64) error
err = mig.UpCmd(t, stepOrVersion)
case "down":
err = mig.DownCmd(t, stepOrVersion)
case "gotoVersion":
err = mig.GotoVersionCmd(t, stepOrVersion)
case "version":
var direction string
if stepOrVersion >= 0 {
Expand Down
60 changes: 36 additions & 24 deletions cli/commands/migrate_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {
EC: ec,
}
migrateApplyCmd := &cobra.Command{
Use: "apply",
Short: "Apply migrations on the database",
Use: "apply",
Short: "Apply migrations on the database",
Example: ` # Apply all migrations
hasura migrate apply

Expand All @@ -42,6 +42,12 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {

# Apply only a particular version
hasura migrate apply --type up --version "<version>"

# Apply all up migrations upto version 125, last applied is 100
hasura migrate apply --goto 125

# Apply all down migrations upto version 125, last applied is 150
hasura migrate apply --goto 125

# Rollback a particular version:
hasura migrate apply --type down --version "<version>"
Expand All @@ -58,19 +64,33 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command {
err := opts.run()
opts.EC.Spinner.Stop()
if err != nil {
return err
if err == migrate.ErrNoChange {
opts.EC.Logger.Info("nothing to apply")
return nil
}
if e, ok := err.(*os.PathError); ok {
// If Op is first, then log No migrations to apply
if e.Op == "first" {
opts.EC.Logger.Info("nothing to apply")
return nil
}
}
return errors.Wrap(err, "apply failed")
}
opts.EC.Logger.Info("migrations applied")
return nil
},
}
f := migrateApplyCmd.Flags()
f.SortFlags = false

f.StringVar(&opts.upMigration, "up", "", "apply all or N up migration steps")
f.StringVar(&opts.downMigration, "down", "", "apply all or N down migration steps")
f.StringVar(&opts.gotoVersion, "goto", "", "apply migration chain up to to the version specified")

f.StringVar(&opts.versionMigration, "version", "", "only apply this particular migration")
f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag")
f.BoolVar(&opts.skipExecution, "skip-execution", false, "skip executing the migration action, but mark them as applied")
f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag")

f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine")
f.String("admin-secret", "", "admin secret for Hasura GraphQL Engine")
Expand All @@ -91,11 +111,13 @@ type migrateApplyOptions struct {
downMigration string
versionMigration string
migrationType string
skipExecution bool
// version up to which migration chain has to be applied
gotoVersion string
skipExecution bool
}

func (o *migrateApplyOptions) run() error {
migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.skipExecution)
migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.gotoVersion, o.skipExecution)
if err != nil {
return errors.Wrap(err, "error validating flags")
}
Expand All @@ -106,27 +128,12 @@ func (o *migrateApplyOptions) run() error {
}
migrateDrv.SkipExecution = o.skipExecution

err = ExecuteMigration(migrationType, migrateDrv, step)
if err != nil {
if err == migrate.ErrNoChange {
o.EC.Logger.Info("nothing to apply")
return nil
}
if e, ok := err.(*os.PathError); ok {
// If Op is first, then log No migrations to apply
if e.Op == "first" {
o.EC.Logger.Info("No migrations to apply")
return nil
}
}
return errors.Wrap(err, "apply failed")
}
return nil
return ExecuteMigration(migrationType, migrateDrv, step)
}

// Only one flag out of up, down and version can be set at a time. This function
// checks whether that is the case and returns an error is not
func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType string, skipExecution bool) (string, int64, error) {
func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType, gotoVersion string, skipExecution bool) (string, int64, error) {
var flagCount = 0
var stepString = "all"
var migrationName = "up"
Expand All @@ -147,6 +154,11 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra
}
flagCount++
}
if gotoVersion != "" {
migrationName = "gotoVersion"
stepString = gotoVersion
flagCount++
}

if flagCount > 1 {
return "", 0, errors.New("Only one migration type can be applied at a time (--up, --down or --goto)")
Expand All @@ -162,7 +174,7 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra

step, err := strconv.ParseInt(stepString, 10, 64)
if err != nil {
return "", 0, errors.Wrap(err, "not a valid input for steps")
return "", 0, errors.Wrap(err, "not a valid input for steps/version")
}
return migrationName, step, nil
}
4 changes: 4 additions & 0 deletions cli/migrate/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,7 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s

return
}

func GotoVersionCmd(m *migrate.Migrate, gotoVersion int64) error {
return m.GotoVersion(gotoVersion)
}
229 changes: 228 additions & 1 deletion cli/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/hasura/graphql-engine/cli/migrate/database"
"github.com/hasura/graphql-engine/cli/migrate/source"

Expand Down Expand Up @@ -1231,7 +1233,7 @@ func (m *Migrate) versionUpExists(version uint64) error {
// versionDownExists checks the source if either the up or down migration for
// the specified migration version exists.
func (m *Migrate) versionDownExists(version uint64) error {
// try up migration first
// try down migration first
directions := m.sourceDrv.GetDirections(version)
if !directions[source.Down] && !directions[source.MetaDown] {
return fmt.Errorf("%d down migration not found", version)
Expand Down Expand Up @@ -1470,3 +1472,228 @@ func (m *Migrate) unlockErr(prevErr error) error {
}
return prevErr
}

// GotoVersion will apply a version also applying the migration chain
// leading to it
func (m *Migrate) GotoVersion(gotoVersion int64) error {
mode, err := m.databaseDrv.GetSetting("migration_mode")
if err != nil {
return err
}
if mode != "true" {
return ErrNoMigrationMode
}

currentVersion, dirty, err := m.Version()
currVersion := int64(currentVersion)
if err != nil {
if err == ErrNilVersion {
currVersion = database.NilVersion
} else {
return errors.Wrap(err, "cannot determine version")
}
}
if dirty {
return ErrDirty{currVersion}
}

if err := m.lock(); err != nil {
return err
}

ret := make(chan interface{})
if currVersion <= gotoVersion {
go m.readUpFromVersion(-1, gotoVersion, ret)
} else if currVersion > gotoVersion {
go m.readDownFromVersion(currVersion, gotoVersion, ret)
}

return m.unlockErr(m.runMigrations(ret))

}

// readUpFromVersion reads up migrations from `from` limitted by `limit`. (is a modified version of readUp)
// limit can be -1, implying no limit and reading until there are no more migrations.
// Each migration is then written to the ret channel.
// If an error occurs during reading, that error is written to the ret channel, too.
// Once readUpFromVersion is done reading it will close the ret channel.
func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{}) {
defer close(ret)
var noOfAppliedMigrations int
for {
if m.stop() {
return
}
if from == to {
if noOfAppliedMigrations == 0 {
ret <- ErrNoChange
}
return
}

if from == -1 {
firstVersion, err := m.sourceDrv.First()
if err != nil {
ret <- err
return
}

// Check if this version present in DB
ok := m.databaseDrv.Read(firstVersion)
if ok {
from = int64(firstVersion)
continue
}

// Check if firstVersion files exists (yaml or sql)
if err = m.versionUpExists(firstVersion); err != nil {
ret <- err
return
}

migr, err := m.newMigration(firstVersion, int64(firstVersion))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()

migr, err = m.metanewMigration(firstVersion, int64(firstVersion))
if err != nil {
ret <- err
return
}
ret <- migr

go migr.Buffer()
from = int64(firstVersion)
noOfAppliedMigrations++
continue
}

// apply next migration
next, err := m.sourceDrv.Next(suint64(from))
if err != nil {
ret <- err
return
}

// Check if this version present in DB
ok := m.databaseDrv.Read(next)
if ok {
from = int64(next)
continue
}

// Check if next files exists (yaml or sql)
if err = m.versionUpExists(next); err != nil {
ret <- err
return
}

migr, err := m.newMigration(next, int64(next))
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()

migr, err = m.metanewMigration(next, int64(next))
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()
from = int64(next)
noOfAppliedMigrations++
}
}

// readDownFromVersion reads down migrations from `from` limitted by `limit`. (modified version of readDown)
// limit can be -1, implying no limit and reading until there are no more migrations.
// Each migration is then written to the ret channel.
// If an error occurs during reading, that error is written to the ret channel, too.
// Once readDownFromVersion is done reading it will close the ret channel.
func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface{}) {
defer close(ret)
var err error
var noOfAppliedMigrations int
for {
if m.stop() {
return
}

if from == to {
if noOfAppliedMigrations == 0 {
ret <- ErrNoChange
}
return
}

err = m.versionDownExists(suint64(from))
if err != nil {
ret <- err
return
}

prev, ok := m.databaseDrv.Prev(suint64(from))
if !ok {
// Check if any prev version available in source
prev, err = m.sourceDrv.Prev(suint64(from))
if os.IsNotExist(err) && to == -1 {
// apply nil migration
migr, err := m.metanewMigration(suint64(from), -1)
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()

migr, err = m.newMigration(suint64(from), -1)
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()

from = database.NilVersion
noOfAppliedMigrations++
continue
} else if err != nil {
ret <- err
return
}
ret <- fmt.Errorf("%v not applied on database", prev)
return
}

migr, err := m.metanewMigration(suint64(from), int64(prev))
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()

migr, err = m.newMigration(suint64(from), int64(prev))
if err != nil {
ret <- err
return
}

ret <- migr
go migr.Buffer()
from = int64(prev)
noOfAppliedMigrations++
}
}