From f2b4c68fa94c0fcd0f3b41f75995d4a4be296e57 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 14 Jan 2020 18:46:05 +0530 Subject: [PATCH 01/13] cli: add --goto flag to migrate command --- cli/commands/migrate.go | 2 ++ cli/commands/migrate_apply.go | 22 ++++++++++++++------ cli/migrate/cmd/commands.go | 39 +++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 6 deletions(-) diff --git a/cli/commands/migrate.go b/cli/commands/migrate.go index 15ddfb1b4480d..d56b01ea6549d 100644 --- a/cli/commands/migrate.go +++ b/cli/commands/migrate.go @@ -54,6 +54,8 @@ func ExecuteMigration(cmd string, t *migrate.Migrate, stepOrVersion int64) error err = mig.UpCmd(t, stepOrVersion) case "down": err = mig.DownCmd(t, stepOrVersion) + case "gotoVersion": + err = mig.GotoVersionCmd(t, uint64(stepOrVersion)) case "version": var direction string if stepOrVersion >= 0 { diff --git a/cli/commands/migrate_apply.go b/cli/commands/migrate_apply.go index bf49aa2540973..0fa21e00f2568 100644 --- a/cli/commands/migrate_apply.go +++ b/cli/commands/migrate_apply.go @@ -17,8 +17,8 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { EC: ec, } migrateApplyCmd := &cobra.Command{ - Use: "apply", - Short: "Apply migrations on the database", + Use: "apply", + Short: "Apply migrations on the database", Example: ` # Apply all migrations hasura migrate apply @@ -65,12 +65,15 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { }, } f := migrateApplyCmd.Flags() + f.SortFlags = false f.StringVar(&opts.upMigration, "up", "", "apply all or N up migration steps") f.StringVar(&opts.downMigration, "down", "", "apply all or N down migration steps") + f.StringVar(&opts.gotoVersion, "goto", "", "apply migration chain up to to the version specified") + f.StringVar(&opts.versionMigration, "version", "", "only apply this particular migration") - f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag") f.BoolVar(&opts.skipExecution, "skip-execution", false, "skip executing the migration action, but mark them as applied") + f.StringVar(&opts.migrationType, "type", "up", "type of migration (up, down) to be used with version flag") f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine") f.String("admin-secret", "", "admin secret for Hasura GraphQL Engine") @@ -91,11 +94,13 @@ type migrateApplyOptions struct { downMigration string versionMigration string migrationType string - skipExecution bool + // version up to which migration chain has to be applied + gotoVersion string + skipExecution bool } func (o *migrateApplyOptions) run() error { - migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.skipExecution) + migrationType, step, err := getMigrationTypeAndStep(o.upMigration, o.downMigration, o.versionMigration, o.migrationType, o.gotoVersion, o.skipExecution) if err != nil { return errors.Wrap(err, "error validating flags") } @@ -126,7 +131,7 @@ func (o *migrateApplyOptions) run() error { // Only one flag out of up, down and version can be set at a time. This function // checks whether that is the case and returns an error is not -func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType string, skipExecution bool) (string, int64, error) { +func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migrationType, gotoVersion string, skipExecution bool) (string, int64, error) { var flagCount = 0 var stepString = "all" var migrationName = "up" @@ -147,6 +152,11 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra } flagCount++ } + if gotoVersion != "" { + migrationName = "gotoVersion" + stepString = gotoVersion + flagCount++ + } if flagCount > 1 { return "", 0, errors.New("Only one migration type can be applied at a time (--up, --down or --goto)") diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 534793bad4920..16f1bca4f118d 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -255,3 +255,42 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s return } + +func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { + currentVersion, dirty, err := m.Version() + if err != nil { + return errors.Wrap(err, "cannot determine the current version of migrations") + } + if dirty { + return errors.New("stopping now, database is in dirty state") + } + + status, err := m.GetStatus() + if err != nil { + errors.Wrap(err, "cannot determine status of migrations") + } + var gotoStep, currentStep int + for index, migrationVersion := range status.Index { + if migrationVersion == gotoVersion { + gotoStep = index + } + if currentVersion == migrationVersion { + currentStep = index + } + } + + if gotoStep < currentStep { + for step := currentStep; step >= gotoStep; step-- { + m.Migrate(status.Index[step], "down") + + } + } + if gotoStep > currentStep { + for step := currentStep; step <= gotoStep; step++ { + m.Migrate(status.Index[step], "up") + + } + + } + return nil +} From 7f3612c5e5b50f3c466f3cb00d17bb5a8beb7258 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Wed, 15 Jan 2020 18:48:08 +0530 Subject: [PATCH 02/13] cli: fix error encoutered in goto, when on version -1 --- cli/migrate/cmd/commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 16f1bca4f118d..48cbefb1f43ed 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -258,7 +258,7 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { currentVersion, dirty, err := m.Version() - if err != nil { + if err != nil && err != migrate.ErrNilVersion{ return errors.Wrap(err, "cannot determine the current version of migrations") } if dirty { From 37870580b5f738533ef6e43d5c90a54768513ad9 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Thu, 16 Jan 2020 10:33:17 +0530 Subject: [PATCH 03/13] do down migrations one step down --- cli/migrate/cmd/commands.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 48cbefb1f43ed..69acfc53b0f34 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -258,7 +258,7 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { currentVersion, dirty, err := m.Version() - if err != nil && err != migrate.ErrNilVersion{ + if err != nil && err != migrate.ErrNilVersion { return errors.Wrap(err, "cannot determine the current version of migrations") } if dirty { @@ -280,7 +280,7 @@ func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { } if gotoStep < currentStep { - for step := currentStep; step >= gotoStep; step-- { + for step := currentStep; step > gotoStep; step-- { m.Migrate(status.Index[step], "down") } From 64ecf6d4efac5ae3592c45bad614ec4e1fd4cf55 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Mon, 27 Jan 2020 11:28:25 +0530 Subject: [PATCH 04/13] reorganize code --- cli/migrate/cmd/commands.go | 37 +------------------------------- cli/migrate/migrate.go | 42 +++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 69acfc53b0f34..5b1d8f6606b5a 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -257,40 +257,5 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s } func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { - currentVersion, dirty, err := m.Version() - if err != nil && err != migrate.ErrNilVersion { - return errors.Wrap(err, "cannot determine the current version of migrations") - } - if dirty { - return errors.New("stopping now, database is in dirty state") - } - - status, err := m.GetStatus() - if err != nil { - errors.Wrap(err, "cannot determine status of migrations") - } - var gotoStep, currentStep int - for index, migrationVersion := range status.Index { - if migrationVersion == gotoVersion { - gotoStep = index - } - if currentVersion == migrationVersion { - currentStep = index - } - } - - if gotoStep < currentStep { - for step := currentStep; step > gotoStep; step-- { - m.Migrate(status.Index[step], "down") - - } - } - if gotoStep > currentStep { - for step := currentStep; step <= gotoStep; step++ { - m.Migrate(status.Index[step], "up") - - } - - } - return nil + return m.GotoVersion(gotoVersion) } diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 94be78004dd9b..da87e6c0eb194 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/hasura/graphql-engine/cli/migrate/database" "github.com/hasura/graphql-engine/cli/migrate/source" @@ -1470,3 +1472,43 @@ func (m *Migrate) unlockErr(prevErr error) error { } return prevErr } + +// GotoVersion will apply a version also applying the migration chain +// leading to it +func (m *Migrate) GotoVersion(gotoVersion uint64) error { + currentVersion, dirty, err := m.Version() + if err != nil && err != ErrNilVersion { + return errors.Wrap(err, "cannot determine the current version of migrations") + } + if dirty { + return errors.New("stopping now, database is in dirty state") + } + + status, err := m.GetStatus() + if err != nil { + errors.Wrap(err, "cannot determine status of migrations") + } + var gotoStep, currentStep int + for index, migrationVersion := range status.Index { + if migrationVersion == gotoVersion { + gotoStep = index + } + if currentVersion == migrationVersion { + currentStep = index + } + } + + if gotoStep < currentStep { + for step := currentStep; step > gotoStep; step-- { + m.Migrate(status.Index[step], "down") + + } + } + if gotoStep > currentStep { + for step := currentStep; step <= gotoStep; step++ { + m.Migrate(status.Index[step], "up") + } + + } + return nil +} From 2327082ef9ef91446d9fa6da4dbeef8a1055d749 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Mon, 27 Jan 2020 11:42:47 +0530 Subject: [PATCH 05/13] in case of up "gotos" make sure that all previous migration in migration chain are applied --- cli/migrate/migrate.go | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index da87e6c0eb194..0da415186f7af 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1505,8 +1505,41 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { } } if gotoStep > currentStep { - for step := currentStep; step <= gotoStep; step++ { - m.Migrate(status.Index[step], "up") + /* When applying up migration make sure that all previous steps are applied + If we have the following state, + + 1580104374862 create_table_public_T1 Present Present + 1580104397702 create_table_public_T2 Present Not Present + 1580104421111 create_table_public_T3 Present Present + 1580104557536 create_table_public_T4 Present Not Present + 1580104573373 create_table_public_T5 Present Not Present + + and on running a + $ hasura migrate apply --goto 1580104573373 + + It SHOULD NOT yield the following output + + VERSION NAME SOURCE STATUS DATABASE STATUS + 1580104374862 create_table_public_T1 Present Present + 1580104397702 create_table_public_T2 Present Not Present + 1580104421111 create_table_public_T3 Present Present + 1580104557536 create_table_public_T4 Present Present + 1580104573373 create_table_public_T5 Present Present + + It SHOULD result in the following state + + VERSION NAME SOURCE STATUS DATABASE STATUS + 1580104374862 create_table_public_T1 Present Present + 1580104397702 create_table_public_T2 Present Present + 1580104421111 create_table_public_T3 Present Present + 1580104557536 create_table_public_T4 Present Present + 1580104573373 create_table_public_T5 Present Present + + */ + for step := 0; step <= gotoStep; step++ { + if applied := status.Migrations[status.Index[step]].IsApplied; !applied { + m.Migrate(status.Index[step], "up") + } } } From 1063199fead33f98da34a88470b183233a99606c Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 28 Jan 2020 13:20:38 +0530 Subject: [PATCH 06/13] modify readUp and readDown functions to account for --goto use case --- cli/migrate/migrate.go | 265 ++++++++++++++++++++++++++++++++++------- 1 file changed, 223 insertions(+), 42 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 0da415186f7af..f4418d365c596 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1488,60 +1488,241 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { if err != nil { errors.Wrap(err, "cannot determine status of migrations") } - var gotoStep, currentStep int + var gotoStep, currentStep int64 for index, migrationVersion := range status.Index { if migrationVersion == gotoVersion { - gotoStep = index + gotoStep = int64(index) } if currentVersion == migrationVersion { - currentStep = index + currentStep = int64(index) } } - if gotoStep < currentStep { - for step := currentStep; step > gotoStep; step-- { - m.Migrate(status.Index[step], "down") + if err := m.lock(); err != nil { + return err + } + ret := make(chan interface{}) + if gotoStep > currentStep { + go m.readUpFromVersion(-1, gotoStep, ret) + } else if gotoStep < currentStep { + go m.readDownFromVersion(int64(currentVersion), currentStep-gotoStep, ret) + } + + return m.unlockErr(m.runMigrations(ret)) + +} + +// readUpFromVersion reads up migrations from `from` limitted by `limit`. (is a modified version of readUp) +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readUpFromVersion is done reading it will close the ret channel. +func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interface{}) { + defer close(ret) + if limit == 0 { + ret <- ErrNoChange + return + } + + count := int64(0) + for count < limit || limit == -1 { + if m.stop() { + return + } + + if from == -1 { + firstVersion, err := m.sourceDrv.First() + if err != nil { + ret <- err + return + } + + // Check if this version present in DB + ok := m.databaseDrv.Read(firstVersion) + if ok { + from = int64(firstVersion) + continue + } + + // Check if firstVersion files exists (yaml or sql) + if err = m.versionUpExists(firstVersion); err != nil { + ret <- err + return + } + + migr, err := m.newMigration(firstVersion, int64(firstVersion)) + if err != nil { + ret <- err + return + } + ret <- migr + go migr.Buffer() + migr, err = m.metanewMigration(firstVersion, int64(firstVersion)) + if err != nil { + ret <- err + return + } + ret <- migr + go migr.Buffer() + from = int64(firstVersion) + count++ + continue + } + + // apply next migration + next, err := m.sourceDrv.Next(suint64(from)) + if os.IsNotExist(err) { + // no limit, but no migrations applied? + if limit == -1 && count == 0 { + ret <- ErrNoChange + return + } + + // no limit, reached end + if limit == -1 { + return + } + + // reached end, and didn't apply any migrations + if limit > 0 && count == 0 { + ret <- ErrNoChange + return + } + + // applied less migrations than limit? + if count < limit { + // This case is normal when comaparting to the actual original readUp function + // ret <- ErrShortLimit{suint64(limit - count)} + return + } + } + + if err != nil { + ret <- err + return } + + // Check if this version present in DB + ok := m.databaseDrv.Read(next) + if ok { + from = int64(next) + // we have to increment the count even if we are not applying any migrations + count++ + continue + } + + // Check if next files exists (yaml or sql) + if err = m.versionUpExists(next); err != nil { + ret <- err + return + } + + migr, err := m.newMigration(next, int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.metanewMigration(next, int64(next)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + from = int64(next) + count++ } - if gotoStep > currentStep { - /* When applying up migration make sure that all previous steps are applied - If we have the following state, - - 1580104374862 create_table_public_T1 Present Present - 1580104397702 create_table_public_T2 Present Not Present - 1580104421111 create_table_public_T3 Present Present - 1580104557536 create_table_public_T4 Present Not Present - 1580104573373 create_table_public_T5 Present Not Present - - and on running a - $ hasura migrate apply --goto 1580104573373 - - It SHOULD NOT yield the following output - - VERSION NAME SOURCE STATUS DATABASE STATUS - 1580104374862 create_table_public_T1 Present Present - 1580104397702 create_table_public_T2 Present Not Present - 1580104421111 create_table_public_T3 Present Present - 1580104557536 create_table_public_T4 Present Present - 1580104573373 create_table_public_T5 Present Present - - It SHOULD result in the following state - - VERSION NAME SOURCE STATUS DATABASE STATUS - 1580104374862 create_table_public_T1 Present Present - 1580104397702 create_table_public_T2 Present Present - 1580104421111 create_table_public_T3 Present Present - 1580104557536 create_table_public_T4 Present Present - 1580104573373 create_table_public_T5 Present Present - - */ - for step := 0; step <= gotoStep; step++ { - if applied := status.Migrations[status.Index[step]].IsApplied; !applied { - m.Migrate(status.Index[step], "up") +} + +// readDownFromVersion reads down migrations from `from` limitted by `limit`. (modified version of readDown) +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readDownFromVersion is done reading it will close the ret channel. +func (m *Migrate) readDownFromVersion(from int64, limit int64, ret chan<- interface{}) { + defer close(ret) + var err error + if limit == 0 { + ret <- ErrNoChange + return + } + + // no change if already at nil version + if from == -1 && limit == -1 { + ret <- ErrNoChange + return + } + + // can't go over limit if already at nil version + if from == -1 && limit > 0 { + ret <- ErrNoChange + return + } + + count := int64(0) + for count < limit || limit == -1 { + if m.stop() { + return + } + + err = m.versionDownExists(suint64(from)) + if err != nil { + ret <- err + return + } + + prev, ok := m.databaseDrv.Prev(suint64(from)) + if !ok { + // no limit or haven't reached limit, apply "first" migration + if limit == -1 || limit-count > 0 { + migr, err := m.metanewMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + ret <- migr + go migr.Buffer() + count++ + } + + if count < limit { + // ret <- ErrShortLimit{suint64(limit - count)} } + return } + migr, err := m.metanewMigration(suint64(from), int64(prev)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(suint64(from), int64(prev)) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + from = int64(prev) + count++ } - return nil } From 9442c9ae2358c564d20a4dc3a967d11e4586a7e8 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 28 Jan 2020 16:15:34 +0530 Subject: [PATCH 07/13] refactor to clean up and adopt a better implementation addressing the comments --- cli/migrate/migrate.go | 114 +++++++++-------------------------------- 1 file changed, 23 insertions(+), 91 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index f4418d365c596..ffdf3e327b545 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -9,12 +9,11 @@ import ( "bytes" "container/list" "fmt" + "github.com/pkg/errors" "os" "sync" "time" - "github.com/pkg/errors" - "github.com/hasura/graphql-engine/cli/migrate/database" "github.com/hasura/graphql-engine/cli/migrate/source" @@ -1477,35 +1476,26 @@ func (m *Migrate) unlockErr(prevErr error) error { // leading to it func (m *Migrate) GotoVersion(gotoVersion uint64) error { currentVersion, dirty, err := m.Version() - if err != nil && err != ErrNilVersion { - return errors.Wrap(err, "cannot determine the current version of migrations") + if err != nil { + return errors.Wrap(err, "cannot determine version") } + if dirty { - return errors.New("stopping now, database is in dirty state") + return ErrDirty{} } - status, err := m.GetStatus() - if err != nil { - errors.Wrap(err, "cannot determine status of migrations") - } - var gotoStep, currentStep int64 - for index, migrationVersion := range status.Index { - if migrationVersion == gotoVersion { - gotoStep = int64(index) - } - if currentVersion == migrationVersion { - currentStep = int64(index) - } + if currentVersion == gotoVersion { + return ErrNoChange } if err := m.lock(); err != nil { return err } ret := make(chan interface{}) - if gotoStep > currentStep { - go m.readUpFromVersion(-1, gotoStep, ret) - } else if gotoStep < currentStep { - go m.readDownFromVersion(int64(currentVersion), currentStep-gotoStep, ret) + if currentVersion < gotoVersion { + go m.readUpFromVersion(-1, int64(gotoVersion), ret) + } else if currentVersion > gotoVersion { + go m.readDownFromVersion(int64(currentVersion), int64(gotoVersion), ret) } return m.unlockErr(m.runMigrations(ret)) @@ -1517,15 +1507,17 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { // Each migration is then written to the ret channel. // If an error occurs during reading, that error is written to the ret channel, too. // Once readUpFromVersion is done reading it will close the ret channel. -func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interface{}) { +func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{}) { defer close(ret) - if limit == 0 { + if from == to { ret <- ErrNoChange return } - count := int64(0) - for count < limit || limit == -1 { + for { + if from == to { + return + } if m.stop() { return } @@ -1566,36 +1558,12 @@ func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interfac ret <- migr go migr.Buffer() from = int64(firstVersion) - count++ continue } // apply next migration next, err := m.sourceDrv.Next(suint64(from)) if os.IsNotExist(err) { - // no limit, but no migrations applied? - if limit == -1 && count == 0 { - ret <- ErrNoChange - return - } - - // no limit, reached end - if limit == -1 { - return - } - - // reached end, and didn't apply any migrations - if limit > 0 && count == 0 { - ret <- ErrNoChange - return - } - - // applied less migrations than limit? - if count < limit { - // This case is normal when comaparting to the actual original readUp function - // ret <- ErrShortLimit{suint64(limit - count)} - return - } } if err != nil { @@ -1607,8 +1575,6 @@ func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interfac ok := m.databaseDrv.Read(next) if ok { from = int64(next) - // we have to increment the count even if we are not applying any migrations - count++ continue } @@ -1636,7 +1602,6 @@ func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interfac ret <- migr go migr.Buffer() from = int64(next) - count++ } } @@ -1645,28 +1610,19 @@ func (m *Migrate) readUpFromVersion(from int64, limit int64, ret chan<- interfac // Each migration is then written to the ret channel. // If an error occurs during reading, that error is written to the ret channel, too. // Once readDownFromVersion is done reading it will close the ret channel. -func (m *Migrate) readDownFromVersion(from int64, limit int64, ret chan<- interface{}) { +func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface{}) { defer close(ret) var err error - if limit == 0 { - ret <- ErrNoChange - return - } - // no change if already at nil version - if from == -1 && limit == -1 { - ret <- ErrNoChange - return - } - - // can't go over limit if already at nil version - if from == -1 && limit > 0 { + if from == to { ret <- ErrNoChange return } - count := int64(0) - for count < limit || limit == -1 { + for { + if from == to { + return + } if m.stop() { return } @@ -1679,29 +1635,6 @@ func (m *Migrate) readDownFromVersion(from int64, limit int64, ret chan<- interf prev, ok := m.databaseDrv.Prev(suint64(from)) if !ok { - // no limit or haven't reached limit, apply "first" migration - if limit == -1 || limit-count > 0 { - migr, err := m.metanewMigration(suint64(from), -1) - if err != nil { - ret <- err - return - } - ret <- migr - go migr.Buffer() - - migr, err = m.newMigration(suint64(from), -1) - if err != nil { - ret <- err - return - } - ret <- migr - go migr.Buffer() - count++ - } - - if count < limit { - // ret <- ErrShortLimit{suint64(limit - count)} - } return } @@ -1723,6 +1656,5 @@ func (m *Migrate) readDownFromVersion(from int64, limit int64, ret chan<- interf ret <- migr go migr.Buffer() from = int64(prev) - count++ } } From 1c0607fe1777928003f39e3730ff485aa636a9fc Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 28 Jan 2020 16:53:43 +0530 Subject: [PATCH 08/13] remove empty error handling step --- cli/migrate/migrate.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index ffdf3e327b545..81a1b03e3f493 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1563,9 +1563,6 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} // apply next migration next, err := m.sourceDrv.Next(suint64(from)) - if os.IsNotExist(err) { - } - if err != nil { ret <- err return From e6e61052158d7a169cc304dbc485f162f2d231bd Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 28 Jan 2020 18:12:02 +0530 Subject: [PATCH 09/13] return ErrNoChange when no migrations were applied --- cli/migrate/migrate.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 81a1b03e3f493..ce14975f02422 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1513,9 +1513,12 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} ret <- ErrNoChange return } - + var noOfAppliedMigrations int for { if from == to { + if noOfAppliedMigrations == 0 { + ret <- ErrNoChange + } return } if m.stop() { @@ -1556,6 +1559,9 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} return } ret <- migr + + noOfAppliedMigrations++ + go migr.Buffer() from = int64(firstVersion) continue @@ -1598,6 +1604,7 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} ret <- migr go migr.Buffer() + noOfAppliedMigrations++ from = int64(next) } } @@ -1615,9 +1622,12 @@ func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface ret <- ErrNoChange return } - + var noOfAppliedMigrations int for { if from == to { + if noOfAppliedMigrations == 0 { + ret <- ErrNoChange + } return } if m.stop() { @@ -1651,6 +1661,7 @@ func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface } ret <- migr + noOfAppliedMigrations++ go migr.Buffer() from = int64(prev) } From b8384e9f3d4126a6bfbc93f9bbba8159cbfef470 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Tue, 28 Jan 2020 18:28:52 +0530 Subject: [PATCH 10/13] respect m.stop() can panic so place conditional cases after that and add a couple other small fixes --- cli/migrate/migrate.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index ce14975f02422..47403607fbde7 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1481,7 +1481,7 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { } if dirty { - return ErrDirty{} + return ErrDirty{int64(currentVersion)} } if currentVersion == gotoVersion { @@ -1509,21 +1509,17 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { // Once readUpFromVersion is done reading it will close the ret channel. func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{}) { defer close(ret) - if from == to { - ret <- ErrNoChange - return - } var noOfAppliedMigrations int for { + if m.stop() { + return + } if from == to { if noOfAppliedMigrations == 0 { ret <- ErrNoChange } return } - if m.stop() { - return - } if from == -1 { firstVersion, err := m.sourceDrv.First() @@ -1560,10 +1556,9 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} } ret <- migr - noOfAppliedMigrations++ - go migr.Buffer() from = int64(firstVersion) + noOfAppliedMigrations++ continue } @@ -1604,8 +1599,8 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} ret <- migr go migr.Buffer() - noOfAppliedMigrations++ from = int64(next) + noOfAppliedMigrations++ } } @@ -1617,22 +1612,18 @@ func (m *Migrate) readUpFromVersion(from int64, to int64, ret chan<- interface{} func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface{}) { defer close(ret) var err error - // no change if already at nil version - if from == to { - ret <- ErrNoChange - return - } var noOfAppliedMigrations int for { + if m.stop() { + return + } + if from == to { if noOfAppliedMigrations == 0 { ret <- ErrNoChange } return } - if m.stop() { - return - } err = m.versionDownExists(suint64(from)) if err != nil { @@ -1661,8 +1652,8 @@ func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface } ret <- migr - noOfAppliedMigrations++ go migr.Buffer() from = int64(prev) + noOfAppliedMigrations++ } } From a575361511aa214d7a04a82e04abfd91a134e484 Mon Sep 17 00:00:00 2001 From: scriptonist Date: Thu, 30 Jan 2020 13:47:44 +0530 Subject: [PATCH 11/13] fix bug caused by early checking of versions --- cli/migrate/migrate.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 47403607fbde7..a7ffb7079ec51 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -9,11 +9,12 @@ import ( "bytes" "container/list" "fmt" - "github.com/pkg/errors" "os" "sync" "time" + "github.com/pkg/errors" + "github.com/hasura/graphql-engine/cli/migrate/database" "github.com/hasura/graphql-engine/cli/migrate/source" @@ -1484,15 +1485,11 @@ func (m *Migrate) GotoVersion(gotoVersion uint64) error { return ErrDirty{int64(currentVersion)} } - if currentVersion == gotoVersion { - return ErrNoChange - } - if err := m.lock(); err != nil { return err } ret := make(chan interface{}) - if currentVersion < gotoVersion { + if currentVersion <= gotoVersion { go m.readUpFromVersion(-1, int64(gotoVersion), ret) } else if currentVersion > gotoVersion { go m.readDownFromVersion(int64(currentVersion), int64(gotoVersion), ret) From 284f5a16212572fd2cb864da319cb6ab84bf66d6 Mon Sep 17 00:00:00 2001 From: Aravind Shankar Date: Thu, 30 Jan 2020 17:12:43 +0530 Subject: [PATCH 12/13] fix to add migration_mode and nil version check --- cli/commands/migrate.go | 2 +- cli/commands/migrate_apply.go | 32 ++++++++---------- cli/migrate/cmd/commands.go | 2 +- cli/migrate/migrate.go | 61 +++++++++++++++++++++++++++++------ 4 files changed, 68 insertions(+), 29 deletions(-) diff --git a/cli/commands/migrate.go b/cli/commands/migrate.go index d56b01ea6549d..77c0ce32a9042 100644 --- a/cli/commands/migrate.go +++ b/cli/commands/migrate.go @@ -55,7 +55,7 @@ func ExecuteMigration(cmd string, t *migrate.Migrate, stepOrVersion int64) error case "down": err = mig.DownCmd(t, stepOrVersion) case "gotoVersion": - err = mig.GotoVersionCmd(t, uint64(stepOrVersion)) + err = mig.GotoVersionCmd(t, stepOrVersion) case "version": var direction string if stepOrVersion >= 0 { diff --git a/cli/commands/migrate_apply.go b/cli/commands/migrate_apply.go index 0fa21e00f2568..bcad6a4c766f1 100644 --- a/cli/commands/migrate_apply.go +++ b/cli/commands/migrate_apply.go @@ -58,7 +58,18 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { err := opts.run() opts.EC.Spinner.Stop() if err != nil { - return err + if err == migrate.ErrNoChange { + opts.EC.Logger.Info("nothing to apply") + return nil + } + if e, ok := err.(*os.PathError); ok { + // If Op is first, then log No migrations to apply + if e.Op == "first" { + opts.EC.Logger.Info("nothing to apply") + return nil + } + } + return errors.Wrap(err, "apply failed") } opts.EC.Logger.Info("migrations applied") return nil @@ -111,22 +122,7 @@ func (o *migrateApplyOptions) run() error { } migrateDrv.SkipExecution = o.skipExecution - err = ExecuteMigration(migrationType, migrateDrv, step) - if err != nil { - if err == migrate.ErrNoChange { - o.EC.Logger.Info("nothing to apply") - return nil - } - if e, ok := err.(*os.PathError); ok { - // If Op is first, then log No migrations to apply - if e.Op == "first" { - o.EC.Logger.Info("No migrations to apply") - return nil - } - } - return errors.Wrap(err, "apply failed") - } - return nil + return ExecuteMigration(migrationType, migrateDrv, step) } // Only one flag out of up, down and version can be set at a time. This function @@ -172,7 +168,7 @@ func getMigrationTypeAndStep(upMigration, downMigration, versionMigration, migra step, err := strconv.ParseInt(stepString, 10, 64) if err != nil { - return "", 0, errors.Wrap(err, "not a valid input for steps") + return "", 0, errors.Wrap(err, "not a valid input for steps/version") } return migrationName, step, nil } diff --git a/cli/migrate/cmd/commands.go b/cli/migrate/cmd/commands.go index 5b1d8f6606b5a..464b21c7c25fe 100644 --- a/cli/migrate/cmd/commands.go +++ b/cli/migrate/cmd/commands.go @@ -256,6 +256,6 @@ func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory s return } -func GotoVersionCmd(m *migrate.Migrate, gotoVersion uint64) error { +func GotoVersionCmd(m *migrate.Migrate, gotoVersion int64) error { return m.GotoVersion(gotoVersion) } diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index a7ffb7079ec51..6f557f3cdb514 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -1233,7 +1233,7 @@ func (m *Migrate) versionUpExists(version uint64) error { // versionDownExists checks the source if either the up or down migration for // the specified migration version exists. func (m *Migrate) versionDownExists(version uint64) error { - // try up migration first + // try down migration first directions := m.sourceDrv.GetDirections(version) if !directions[source.Down] && !directions[source.MetaDown] { return fmt.Errorf("%d down migration not found", version) @@ -1475,24 +1475,37 @@ func (m *Migrate) unlockErr(prevErr error) error { // GotoVersion will apply a version also applying the migration chain // leading to it -func (m *Migrate) GotoVersion(gotoVersion uint64) error { - currentVersion, dirty, err := m.Version() +func (m *Migrate) GotoVersion(gotoVersion int64) error { + mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return errors.Wrap(err, "cannot determine version") + return err + } + if mode != "true" { + return ErrNoMigrationMode } + currentVersion, dirty, err := m.Version() + currVersion := int64(currentVersion) + if err != nil { + if err == ErrNilVersion { + currVersion = database.NilVersion + } else { + return errors.Wrap(err, "cannot determine version") + } + } if dirty { - return ErrDirty{int64(currentVersion)} + return ErrDirty{currVersion} } if err := m.lock(); err != nil { return err } + ret := make(chan interface{}) - if currentVersion <= gotoVersion { - go m.readUpFromVersion(-1, int64(gotoVersion), ret) - } else if currentVersion > gotoVersion { - go m.readDownFromVersion(int64(currentVersion), int64(gotoVersion), ret) + if currVersion <= gotoVersion { + go m.readUpFromVersion(-1, gotoVersion, ret) + } else if currVersion > gotoVersion { + go m.readDownFromVersion(currVersion, gotoVersion, ret) } return m.unlockErr(m.runMigrations(ret)) @@ -1630,6 +1643,36 @@ func (m *Migrate) readDownFromVersion(from int64, to int64, ret chan<- interface prev, ok := m.databaseDrv.Prev(suint64(from)) if !ok { + // Check if any prev version available in source + prev, err = m.sourceDrv.Prev(suint64(from)) + if os.IsNotExist(err) && to == -1 { + // apply nil migration + migr, err := m.metanewMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + migr, err = m.newMigration(suint64(from), -1) + if err != nil { + ret <- err + return + } + + ret <- migr + go migr.Buffer() + + from = database.NilVersion + noOfAppliedMigrations++ + continue + } else if err != nil { + ret <- err + return + } + ret <- fmt.Errorf("%v not applied on database", prev) return } From fcd7a38a8fa8501b7640fd96fe8dc432303a557c Mon Sep 17 00:00:00 2001 From: Shahidh K Muhammed Date: Mon, 3 Feb 2020 11:13:17 +0530 Subject: [PATCH 13/13] add some more examples for migrate apply command --- cli/commands/migrate_apply.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cli/commands/migrate_apply.go b/cli/commands/migrate_apply.go index bcad6a4c766f1..59953e4eec7e1 100644 --- a/cli/commands/migrate_apply.go +++ b/cli/commands/migrate_apply.go @@ -42,6 +42,12 @@ func newMigrateApplyCmd(ec *cli.ExecutionContext) *cobra.Command { # Apply only a particular version hasura migrate apply --type up --version "" + + # Apply all up migrations upto version 125, last applied is 100 + hasura migrate apply --goto 125 + + # Apply all down migrations upto version 125, last applied is 150 + hasura migrate apply --goto 125 # Rollback a particular version: hasura migrate apply --type down --version ""