diff --git a/cmd/collect.go b/cmd/collect.go index 2576e764..332847fc 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -16,7 +16,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/modconfig" "github.com/turbot/pipe-fittings/v2/parse" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" @@ -61,8 +60,9 @@ Every time you run tailpipe collect, Tailpipe refreshes its views over all colle } func runCollectCmd(cmd *cobra.Command, args []string) { - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() + ctx, cancel := context.WithCancel(ctx) //nolint:govet // cancel is needed for the doCollect func var err error defer func() { @@ -83,7 +83,7 @@ func runCollectCmd(cmd *cobra.Command, args []string) { // if diagnostic mode is set, print out config and return if _, ok := os.LookupEnv(constants.EnvConfigDump); ok { localcmdconfig.DisplayConfig() - return + return //nolint:govet // this is explicitly used in tests } err = doCollect(ctx, cancel, args) diff --git a/cmd/compact.go b/cmd/compact.go index c777b203..a02605c0 100644 --- a/cmd/compact.go +++ b/cmd/compact.go @@ -13,7 +13,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" @@ -38,8 +37,8 @@ func compactCmd() *cobra.Command { func runCompactCmd(cmd *cobra.Command, args []string) { var err error - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() defer func() { if r := recover(); r != nil { diff --git a/cmd/connect.go b/cmd/connect.go index a7b4da56..7b83a59d 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -10,8 +10,6 @@ import ( "strings" "time" - "golang.org/x/exp/maps" - "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/thediveo/enumflag/v2" @@ -19,7 +17,6 @@ import ( "github.com/turbot/pipe-fittings/v2/cmdconfig" "github.com/turbot/pipe-fittings/v2/connection" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" pfilepaths "github.com/turbot/pipe-fittings/v2/filepaths" "github.com/turbot/pipe-fittings/v2/parse" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" @@ -27,6 +24,7 @@ import ( "github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/database" error_helpers "github.com/turbot/tailpipe/internal/error_helpers" + "golang.org/x/exp/maps" ) // variable used to assign the output mode flag @@ -96,8 +94,8 @@ The generated script can be used with DuckDB: func runConnectCmd(cmd *cobra.Command, _ []string) { var err error var initFilePath string - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() defer func() { if r := recover(); r != nil { diff --git a/cmd/format.go b/cmd/format.go index 5f5d40de..94f8759d 
100644 --- a/cmd/format.go +++ b/cmd/format.go @@ -1,7 +1,6 @@ package cmd import ( - "context" "fmt" "os" "strings" @@ -11,7 +10,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/printers" "github.com/turbot/pipe-fittings/v2/utils" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" @@ -69,9 +67,8 @@ func formatListCmd() *cobra.Command { } func runFormatListCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runFormatListCmd start") var err error defer func() { @@ -86,7 +83,7 @@ func runFormatListCmd(cmd *cobra.Command, args []string) { } else { error_helpers.ShowError(ctx, err) } - setExitCodeForFormatError(err,1) + setExitCodeForFormatError(err) } }() @@ -133,9 +130,8 @@ func formatShowCmd() *cobra.Command { } func runFormatShowCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runFormatShowCmd start") var err error defer func() { @@ -150,7 +146,7 @@ func runFormatShowCmd(cmd *cobra.Command, args []string) { } else { error_helpers.ShowError(ctx, err) } - setExitCodeForFormatError(err, 1) + setExitCodeForFormatError(err) } }() @@ -172,7 +168,7 @@ func runFormatShowCmd(cmd *cobra.Command, args []string) { } } -func setExitCodeForFormatError(err error, nonCancelCode int) { +func setExitCodeForFormatError(err error) { // set exit code only if an error occurred and no exit code is already set if exitCode != 0 || err == nil { return @@ -183,5 +179,5 @@ func setExitCodeForFormatError(err error, nonCancelCode int) { return } // no dedicated format exit code exists yet; use generic nonzero failure - exitCode = nonCancelCode + exitCode = 1 } diff --git a/cmd/partition.go b/cmd/partition.go index 78029017..3a72c583 100644 --- a/cmd/partition.go +++ b/cmd/partition.go @@ -15,7 +15,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/printers" "github.com/turbot/pipe-fittings/v2/statushooks" "github.com/turbot/pipe-fittings/v2/utils" @@ -76,9 +75,8 @@ func partitionListCmd() *cobra.Command { } func runPartitionListCmd(cmd *cobra.Command, args []string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runPartitionListCmd start") var err error defer func() { @@ -103,7 +101,6 @@ func runPartitionListCmd(cmd *cobra.Command, args []string) { return } - // open a readonly db connection db, err := database.NewDuckDb(database.WithDuckLakeReadonly()) error_helpers.FailOnError(err) defer db.Close() @@ -146,10 +143,10 @@ func partitionShowCmd() *cobra.Command { } func runPartitionShowCmd(cmd *cobra.Command, args 
[]string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. Cancellation won't work right now + ctx := cmd.Context() utils.LogTime("runPartitionShowCmd start") var err error defer func() { @@ -233,10 +230,10 @@ func partitionDeleteCmd() *cobra.Command { } func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. Cancellation won't work right now + ctx := cmd.Context() var err error defer func() { if r := recover(); r != nil { diff --git a/cmd/plugin.go b/cmd/plugin.go index ba7dc795..25879a51 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -15,7 +15,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/filepaths" "github.com/turbot/pipe-fittings/v2/installationstate" pociinstaller "github.com/turbot/pipe-fittings/v2/ociinstaller" @@ -238,10 +237,10 @@ var pluginInstallSteps = []string{ } func runPluginInstallCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. Cancellation won't work right now + ctx := cmd.Context() utils.LogTime("runPluginInstallCmd install") var err error defer func() { @@ -386,10 +385,10 @@ func doPluginInstall(ctx context.Context, bar *uiprogress.Bar, pluginName string } func runPluginUpdateCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. 
Cancellation won't work right now + ctx := cmd.Context() utils.LogTime("runPluginUpdateCmd start") var err error defer func() { @@ -667,10 +666,10 @@ func installPlugin(ctx context.Context, resolvedPlugin pplugin.ResolvedPluginVer } func runPluginUninstallCmd(cmd *cobra.Command, args []string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. Cancellation won't work right now + ctx := cmd.Context() utils.LogTime("runPluginUninstallCmd uninstall") var err error @@ -768,9 +767,8 @@ func resolveUpdatePluginsFromArgs(args []string) ([]string, error) { } func runPluginListCmd(cmd *cobra.Command, _ []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runPluginListCmd list") @@ -822,18 +820,18 @@ func runPluginListCmd(cmd *cobra.Command, _ []string) { } func runPluginShowCmd(cmd *cobra.Command, args []string) { + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. Cancellation won't work right now + ctx := cmd.Context() + // we expect 1 argument, the plugin name if len(args) != 1 { - error_helpers.ShowError(cmd.Context(), fmt.Errorf("you need to provide the name of a plugin")) + error_helpers.ShowError(ctx, fmt.Errorf("you need to provide the name of a plugin")) exitCode = pconstants.ExitCodeInsufficientOrWrongInputs return } - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. 
Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) - utils.LogTime("runPluginShowCmd start") // Clean up plugin temporary directories from previous crashes/interrupted installations diff --git a/cmd/source.go b/cmd/source.go index ecc2785a..dcb9d5e6 100644 --- a/cmd/source.go +++ b/cmd/source.go @@ -1,7 +1,6 @@ package cmd import ( - "context" "fmt" "os" "strings" @@ -11,13 +10,12 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" + "github.com/turbot/pipe-fittings/v2/error_helpers" "github.com/turbot/pipe-fittings/v2/printers" "github.com/turbot/pipe-fittings/v2/utils" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" "github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/display" - error_helpers "github.com/turbot/tailpipe/internal/error_helpers" ) func sourceCmd() *cobra.Command { @@ -66,9 +64,8 @@ func sourceListCmd() *cobra.Command { } func runSourceListCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runSourceListCmd start") var err error defer func() { @@ -130,10 +127,10 @@ func sourceShowCmd() *cobra.Command { } func runSourceShowCmd(cmd *cobra.Command, args []string) { - //setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - //TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a cancellation error. Cancellation won't work right now - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + // TODO: https://github.com/turbot/tailpipe/issues/563 none of the functions called in this command will return a + // cancellation error. 
Cancellation won't work right now + ctx := cmd.Context() utils.LogTime("runSourceShowCmd start") var err error defer func() { diff --git a/cmd/table.go b/cmd/table.go index 5200e0b9..7574d84d 100644 --- a/cmd/table.go +++ b/cmd/table.go @@ -1,7 +1,6 @@ package cmd import ( - "context" "fmt" "os" "strings" @@ -11,7 +10,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" - "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/printers" "github.com/turbot/pipe-fittings/v2/utils" localcmdconfig "github.com/turbot/tailpipe/internal/cmdconfig" @@ -68,9 +66,8 @@ func tableListCmd() *cobra.Command { } func runTableListCmd(cmd *cobra.Command, args []string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runSourceListCmd start") var err error defer func() { @@ -138,9 +135,8 @@ func tableShowCmd() *cobra.Command { } func runTableShowCmd(cmd *cobra.Command, args []string) { - // setup a cancel context and start cancel handler - ctx, cancel := context.WithCancel(cmd.Context()) - contexthelpers.StartCancelHandler(cancel) + // use the signal-aware/cancelable context created upstream in preRunHook + ctx := cmd.Context() utils.LogTime("runTableShowCmd start") var err error defer func() { diff --git a/go.mod b/go.mod index c7d1e144..9654b36e 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ toolchain go1.24.0 replace ( github.com/c-bata/go-prompt => github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 - // github.com/turbot/pipe-fittings/v2 => ../pipe-fittings - //github.com/turbot/tailpipe-plugin-core => ../tailpipe-plugin-core - github.com/turbot/tailpipe-plugin-sdk => ../tailpipe-plugin-sdk +// github.com/turbot/pipe-fittings/v2 => ../pipe-fittings +//github.com/turbot/tailpipe-plugin-core => ../tailpipe-plugin-core +// github.com/turbot/tailpipe-plugin-sdk => ../tailpipe-plugin-sdk ) require ( @@ -19,8 +19,8 @@ require ( github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 github.com/turbot/go-kit v1.3.0 - github.com/turbot/pipe-fittings/v2 v2.7.0-rc.0 - github.com/turbot/tailpipe-plugin-sdk v0.9.2 + github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1 + github.com/turbot/tailpipe-plugin-sdk v0.9.3 github.com/zclconf/go-cty v1.16.3 golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 ) diff --git a/go.sum b/go.sum index 19ecc374..a2c06c77 100644 --- a/go.sum +++ b/go.sum @@ -1308,12 +1308,14 @@ github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0= github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ= github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1:zs87uA6QZsYLk4RRxDOIxt8ro/B2V6HzoMWm05Lo7ao= github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw= -github.com/turbot/pipe-fittings/v2 v2.7.0-rc.0 h1:p9/Hf0BNNjZVs5C4AqZlQgmihKt/nboh5OrfGcH8Mhk= -github.com/turbot/pipe-fittings/v2 v2.7.0-rc.0/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE= +github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1 h1:4Y/51FNwJqavbz/O8T8NQkpp6+roiyoT7BrD/GLR2FU= +github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE= 
github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U= github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc= github.com/turbot/tailpipe-plugin-core v0.2.10 h1:2+B7W4hzyS/pBr1y5ns9w84piWGq/x+WdCUjyPaPreQ= github.com/turbot/tailpipe-plugin-core v0.2.10/go.mod h1:dHzPUR1p5GksSvDqqEeZEvvJX6wTEwK/ZDev//9nSLw= +github.com/turbot/tailpipe-plugin-sdk v0.9.3 h1:JpGpGPwehqdXnRO3aqkQTpd96Vx2blY+AkXP8lYB32g= +github.com/turbot/tailpipe-plugin-sdk v0.9.3/go.mod h1:Egojp0j7+th/4Bh6muMuF6aZa5iE3MuiJ4pzBo0J2mg= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7/go.mod h1:5hzpfalEjfcJWp9yq75/EZoEu2Mzm34eJAPm3HOW2tw= github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= diff --git a/internal/cmdconfig/cmd_hooks.go b/internal/cmdconfig/cmd_hooks.go index 78e5b478..c07cfac7 100644 --- a/internal/cmdconfig/cmd_hooks.go +++ b/internal/cmdconfig/cmd_hooks.go @@ -13,6 +13,7 @@ import ( "github.com/turbot/pipe-fittings/v2/app_specific" "github.com/turbot/pipe-fittings/v2/cmdconfig" pconstants "github.com/turbot/pipe-fittings/v2/constants" + "github.com/turbot/pipe-fittings/v2/contexthelpers" "github.com/turbot/pipe-fittings/v2/error_helpers" "github.com/turbot/pipe-fittings/v2/filepaths" pparse "github.com/turbot/pipe-fittings/v2/parse" @@ -22,6 +23,7 @@ import ( "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/logger" + "github.com/turbot/tailpipe/internal/migration" "github.com/turbot/tailpipe/internal/parse" "github.com/turbot/tailpipe/internal/plugin" ) @@ -58,7 +60,33 @@ func preRunHook(cmd *cobra.Command, args []string) error { // set the max memory if specified setMemoryLimit() - return nil + // create cancel context and set back on command + baseCtx := cmd.Context() + ctx, cancel := context.WithCancel(baseCtx) + + // start the cancel handler to call cancel on interrupt signals + contexthelpers.StartCancelHandler(cancel) + cmd.SetContext(ctx) + + // migrate legacy data to DuckLake: + // Prior to Tailpipe v0.7.0 we stored data as native Parquet files alongside a tailpipe.db + // (DuckDB) that defined SQL views. From v0.7.0 onward Tailpipe uses DuckLake, which + // introduces a metadata database (metadata.sqlite). We run a one-time migration here to + // move existing user data into DuckLake’s layout so it can be queried and managed via + // the new metadata model. + // start migration + err := migration.MigrateDataToDucklake(cmd.Context()) + if error_helpers.IsContextCancelledError(err) { + // suppress Cobra's usage/errors only for this cancelled invocation + // Cobra prints usage when a command returns an error. The cancellation returns an error (context cancelled) + // from preRun, so Cobra assumes "user error" and shows help. + // This conditional block sets cmd.SilenceUsage = true and cmd.SilenceErrors = true only for cancellation, + // telling Cobra "don't print usage or re-print the error". Without it, you get the usage dump. 
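[review note] The wiring above replaces the per-command contexthelpers.StartCancelHandler calls removed throughout cmd/. For reviewers unfamiliar with that helper, a minimal sketch of the equivalent pattern using only the standard library (illustrative; not how pipe-fittings implements it):

```go
package main

import (
	"context"
	"os"
	"os/signal"
)

// deriveSignalContext returns a context that is cancelled when the process
// receives an interrupt; setting it back on the cobra command means every
// RunE handler sees the same cancellable context.
func deriveSignalContext(parent context.Context) (context.Context, context.CancelFunc) {
	// signal.NotifyContext cancels the returned context on SIGINT
	return signal.NotifyContext(parent, os.Interrupt)
}
```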
+ cmd.SilenceUsage = true + cmd.SilenceErrors = true + } + // return (possibly nil) error from migration + return err } func displayStartupLog() { diff --git a/internal/database/convertor.go b/internal/database/convertor.go index c34e7bcc..36e873f5 100644 --- a/internal/database/convertor.go +++ b/internal/database/convertor.go @@ -2,12 +2,14 @@ package database import ( "context" + "database/sql" "errors" "fmt" "log/slog" "sync" "sync/atomic" + "github.com/turbot/pipe-fittings/v2/backend" "github.com/turbot/tailpipe-plugin-sdk/schema" "github.com/turbot/tailpipe/internal/config" ) @@ -174,8 +176,8 @@ func (w *Converter) onFirstChunk(executionId string, chunk int32) error { // err will be returned by the parent function return err } - // create the DuckDB table fpr this partition if it does not already exist - if err := w.ensureDuckLakeTable(w.Partition.TableName); err != nil { + // create the DuckDB table for this partition if it does not already exist + if err := EnsureDuckLakeTable(w.conversionSchema.Columns, w.db, w.Partition.TableName); err != nil { return fmt.Errorf("failed to create DuckDB table: %w", err) } w.readJsonQueryFormat = buildReadJsonQueryFormat(w.conversionSchema, w.Partition) @@ -225,3 +227,70 @@ func (w *Converter) updateRowCount(count int64) { // call the status function with the new row count w.statusFunc(atomic.LoadInt64(&w.rowCount), atomic.LoadInt64(&w.failedRowCount)) } + +// CheckTableSchema checks if the specified table exists in the DuckDB database and compares its schema with the +// provided schema. +// It returns a TableSchemaStatus indicating whether the table exists, whether the schema matches, and any differences. +// This is not used at present but will be used when we implement ducklake schema evolution handling +func (w *Converter) CheckTableSchema(db *sql.DB, tableName string, conversionSchema schema.ConversionSchema) (TableSchemaStatus, error) { + // Check if table exists + exists, err := w.tableExists(db, tableName) + if err != nil { + return TableSchemaStatus{}, err + } + + if !exists { + return TableSchemaStatus{}, nil + } + + // Get existing schema + existingSchema, err := w.getTableSchema(db, tableName) + if err != nil { + return TableSchemaStatus{}, fmt.Errorf("failed to retrieve schema: %w", err) + } + + // Use constructor to create status from comparison + diff := NewTableSchemaStatusFromComparison(existingSchema, conversionSchema) + return diff, nil +} + +func (w *Converter) tableExists(db *sql.DB, tableName string) (bool, error) { + sanitizedTableName, err := backend.SanitizeDuckDBIdentifier(tableName) + if err != nil { + return false, fmt.Errorf("invalid table name %s: %w", tableName, err) + } + //nolint:gosec // table name is sanitized + query := fmt.Sprintf("select exists (select 1 from information_schema.tables where table_name = '%s')", sanitizedTableName) + var exists int + if err := db.QueryRow(query).Scan(&exists); err != nil { + return false, err + } + return exists == 1, nil +} + +func (w *Converter) getTableSchema(db *sql.DB, tableName string) (map[string]schema.ColumnSchema, error) { + query := fmt.Sprintf("pragma table_info(%s);", tableName) + rows, err := db.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + schemaMap := make(map[string]schema.ColumnSchema) + for rows.Next() { + var name, dataType string + var notNull, pk int + var dfltValue sql.NullString + + if err := rows.Scan(&name, &dataType, &notNull, &dfltValue, &pk); err != nil { + return nil, err + } + + schemaMap[name] = schema.ColumnSchema{
+ ColumnName: name, + Type: dataType, + } + } + + return schemaMap, nil +} diff --git a/internal/database/convertor_ducklake.go b/internal/database/convertor_ducklake.go deleted file mode 100644 index bb80dd1c..00000000 --- a/internal/database/convertor_ducklake.go +++ /dev/null @@ -1,176 +0,0 @@ -package database - -import ( - "database/sql" - "fmt" - "strings" - - "github.com/turbot/pipe-fittings/v2/backend" - "github.com/turbot/tailpipe-plugin-sdk/constants" - "github.com/turbot/tailpipe-plugin-sdk/schema" -) - -// determine whether we have a ducklake table for this table, and if so, whether it needs schema updating -func (w *Converter) ensureDuckLakeTable(tableName string) error { - query := fmt.Sprintf("select exists (select 1 from information_schema.tables where table_name = '%s')", tableName) - var exists bool - if err := w.db.QueryRow(query).Scan(&exists); err != nil { - return err - } - if !exists { - return w.createDuckLakeTable(tableName) - } - return nil -} - -// createDuckLakeTable creates a DuckLake table based on the ConversionSchema -func (w *Converter) createDuckLakeTable(tableName string) error { - - // Generate the CREATE TABLE SQL - createTableSQL := w.buildCreateDucklakeTableSQL(tableName) - - // Execute the CREATE TABLE statement - _, err := w.db.Exec(createTableSQL) - if err != nil { - return fmt.Errorf("failed to create table %s: %w", tableName, err) - } - - // Set partitioning using ALTER TABLE - // partition by the partition, index, year and month - partitionColumns := []string{constants.TpPartition, constants.TpIndex, fmt.Sprintf("year(%s)", constants.TpTimestamp), fmt.Sprintf("month(%s)", constants.TpTimestamp)} - alterTableSQL := fmt.Sprintf(`alter table "%s" set partitioned by (%s);`, - tableName, - strings.Join(partitionColumns, ", ")) - - _, err = w.db.Exec(alterTableSQL) - if err != nil { - return fmt.Errorf("failed to set partitioning for table %s: %w", tableName, err) - } - - return nil -} - -// buildCreateDucklakeTableSQL generates the CREATE TABLE SQL statement based on the ConversionSchema -func (w *Converter) buildCreateDucklakeTableSQL(tableName string) string { - // Build column definitions in sorted order - var columnDefinitions []string - for _, column := range w.conversionSchema.Columns { - columnDef := w.buildColumnDefinition(column) - columnDefinitions = append(columnDefinitions, columnDef) - } - - return fmt.Sprintf(`create table if not exists "%s" ( -%s -);`, - tableName, - strings.Join(columnDefinitions, ",\n")) -} - -// buildColumnDefinition generates the SQL definition for a single column -func (w *Converter) buildColumnDefinition(column *schema.ColumnSchema) string { - columnName := fmt.Sprintf("\"%s\"", column.ColumnName) - - // Handle different column types - switch column.Type { - case "struct": - // For struct types, we need to build the struct definition - structDef := w.buildStructDefinition(column) - return fmt.Sprintf("\t%s %s", columnName, structDef) - case "json": - // json type - return fmt.Sprintf("\t%s json", columnName) - default: - // For scalar types, just use the type directly (lower case) - return fmt.Sprintf("\t%s %s", columnName, strings.ToLower(column.Type)) - } -} - -// buildStructDefinition generates the SQL struct definition for a struct column -func (w *Converter) buildStructDefinition(column *schema.ColumnSchema) string { - if len(column.StructFields) == 0 { - return "struct" - } - - var fieldDefinitions []string - for _, field := range column.StructFields { - fieldName := fmt.Sprintf("\"%s\"", field.ColumnName) 
- fieldType := strings.ToLower(field.Type) - - if field.Type == "struct" { - // Recursively build nested struct definition - nestedStruct := w.buildStructDefinition(field) - fieldDefinitions = append(fieldDefinitions, fmt.Sprintf("%s %s", fieldName, nestedStruct)) - } else { - fieldDefinitions = append(fieldDefinitions, fmt.Sprintf("%s %s", fieldName, fieldType)) - } - } - - return fmt.Sprintf("struct(%s)", strings.Join(fieldDefinitions, ", ")) -} - -// CheckTableSchema checks if the specified table exists in the DuckDB database and compares its schema with the -// provided schema. -// it returns a TableSchemaStatus indicating whether the table exists, whether the schema matches, and any differences. -// THis is not used at present but will be used when we implement ducklake schema evolution handling -func (w *Converter) CheckTableSchema(db *sql.DB, tableName string, conversionSchema schema.ConversionSchema) (TableSchemaStatus, error) { - // Check if table exists - exists, err := w.tableExists(db, tableName) - if err != nil { - return TableSchemaStatus{}, err - } - - if !exists { - return TableSchemaStatus{}, nil - } - - // Get existing schema - existingSchema, err := w.getTableSchema(db, tableName) - if err != nil { - return TableSchemaStatus{}, fmt.Errorf("failed to retrieve schema: %w", err) - } - - // Use constructor to create status from comparison - diff := NewTableSchemaStatusFromComparison(existingSchema, conversionSchema) - return diff, nil -} - -func (w *Converter) tableExists(db *sql.DB, tableName string) (bool, error) { - sanitizedTableName, err := backend.SanitizeDuckDBIdentifier(tableName) - if err != nil { - return false, fmt.Errorf("invalid table name %s: %w", tableName, err) - } - //nolint:gosec // table name is sanitized - query := fmt.Sprintf("select exists (select 1 from information_schema.tables where table_name = '%s')", sanitizedTableName) - var exists int - if err := db.QueryRow(query).Scan(&exists); err != nil { - return false, err - } - return exists == 1, nil -} - -func (w *Converter) getTableSchema(db *sql.DB, tableName string) (map[string]schema.ColumnSchema, error) { - query := fmt.Sprintf("pragma table_info(%s);", tableName) - rows, err := db.Query(query) - if err != nil { - return nil, err - } - defer rows.Close() - - schemaMap := make(map[string]schema.ColumnSchema) - for rows.Next() { - var name, dataType string - var notNull, pk int - var dfltValue sql.NullString - - if err := rows.Scan(&name, &dataType, &notNull, &dfltValue, &pk); err != nil { - return nil, err - } - - schemaMap[name] = schema.ColumnSchema{ - ColumnName: name, - Type: dataType, - } - } - - return schemaMap, nil -} diff --git a/internal/database/ducklake_table.go b/internal/database/ducklake_table.go new file mode 100644 index 00000000..b774b17c --- /dev/null +++ b/internal/database/ducklake_table.go @@ -0,0 +1,107 @@ +package database + +import ( + "fmt" + "strings" + + "github.com/turbot/tailpipe-plugin-sdk/constants" + "github.com/turbot/tailpipe-plugin-sdk/schema" +) + +// EnsureDuckLakeTable determines whether we have a ducklake table for this table, and if so, whether it needs schema updating +func EnsureDuckLakeTable(columns []*schema.ColumnSchema, db *DuckDb, tableName string) error { + query := fmt.Sprintf("select exists (select 1 from information_schema.tables where table_name = '%s')", tableName) + var exists bool + if err := db.QueryRow(query).Scan(&exists); err != nil { + return err + } + if !exists { + return createDuckLakeTable(columns, db, tableName) + } + return nil +} + +//
createDuckLakeTable creates a DuckLake table based on the ConversionSchema +func createDuckLakeTable(columns []*schema.ColumnSchema, db *DuckDb, tableName string) error { + + // Generate the CREATE TABLE SQL + createTableSQL := buildCreateDucklakeTableSQL(columns, tableName) + + // Execute the CREATE TABLE statement + _, err := db.Exec(createTableSQL) + if err != nil { + return fmt.Errorf("failed to create table %s: %w", tableName, err) + } + + // Set partitioning using ALTER TABLE + // partition by the partition, index, year and month + partitionColumns := []string{constants.TpPartition, constants.TpIndex, fmt.Sprintf("year(%s)", constants.TpTimestamp), fmt.Sprintf("month(%s)", constants.TpTimestamp)} + alterTableSQL := fmt.Sprintf(`alter table "%s" set partitioned by (%s);`, + tableName, + strings.Join(partitionColumns, ", ")) + + _, err = db.Exec(alterTableSQL) + if err != nil { + return fmt.Errorf("failed to set partitioning for table %s: %w", tableName, err) + } + + return nil +} + +// buildCreateDucklakeTableSQL generates the CREATE TABLE SQL statement based on the ConversionSchema +func buildCreateDucklakeTableSQL(columns []*schema.ColumnSchema, tableName string) string { + // Build column definitions in sorted order + var columnDefinitions []string + for _, column := range columns { + columnDef := buildColumnDefinition(column) + columnDefinitions = append(columnDefinitions, columnDef) + } + + return fmt.Sprintf(`create table if not exists "%s" ( +%s +);`, + tableName, + strings.Join(columnDefinitions, ",\n")) +} + +// buildColumnDefinition generates the SQL definition for a single column +func buildColumnDefinition(column *schema.ColumnSchema) string { + columnName := fmt.Sprintf("\"%s\"", column.ColumnName) + + // Handle different column types + switch column.Type { + case "struct": + // For struct types, we need to build the struct definition + structDef := buildStructDefinition(column) + return fmt.Sprintf("\t%s %s", columnName, structDef) + case "json": + // json type + return fmt.Sprintf("\t%s json", columnName) + default: + // For scalar types, just use the type directly (lower case) + return fmt.Sprintf("\t%s %s", columnName, strings.ToLower(column.Type)) + } +} + +// buildStructDefinition generates the SQL struct definition for a struct column +func buildStructDefinition(column *schema.ColumnSchema) string { + if len(column.StructFields) == 0 { + return "struct" + } + + var fieldDefinitions []string + for _, field := range column.StructFields { + fieldName := fmt.Sprintf("\"%s\"", field.ColumnName) + fieldType := strings.ToLower(field.Type) + + if field.Type == "struct" { + // Recursively build nested struct definition + nestedStruct := buildStructDefinition(field) + fieldDefinitions = append(fieldDefinitions, fmt.Sprintf("%s %s", fieldName, nestedStruct)) + } else { + fieldDefinitions = append(fieldDefinitions, fmt.Sprintf("%s %s", fieldName, fieldType)) + } + } + + return fmt.Sprintf("struct(%s)", strings.Join(fieldDefinitions, ", ")) +} diff --git a/internal/database/tables.go b/internal/database/tables.go index e2684719..4d8f7acb 100644 --- a/internal/database/tables.go +++ b/internal/database/tables.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/turbot/pipe-fittings/v2/constants" + "github.com/turbot/tailpipe-plugin-sdk/schema" ) // GetTables returns the list of tables in the DuckLake metadata catalog @@ -64,3 +65,189 @@ order by c.column_name;`, constants.DuckLakeMetadataCatalog, constants.DuckLakeM return schema, nil } + +// GetLegacyTableViews retrieves the names 
of all table views in the legacy database (tailpipe.db) file +func GetLegacyTableViews(ctx context.Context, db *DuckDb) ([]string, error) { + query := "select table_name from information_schema.tables where table_type='VIEW';" + rows, err := db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to get table views: %w", err) + } + defer rows.Close() + + var tableViews []string + for rows.Next() { + var tableView string + err = rows.Scan(&tableView) + if err != nil { + return nil, fmt.Errorf("failed to scan table view: %w", err) + } + tableViews = append(tableViews, tableView) + } + return tableViews, nil +} + +// GetLegacyTableViewSchema retrieves the schema of a table view in the legacy database (tailpipe.db) file +func GetLegacyTableViewSchema(ctx context.Context, viewName string, db *DuckDb) (*schema.TableSchema, error) { + query := ` + select column_name, data_type + from information_schema.columns + where table_name = ? order by column_name; + ` + rows, err := db.QueryContext(ctx, query, viewName) + if err != nil { + return nil, fmt.Errorf("failed to get view schema for %s: %w", viewName, err) + } + defer rows.Close() + + ts := &schema.TableSchema{ + Name: viewName, + Columns: []*schema.ColumnSchema{}, + } + for rows.Next() { + // here each row is a column, so we need to populate the TableSchema.Columns, particularly the + // ColumnName, Type and StructFields + var columnName, columnType string + err = rows.Scan(&columnName, &columnType) + if err != nil { + return nil, fmt.Errorf("failed to scan column schema: %w", err) + } + + col := buildColumnSchema(columnName, columnType) + ts.Columns = append(ts.Columns, col) + } + + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating over view schema rows: %w", err) + } + + return ts, nil +} + +// buildColumnSchema constructs a ColumnSchema from a DuckDB data type string. +// It handles primitive types as well as struct and struct[] recursively, populating StructFields. +func buildColumnSchema(columnName string, duckdbType string) *schema.ColumnSchema { + t := strings.TrimSpace(duckdbType) + lower := strings.ToLower(t) + + // Helper to set basic column properties + newCol := func(name, typ string, children []*schema.ColumnSchema) *schema.ColumnSchema { + return &schema.ColumnSchema{ + ColumnName: name, + SourceName: name, + Type: typ, + StructFields: children, + } + } + + // Detect struct or struct[] + if strings.HasPrefix(lower, "struct(") || strings.HasPrefix(lower, "struct ") { + isArray := false + // Handle optional [] suffix indicating array of struct + if strings.HasSuffix(lower, ")[]") { + isArray = true + } + // Extract inner content between the first '(' and the matching ')' + start := strings.Index(t, "(") + end := strings.LastIndex(t, ")") + inner := "" + if start >= 0 && end > start { + inner = strings.TrimSpace(t[start+1 : end]) + } + + fields := parseStructFields(inner) + typ := "struct" + if isArray { + typ = "struct[]" + } + return newCol(columnName, typ, fields) + } + + // Primitive or other complex types - just set as-is + return newCol(columnName, lower, nil) +} + +// parseStructFields parses the content inside a DuckDB struct(...) definition into ColumnSchemas. +// It supports nested struct/struct[] types by recursively building ColumnSchemas for child fields.
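[review note] A worked example of what buildColumnSchema/parseStructFields must handle may help review. This hypothetical test (not part of the PR) encodes my reading of the contract for a nested struct-array type string as DuckDB reports it:

```go
package database

import "testing"

func TestBuildColumnSchema_NestedStructArray(t *testing.T) {
	// a typical information_schema data_type rendering
	typ := `STRUCT("name" VARCHAR, "labels" STRUCT("key" VARCHAR, "value" VARCHAR)[])`
	col := buildColumnSchema("resource", typ)
	if col.Type != "struct" {
		t.Fatalf("expected type struct, got %s", col.Type)
	}
	// two top-level fields: name (varchar) and labels (struct[])
	if got := len(col.StructFields); got != 2 {
		t.Fatalf("expected 2 struct fields, got %d", got)
	}
	if col.StructFields[1].Type != "struct[]" {
		t.Fatalf("expected labels to be struct[], got %s", col.StructFields[1].Type)
	}
}
```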
+func parseStructFields(inner string) []*schema.ColumnSchema { + // Split by top-level commas (not within nested parentheses) + parts := splitTopLevel(inner, ',') + var fields []*schema.ColumnSchema + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + // parse field name (optionally quoted) and type + name, typ := parseFieldNameAndType(p) + if name == "" || typ == "" { + continue + } + col := buildColumnSchema(name, typ) + fields = append(fields, col) + } + return fields +} + +// parseFieldNameAndType parses a single struct field spec of the form: +// +// name type +// "name with spaces" type +// +// where type may itself be struct(...)[]. Returns name and the raw type string. +func parseFieldNameAndType(s string) (string, string) { + s = strings.TrimSpace(s) + if s == "" { + return "", "" + } + if s[0] == '"' { + // quoted name + // find closing quote + i := 1 + for i < len(s) && s[i] != '"' { + i++ + } + if i >= len(s) { + return "", "" + } + name := s[1:i] + rest := strings.TrimSpace(s[i+1:]) + // rest should start with the type + return name, rest + } + // unquoted name up to first space + idx := strings.IndexFunc(s, func(r rune) bool { return r == ' ' || r == '\t' }) + if idx == -1 { + // no type specified + return "", "" + } + name := strings.TrimSpace(s[:idx]) + typ := strings.TrimSpace(s[idx+1:]) + return name, typ +} + +// splitTopLevel splits s by sep, ignoring separators enclosed in parentheses. +func splitTopLevel(s string, sep rune) []string { + var res []string + level := 0 + start := 0 + for i, r := range s { + switch r { + case '(': + level++ + case ')': + if level > 0 { + level-- + } + } + if r == sep && level == 0 { + res = append(res, strings.TrimSpace(s[start:i])) + start = i + 1 + } + } + // add last segment + if start <= len(s) { + res = append(res, strings.TrimSpace(s[start:])) + } + return res +} diff --git a/internal/migration/migration.go b/internal/migration/migration.go new file mode 100644 index 00000000..bddd383a --- /dev/null +++ b/internal/migration/migration.go @@ -0,0 +1,511 @@ +package migration + +import ( + "context" + "database/sql" + "errors" + "fmt" + "io/fs" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/briandowns/spinner" + perr "github.com/turbot/pipe-fittings/v2/error_helpers" + "github.com/turbot/pipe-fittings/v2/utils" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe/internal/config" + "github.com/turbot/tailpipe/internal/database" + "github.com/turbot/tailpipe/internal/filepaths" +) + +// MigrateDataToDucklake performs migration of views from tailpipe.db and associated parquet files +// into the new DuckLake metadata catalog +func MigrateDataToDucklake(ctx context.Context) error { + // Determine source and migration directories + dataDefaultDir := config.GlobalWorkspaceProfile.GetDataDir() + migratingDefaultDir := config.GlobalWorkspaceProfile.GetMigratingDir() + // failed dir is derived via GetMigrationFailedDir() where needed + + var matchedTableDirs, unmatchedTableDirs []string + status := NewMigrationStatus(0) + cancelledHandled := false + defer func() { + if ctx.Err() != nil && !cancelledHandled { + _ = onCancelled(status) + } + }() + + // if the ~/.tailpipe/data directory has a .db file, it means that this is the first time we are migrating + // if the ~/.tailpipe/migration/migrating directory has a .db file, it means that this is a resume migration + initialMigration := hasTailpipeDb(dataDefaultDir) + continueMigration := 
hasTailpipeDb(migratingDefaultDir) + + // validate: both should not be true - return that the last migration left things in a bad state + if initialMigration && continueMigration { + return fmt.Errorf("invalid migration state: found tailpipe.db in both the data and migrating directories - this should not happen; please contact Turbot support for assistance") + } + + // STEP 1: Check if migration is needed + // We need to migrate if it is the first time we are migrating or if we are resuming a migration + if !initialMigration && !continueMigration { + slog.Info("No migration needed - no tailpipe.db found in data or migrating directory") + return nil + } + + // Choose DB path for discovery + // If this is the first time we are migrating, we use the .db file from the ~/.tailpipe/data directory + // If this is a resume migration, we use the .db file from the ~/.tailpipe/migration/migrating directory + var discoveryDbPath string + if initialMigration { + discoveryDbPath = filepath.Join(dataDefaultDir, "tailpipe.db") + } else { + discoveryDbPath = filepath.Join(migratingDefaultDir, "tailpipe.db") + } + + // STEP 2: Discover legacy tables and their schemas (from the chosen DB path) + // This returns the list of views and a map of view name to its schema + views, schemas, err := discoverLegacyTablesAndSchemas(ctx, discoveryDbPath) + if err != nil { + return fmt.Errorf("failed to discover legacy tables: %w", err) + } + + slog.Info("Discovered legacy views", "views", views) + slog.Info("Discovered legacy schemas", "schemas", schemas) + + // STEP 3: If this is the first time we are migrating (tables in ~/.tailpipe/data) then move the whole contents of the data dir + // into ~/.tailpipe/migration/migrating, respecting the same folder structure. + // First run: transactionally move contents via moveDirContents: copy data to migrating, move tailpipe.db to migrated, + // then empty the original data directory. On any failure, the migrating directory is removed. + if initialMigration { + if err := moveDirContents(ctx, dataDefaultDir, migratingDefaultDir); err != nil { + return err + } + } + + // STEP 4: We have now moved the data into migrating. We have the list of views from the legacy DB. + // We now need to find the matching table directories in migrating/default by scanning migrating/ + // for tp_table=* directories. + // The matching table directories are the ones that have a view in the database. + // The unmatched table directories are the ones that have data (.parquet files) but no view in the database. + // We will move these to the unmigrated directory.
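[review note] For orientation while reviewing STEPs 4 and 5: the scan assumes the legacy hive-style layout under the migrating directory, along these lines (table, partition, and index values are purely illustrative):

```
migration/migrating/
├── tailpipe.db                      <- legacy DuckDB (present in the resume case)
├── tp_table=aws_cloudtrail_log/     <- matched: a view with this name exists
│   └── tp_partition=prod/
│       └── tp_index=123456789012/
│           └── tp_date=2024-01-01/  <- leaf node: its parquet files are migrated as one transaction
│               └── data_000.parquet
└── tp_table=orphaned_table/         <- unmatched: no view, moved to unmigrated/ untouched
```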
+ + // set the base directory to ~/.tailpipe/migration/migrating/ + baseDir := migratingDefaultDir + matchedTableDirs, unmatchedTableDirs, err = findMatchingTableDirs(baseDir, views) + if err != nil { + return err + } + // move the unmatched table directories to 'unmigrated' + if err = archiveUnmatchedDirs(ctx, unmatchedTableDirs); err != nil { + return err + } + + // Initialize status with total tables to migrate + status.Total = len(matchedTableDirs) + status.update() + + // Pre-compute total parquet files across matched directories + totalFiles, err := countParquetFiles(ctx, matchedTableDirs) + if err == nil { + status.TotalFiles = totalFiles + status.updateFiles() + } + + // Spinner for migration progress + sp := spinner.New( + spinner.CharSets[14], + 100*time.Millisecond, + spinner.WithHiddenCursor(true), + spinner.WithWriter(os.Stdout), + ) + sp.Suffix = fmt.Sprintf(" Migrating tables to DuckLake (%d/%d, %0.1f%%) | parquet files (%d/%d)", status.Migrated, status.Total, status.ProgressPercent, status.MigratedFiles, status.TotalFiles) + sp.Start() + + updateStatus := func(st *MigrationStatus) { + sp.Suffix = fmt.Sprintf(" Migrating tables to DuckLake (%d/%d, %0.1f%%) | parquet files (%d/%d)", st.Migrated, st.Total, st.ProgressPercent, st.MigratedFiles, st.TotalFiles) + } + + // STEP 5: Do Migration: Traverse matched table directories, find leaf nodes with parquet files, + // and perform INSERT within a transaction. On success, move leaf dir to migrated. + err = doMigration(ctx, matchedTableDirs, schemas, status, updateStatus) + sp.Stop() + + // If cancellation arrived after doMigration returned, prefer the CANCELLED outcome + if perr.IsContextCancelledError(ctx.Err()) { + _ = onCancelled(status) + cancelledHandled = true + return ctx.Err() + } + + // Post-migration outcomes + if status.Failed > 0 { + if err := onFailed(status); err != nil { + return err + } + } else { + if err := onSuccessful(status); err != nil { + return err + } + } + + return err +} + +// moveDirContents handles the initial migration move: copy data dir into migrating and move the legacy DB +// into migrated. If any step fails, it removes the migrating directory and shows a support warning. +func moveDirContents(ctx context.Context, dataDefaultDir, migratingDefaultDir string) (err error) { + migratedDir := config.GlobalWorkspaceProfile.GetMigratedDir() + defer func() { + if err != nil { + _ = os.RemoveAll(migratingDefaultDir) + perr.ShowWarning(fmt.Sprintf("Migration initialisation failed. Cleaned up '%s'. Please contact Turbot support.", migratingDefaultDir)) + } + }() + + // 1) Ensure the destination for the DB exists first + // Reason: we will move tailpipe.db after copying data; guaranteeing the target avoids partial moves later. + if err = os.MkdirAll(migratedDir, 0755); err != nil { + return err + } + // 2) Copy ALL data from data/default -> migration/migrating/default (do not delete source yet) + // Reason: copying first keeps the legacy data readable if the process crashes midway. + if err = utils.CopyDir(ctx, dataDefaultDir, migratingDefaultDir); err != nil { + return err + } + // 3) Move the DB file from data/default -> migration/migrated/default + // Reason: once data copy succeeded, moving tailpipe.db signals the backup exists and clarifies resume semantics.
+ if err = utils.MoveFile(filepath.Join(dataDefaultDir, "tailpipe.db"), filepath.Join(migratedDir, "tailpipe.db")); err != nil { + return err + } + // 4) Empty the original data directory last to emulate an atomic move + // Reason: only after successful copy+db move do we clear the source so we never strand users without their legacy data. + if err = utils.EmptyDir(dataDefaultDir); err != nil { + return err + } + return nil +} + +// discoverLegacyTablesAndSchemas enumerates legacy DuckDB views and, for each view, its schema. +// It returns the list of view names and a map of view name to its schema (column->type). +// If the legacy database contains no views, both return values are empty. +func discoverLegacyTablesAndSchemas(ctx context.Context, dbPath string) ([]string, map[string]*schema.TableSchema, error) { + // open a duckdb connection to the legacy db + legacyDb, err := database.NewDuckDb(database.WithDbFile(dbPath)) + if err != nil { + return nil, nil, err + } + defer legacyDb.Close() + + views, err := database.GetLegacyTableViews(ctx, legacyDb) + if err != nil || len(views) == 0 { + return []string{}, map[string]*schema.TableSchema{}, err + } + if perr.IsContextCancelledError(ctx.Err()) { + return nil, nil, ctx.Err() + } + + schemas := make(map[string]*schema.TableSchema) + for _, v := range views { + if perr.IsContextCancelledError(ctx.Err()) { + return nil, nil, ctx.Err() + } + // get the schema for the view (fetching row counts here is a possible future optimization) + ts, err := database.GetLegacyTableViewSchema(ctx, v, legacyDb) + if err != nil { + continue + } + schemas[v] = ts + } + return views, schemas, nil +} + +// migrateTableDirectory recursively traverses a table directory, finds leaf nodes containing +// parquet files, and for each leaf executes an INSERT within a transaction. +// On success, it moves the leaf directory from migrating to migrated. +func migrateTableDirectory(ctx context.Context, db *database.DuckDb, tableName string, dirPath string, ts *schema.TableSchema, status *MigrationStatus) error { + // create the table if not exists + err := database.EnsureDuckLakeTable(ts.Columns, db, tableName) + if err != nil { + // fatal – move table dir to failed and return error + moveTableDirToFailed(ctx, dirPath) + return err + } + entries, err := os.ReadDir(dirPath) + if err != nil { + // fatal – move table dir to failed and return error + moveTableDirToFailed(ctx, dirPath) + return err + } + + var parquetFiles []string + var errList []error + for _, entry := range entries { + // early exit on cancellation + if ctx.Err() != nil { + errList = append(errList, ctx.Err()) + // TODO format better + return errors.Join(errList...) + } + + if entry.IsDir() { + subDir := filepath.Join(dirPath, entry.Name()) + if err := migrateTableDirectory(ctx, db, tableName, subDir, ts, status); err != nil { + // just add to error list and continue with other entries + errList = append(errList, err) + } + } + + if strings.HasSuffix(strings.ToLower(entry.Name()), ".parquet") { + parquetFiles = append(parquetFiles, filepath.Join(dirPath, entry.Name())) + } + } + + // If this directory contains parquet files, treat it as a leaf node for migration + if len(parquetFiles) > 0 { + err = migrateParquetFiles(ctx, db, tableName, dirPath, ts, status, parquetFiles) + if err != nil { + errList = append(errList, err) + } + } + + // TODO format better + return errors.Join(errList...)
+} + +func migrateParquetFiles(ctx context.Context, db *database.DuckDb, tableName string, dirPath string, ts *schema.TableSchema, status *MigrationStatus, parquetFiles []string) error { + filesInLeaf := len(parquetFiles) + // Placeholder: validate schema (from 'ts') against parquet files if needed + slog.Info("Found leaf node with parquet files", "table", tableName, "dir", dirPath, "files", filesInLeaf) + + // Begin transaction + tx, err := db.BeginTx(ctx, nil) + if err != nil { + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + + // Build and execute the parquet insert + if err := insertFromParquetFiles(ctx, tx, tableName, ts.Columns, parquetFiles); err != nil { + slog.Debug("Rolling back transaction", "table", tableName, "error", err) + txErr := tx.Rollback() + if txErr != nil { + slog.Error("Transaction rollback failed", "table", tableName, "error", txErr) + } + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + // Note: cancellation will be handled by outer logic; if needed, you can check and rollback here. + + if err := tx.Commit(); err != nil { + slog.Error("Error committing transaction", "table", tableName, "error", err) + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + + slog.Info("Successfully committed transaction", "table", tableName, "dir", dirPath, "files", filesInLeaf) + + // On success, move the entire leaf directory from migrating to migrated + migratingRoot := config.GlobalWorkspaceProfile.GetMigratingDir() + migratedRoot := config.GlobalWorkspaceProfile.GetMigratedDir() + rel, err := filepath.Rel(migratingRoot, dirPath) + if err != nil { + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + destDir := filepath.Join(migratedRoot, rel) + if err := os.MkdirAll(filepath.Dir(destDir), 0755); err != nil { + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + if err := utils.MoveDirContents(ctx, dirPath, destDir); err != nil { + moveTableDirToFailed(ctx, dirPath) + status.OnFilesFailed(filesInLeaf) + return err + } + _ = os.Remove(dirPath) + status.OnFilesMigrated(filesInLeaf) + slog.Info("Migrated leaf node", "table", tableName, "source", dirPath, "destination", destDir) + return nil +} + +// move any table directories with no corresponding view to ~/.tailpipe/migration/unmigrated/ - we will not migrate them +func archiveUnmatchedDirs(ctx context.Context, unmatchedTableDirs []string) error { + for _, d := range unmatchedTableDirs { + // move to ~/.tailpipe/migration/unmigrated/ + tname := strings.TrimPrefix(filepath.Base(d), "tp_table=") + slog.Warn("Table has data but no view in database; moving without migration", "table", tname, "dir", d) + migratingRoot := config.GlobalWorkspaceProfile.GetMigratingDir() + unmigratedRoot := config.GlobalWorkspaceProfile.GetUnmigratedDir() + // get the relative path from migrating root to d + rel, err := filepath.Rel(migratingRoot, d) + if err != nil { + return err + } + // build a dest path by joining unmigrated root with this relative path + destPath := filepath.Join(unmigratedRoot, rel) + if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil { + return err + } + // move the entire directory + if err := utils.MoveDirContents(ctx, d, destPath); err != nil { + return err + } + err = os.Remove(d) + if err != nil { + return err + } + } + return nil +} + +// doMigration performs the migration of the matched table directories and updates
status +func doMigration(ctx context.Context, matchedTableDirs []string, schemas map[string]*schema.TableSchema, status *MigrationStatus, onUpdate func(*MigrationStatus)) error { + if onUpdate == nil { + return fmt.Errorf("onUpdate callback is required") + } + ducklakeDb, err := database.NewDuckDb(database.WithDuckLake()) + if err != nil { + return err + } + defer ducklakeDb.Close() + + for _, tableDir := range matchedTableDirs { + tableName := strings.TrimPrefix(filepath.Base(tableDir), "tp_table=") + if tableName == "" { + continue + } + ts := schemas[tableName] + if err := migrateTableDirectory(ctx, ducklakeDb, tableName, tableDir, ts, status); err != nil { + slog.Warn("Migration failed for table; moving to migration/failed", "table", tableName, "error", err) + status.OnTableFailed(tableName) + } else { + status.OnTableMigrated() + } + // update our status + onUpdate(status) + } + return nil +} + +// moveTableDirToFailed moves a table directory from migrating to failed, preserving relative path. +func moveTableDirToFailed(ctx context.Context, dirPath string) { + migratingRoot := config.GlobalWorkspaceProfile.GetMigratingDir() + failedRoot := config.GlobalWorkspaceProfile.GetMigrationFailedDir() + rel, err := filepath.Rel(migratingRoot, dirPath) + if err != nil { + return + } + destDir := filepath.Join(failedRoot, rel) + err = os.MkdirAll(filepath.Dir(destDir), 0755) + if err != nil { + slog.Error("moveTableDirToFailed: Failed to create parent for failed dir", "error", err, "dir", destDir) + return + } + err = utils.MoveDirContents(ctx, dirPath, destDir) + if err != nil { + slog.Error("moveTableDirToFailed: Failed to move dir to failed", "error", err, "source", dirPath, "destination", destDir) + return + } + err = os.Remove(dirPath) + if err != nil { + slog.Error("moveTableDirToFailed: Failed to remove original dir after move", "error", err, "dir", dirPath) + } +} + +// countParquetFiles walks all matched table directories and counts parquet files +func countParquetFiles(ctx context.Context, dirs []string) (int, error) { + total := 0 + for _, root := range dirs { + // early exit on cancellation + if ctx.Err() != nil { + return 0, ctx.Err() + } + if err := filepath.WalkDir(root, func(p string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() && strings.HasSuffix(strings.ToLower(d.Name()), ".parquet") { + total++ + } + return nil + }); err != nil { + return 0, err + } + } + return total, nil +} + +// insertFromParquetFiles builds and executes an INSERT … SELECT read_parquet(...) 
+
+// insertFromParquetFiles builds and executes an INSERT … SELECT read_parquet(...) for a set of parquet files
+func insertFromParquetFiles(ctx context.Context, tx *sql.Tx, tableName string, columns []*schema.ColumnSchema, parquetFiles []string) error {
+	var colList []string
+	for _, c := range columns {
+		// escape embedded double quotes so column identifiers are always valid
+		colList = append(colList, fmt.Sprintf(`"%s"`, strings.ReplaceAll(c.ColumnName, `"`, `""`)))
+	}
+	cols := strings.Join(colList, ", ")
+	escape := func(p string) string { return strings.ReplaceAll(p, "'", "''") }
+	var fileSQL string
+	if len(parquetFiles) == 1 {
+		fileSQL = fmt.Sprintf("'%s'", escape(parquetFiles[0]))
+	} else {
+		var quoted []string
+		for _, f := range parquetFiles {
+			quoted = append(quoted, fmt.Sprintf("'%s'", escape(f)))
+		}
+		fileSQL = "[" + strings.Join(quoted, ", ") + "]"
+	}
+	//nolint:gosec // file paths are sanitized
+	query := fmt.Sprintf(`
+		insert into "%s" (%s)
+		select %s from read_parquet(%s)
+	`, tableName, cols, cols, fileSQL)
+	_, err := tx.ExecContext(ctx, query)
+	return err
+}
+
+// onSuccessful handles the success outcome: cleans up the migrating db, prunes empty dirs, prints a summary
+func onSuccessful(status *MigrationStatus) error {
+	// Remove any leftover db in migrating
+	_ = os.Remove(filepath.Join(config.GlobalWorkspaceProfile.GetMigratingDir(), "tailpipe.db"))
+	// Prune empty dirs in migrating
+	if err := filepaths.PruneTree(config.GlobalWorkspaceProfile.GetMigratingDir()); err != nil {
+		return fmt.Errorf("failed to prune empty directories in migrating: %w", err)
+	}
+	status.Finish("SUCCESS")
+	perr.ShowWarning(status.StatusMessage())
+	return nil
+}
+
+// onCancelled handles the cancellation outcome: keeps the migrating db, prunes empty dirs, prints a summary
+func onCancelled(status *MigrationStatus) error {
+	// Do not move db; just prune empties so tree is clean
+	_ = filepaths.PruneTree(config.GlobalWorkspaceProfile.GetMigratingDir())
+	status.Finish("CANCELLED")
+	perr.ShowWarning(status.StatusMessage())
+	return nil
+}
+
+// onFailed handles the failure outcome: moves the db to failed, prunes empty dirs, prints a summary
+func onFailed(status *MigrationStatus) error {
+	failedDefaultDir := config.GlobalWorkspaceProfile.GetMigrationFailedDir()
+	if err := os.MkdirAll(failedDefaultDir, 0755); err != nil {
+		return err
+	}
+	srcDb := filepath.Join(config.GlobalWorkspaceProfile.GetMigratingDir(), "tailpipe.db")
+	if _, err := os.Stat(srcDb); err == nil {
+		if err := utils.MoveFile(srcDb, filepath.Join(failedDefaultDir, "tailpipe.db")); err != nil {
+			return fmt.Errorf("failed to move legacy db to failed: %w", err)
+		}
+	}
+	_ = filepaths.PruneTree(config.GlobalWorkspaceProfile.GetMigratingDir())
+	status.Finish("INCOMPLETE")
+	perr.ShowWarning(status.StatusMessage())
+	return nil
+}
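A minimal sketch of how a caller might drive doMigration, assuming it sits in the same package as the code above and that the matched directories and legacy view schemas have already been discovered; runMigration itself is illustrative, not part of this change:

```go
// runMigration seeds a MigrationStatus (including file totals from
// countParquetFiles) and logs progress after each table.
func runMigration(ctx context.Context, dirs []string, schemas map[string]*schema.TableSchema) error {
	status := NewMigrationStatus(len(dirs))
	total, err := countParquetFiles(ctx, dirs)
	if err != nil {
		return err
	}
	status.TotalFiles = total
	return doMigration(ctx, dirs, schemas, status, func(s *MigrationStatus) {
		slog.Info("migration progress",
			"tables_migrated", s.Migrated,
			"tables_failed", s.Failed,
			"tables_remaining", s.Remaining,
			"files_migrated", s.MigratedFiles)
	})
}
```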
diff --git a/internal/migration/status.go b/internal/migration/status.go
new file mode 100644
index 00000000..2f223f40
--- /dev/null
+++ b/internal/migration/status.go
@@ -0,0 +1,125 @@
+package migration
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/turbot/tailpipe/internal/config"
+)
+
+type MigrationStatus struct {
+	Status          string  `json:"status"`
+	Total           int     `json:"total"`
+	Migrated        int     `json:"migrated"`
+	Failed          int     `json:"failed"`
+	Remaining       int     `json:"remaining"`
+	ProgressPercent float64 `json:"progress_percent"`
+
+	TotalFiles     int `json:"total_files"`
+	MigratedFiles  int `json:"migrated_files"`
+	FailedFiles    int `json:"failed_files"`
+	RemainingFiles int `json:"remaining_files"`
+
+	FailedTables []string      `json:"failed_tables,omitempty"`
+	StartTime    time.Time     `json:"start_time"`
+	Duration     time.Duration `json:"duration"`
+}
+
+func NewMigrationStatus(total int) *MigrationStatus {
+	return &MigrationStatus{Total: total, Remaining: total, StartTime: time.Now()}
+}
+
+func (s *MigrationStatus) OnTableMigrated() {
+	s.Migrated++
+	s.update()
+}
+
+func (s *MigrationStatus) OnTableFailed(tableName string) {
+	s.Failed++
+	s.FailedTables = append(s.FailedTables, tableName)
+	s.update()
+}
+
+func (s *MigrationStatus) OnFilesMigrated(n int) {
+	if n <= 0 {
+		return
+	}
+	s.MigratedFiles += n
+	s.updateFiles()
+}
+
+func (s *MigrationStatus) OnFilesFailed(n int) {
+	if n <= 0 {
+		return
+	}
+	s.FailedFiles += n
+	s.updateFiles()
+}
+
+func (s *MigrationStatus) update() {
+	s.Remaining = s.Total - s.Migrated - s.Failed
+	if s.Total > 0 {
+		s.ProgressPercent = float64(s.Migrated) * 100.0 / float64(s.Total)
+	}
+}
+
+func (s *MigrationStatus) updateFiles() {
+	s.RemainingFiles = s.TotalFiles - s.MigratedFiles - s.FailedFiles
+}
+
+func (s *MigrationStatus) Finish(outcome string) {
+	s.Status = outcome
+	s.Duration = time.Since(s.StartTime)
+}
+
+// StatusMessage returns a user-facing status message (with stats) based on the current migration status
+func (s *MigrationStatus) StatusMessage() string {
+	migratedDir := config.GlobalWorkspaceProfile.GetMigratedDir()
+	failedDir := config.GlobalWorkspaceProfile.GetMigrationFailedDir()
+	migratingDir := config.GlobalWorkspaceProfile.GetMigratingDir()
+
+	switch s.Status {
+	case "SUCCESS":
+		return fmt.Sprintf(
+			"DuckLake migration complete.\n"+
+				"- Tables: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Parquet files: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Backup of migrated legacy data: '%s'\n",
+			s.Migrated, s.Total, s.Failed, s.Remaining,
+			s.MigratedFiles, s.TotalFiles, s.FailedFiles, s.RemainingFiles,
+			migratedDir,
+		)
+	case "CANCELLED":
+		return fmt.Sprintf(
+			"DuckLake migration cancelled.\n"+
+				"- Tables: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Parquet files: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Legacy DB preserved: '%s/tailpipe.db'\n\n"+
+				"Re-run Tailpipe to resume migrating your data.\n",
+			s.Migrated, s.Total, s.Failed, s.Remaining,
+			s.MigratedFiles, s.TotalFiles, s.FailedFiles, s.RemainingFiles,
+			migratingDir,
+		)
+	case "INCOMPLETE":
+		failedList := "(none)"
+		if len(s.FailedTables) > 0 {
+			failedList = strings.Join(s.FailedTables, ", ")
+		}
+		return fmt.Sprintf(
+			"DuckLake migration completed with issues.\n"+
+				"- Tables: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Parquet files: %d/%d migrated (failed: %d, remaining: %d)\n"+
+				"- Failed tables (%d): %s\n"+
+				"- Failed data and legacy DB: '%s'\n"+
+				"- Backup of migrated legacy data: '%s'\n",
+			s.Migrated, s.Total, s.Failed, s.Remaining,
+			s.MigratedFiles, s.TotalFiles, s.FailedFiles, s.RemainingFiles,
+			len(s.FailedTables), failedList,
+			failedDir,
+			migratedDir,
+		)
+	default:
+		return "DuckLake migration status unknown"
+	}
+}
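To make the counter contract concrete, here is a hedged walkthrough of the API above; the caller is assumed to seed TotalFiles (e.g. from countParquetFiles), since updateFiles derives RemainingFiles from it:

```go
status := NewMigrationStatus(2) // two matched table directories
status.TotalFiles = 13          // assumed to be seeded from countParquetFiles

status.OnFilesMigrated(10) // MigratedFiles=10, RemainingFiles=3
status.OnTableMigrated()   // Migrated=1, Remaining=1, ProgressPercent=50

status.OnFilesFailed(3)          // FailedFiles=3, RemainingFiles=0
status.OnTableFailed("my_table") // Failed=1, Remaining=0, FailedTables=["my_table"]

status.Finish("INCOMPLETE") // sets Status and Duration; StatusMessage() now renders the INCOMPLETE summary
```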
diff --git a/internal/migration/utils.go b/internal/migration/utils.go
new file mode 100644
index 00000000..7cf485e5
--- /dev/null
+++ b/internal/migration/utils.go
@@ -0,0 +1,50 @@
+package migration
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+// findMatchingTableDirs lists subdirectories of baseDir whose names start with
+// "tp_table=" and whose table names exist in the provided tables slice.
+// It also returns the unmatched tp_table directories for which there is no view in the DB.
+func findMatchingTableDirs(baseDir string, tables []string) ([]string, []string, error) {
+	entries, err := os.ReadDir(baseDir)
+	if err != nil {
+		return nil, nil, err
+	}
+	tableSet := make(map[string]struct{}, len(tables))
+	for _, t := range tables {
+		tableSet[t] = struct{}{}
+	}
+	var matches []string
+	var unmatched []string
+	const prefix = "tp_table="
+	for _, e := range entries {
+		if !e.IsDir() {
+			continue
+		}
+		name := e.Name()
+		if !strings.HasPrefix(name, prefix) {
+			continue
+		}
+		tableName := strings.TrimPrefix(name, prefix)
+		if _, ok := tableSet[tableName]; ok {
+			matches = append(matches, filepath.Join(baseDir, name))
+		} else {
+			unmatched = append(unmatched, filepath.Join(baseDir, name))
+		}
+	}
+	return matches, unmatched, nil
+}
+
+// hasTailpipeDb checks if a tailpipe.db file exists in the provided directory.
+func hasTailpipeDb(dir string) bool {
+	if dir == "" {
+		return false
+	}
+	p := filepath.Join(dir, "tailpipe.db")
+	_, err := os.Stat(p)
+	return err == nil
+}
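As a usage sketch of the matching rule (illustrative only: it assumes findMatchingTableDirs is in scope and uses hypothetical table names; imports are fmt, os and path/filepath):

```go
func ExampleFindMatchingTableDirs() {
	// hypothetical layout: two tp_table dirs plus an unrelated dir
	base, _ := os.MkdirTemp("", "migrating")
	defer os.RemoveAll(base)
	for _, d := range []string{"tp_table=aws_cloudtrail_log", "tp_table=orphaned", "not_a_table"} {
		_ = os.Mkdir(filepath.Join(base, d), 0755)
	}

	// only aws_cloudtrail_log has a view in the legacy database
	matched, unmatched, _ := findMatchingTableDirs(base, []string{"aws_cloudtrail_log"})

	fmt.Println(len(matched), len(unmatched))
	// matched   -> [<base>/tp_table=aws_cloudtrail_log]  (will be migrated)
	// unmatched -> [<base>/tp_table=orphaned]            (archived to unmigrated)
	// Output: 1 1
}
```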