这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"errors"
"os"

"github.com/spf13/cobra"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/turbot/pipe-fittings/v2/filepaths"
"github.com/turbot/pipe-fittings/v2/utils"
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/migration"
)

var exitCode int
Expand Down Expand Up @@ -68,8 +70,14 @@ func Execute() int {
// set the error output to stdout (as it;s common usage to redirect stderr to a file to capture logs
rootCmd.SetErr(os.Stdout)

// if the error is dues to unsupported migration, set a specific exit code - this will bve picked up by powerpipe
if err := rootCmd.Execute(); err != nil {
exitCode = -1
var unsupportedErr *migration.UnsupportedError
if errors.As(err, &unsupportedErr) {
exitCode = pconstants.ExitCodeMigrationUnsupported
} else {
exitCode = 1
}
}
return exitCode
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.24.0

replace (
github.com/c-bata/go-prompt => github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50
// github.com/turbot/pipe-fittings/v2 => ../pipe-fittings
//github.com/turbot/pipe-fittings/v2 => ../pipe-fittings
//github.com/turbot/tailpipe-plugin-core => ../tailpipe-plugin-core
// github.com/turbot/tailpipe-plugin-sdk => ../tailpipe-plugin-sdk
)
Expand All @@ -19,7 +19,7 @@ require (
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.11.0
github.com/turbot/go-kit v1.3.0
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2
github.com/turbot/pipe-fittings/v2 v2.7.0
github.com/turbot/tailpipe-plugin-sdk v0.9.3
github.com/zclconf/go-cty v1.16.3
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1308,8 +1308,8 @@ github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0=
github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ=
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1:zs87uA6QZsYLk4RRxDOIxt8ro/B2V6HzoMWm05Lo7ao=
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2 h1:FfKLkfbNmwxyPQIqDCd7m6o9bmtPB7D8a5txbVzjZp4=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE=
github.com/turbot/pipe-fittings/v2 v2.7.0 h1:eCmpMNlVtV3AxOzsn8njE3O6aoHc74WVAHOntia2hqY=
github.com/turbot/pipe-fittings/v2 v2.7.0/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE=
github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U=
github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
github.com/turbot/tailpipe-plugin-core v0.2.10 h1:2+B7W4hzyS/pBr1y5ns9w84piWGq/x+WdCUjyPaPreQ=
Expand Down
27 changes: 27 additions & 0 deletions internal/migration/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package migration

import "fmt"

// UnsupportedError represents an error when migration is not supported
// due to specific command line arguments or configuration
type UnsupportedError struct {
Reason string
}

func (e *UnsupportedError) Error() string {
msgFormat := "data must be migrated to Ducklake format - migration is not supported with '%s'.\n\nRun 'tailpipe query' to migrate your data to DuckLake format"
return fmt.Sprintf(msgFormat, e.Reason)
}

func (e *UnsupportedError) Is(target error) bool {
_, ok := target.(*UnsupportedError)
return ok
}

func (e *UnsupportedError) As(target interface{}) bool {
if t, ok := target.(**UnsupportedError); ok {
*t = e
return true
}
return false
}
33 changes: 22 additions & 11 deletions internal/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
// MigrateDataToDucklake performs migration of views from tailpipe.db and associated parquet files
// into the new DuckLake metadata catalog
func MigrateDataToDucklake(ctx context.Context) (err error) {
slog.Info("Starting data migration to DuckLake format")
// define a status message var - this will be set when we encounter any issues - or when we are successful
// this will be printed at the end of the function
var statusMsg string
Expand Down Expand Up @@ -82,11 +83,8 @@ func MigrateDataToDucklake(ctx context.Context) (err error) {
// if the output for this command is a machine readable format (csv/json) or progress is false,
// it is possible/likely that tailpipe is being used in a non interactive way - in this case,
// we should not prompt the user, instead return an error
msgFormat := "data must be migrated to Ducklake format - migration is not supported with '%s'.\n\nRun 'tailpipe query' to migrate your data to DuckLake format"
if error_helpers.IsMachineReadableOutput() {
return fmt.Errorf(msgFormat, "--output "+viper.GetString(constants.ArgOutput))
} else if viper.IsSet(constants.ArgProgress) && !viper.GetBool(constants.ArgProgress) {
return fmt.Errorf(msgFormat, "--progress=false")
if err := checkMigrationSupported(); err != nil {
return err
}

// Prompt the user to confirm migration
Expand Down Expand Up @@ -131,9 +129,6 @@ func MigrateDataToDucklake(ctx context.Context) (err error) {
return fmt.Errorf("failed to discover legacy tables: %w", err)
}

slog.Info("Views: ", "views", views)
slog.Info("Schemas: ", "schemas", schemas)

// STEP 3: If this is the first time we are migrating(tables in ~/.tailpipe/data) then move the whole contents of data dir
// into ~/.tailpipe/migration/migrating respecting the same folder structure.
// We do this by simply renaming the directory.
Expand Down Expand Up @@ -246,6 +241,24 @@ func MigrateDataToDucklake(ctx context.Context) (err error) {
return err
}

// check if the data migration is supported, based on the current arguments
// if the output for this command is a machine readable format (csv/json) or progress is false,
// it is possible/likely that tailpipe is being used in a non interactive way - in this case,
// we should not prompt the user, instead return an error
// NOTE: set exit code to
func checkMigrationSupported() error {
if error_helpers.IsMachineReadableOutput() {
return &UnsupportedError{
Reason: "--output " + viper.GetString(constants.ArgOutput),
}
} else if viper.IsSet(constants.ArgProgress) && !viper.GetBool(constants.ArgProgress) {
return &UnsupportedError{
Reason: "--progress=false",
}
}
return nil
}

// moveDataToMigrating ensures the migration folder exists and handles any existing migrating folder
func moveDataToMigrating(ctx context.Context, dataDefaultDir, migratingDefaultDir string) error {
// Ensure the 'migrating' folder exists
Expand Down Expand Up @@ -430,8 +443,6 @@ func migrateTableDirectory(ctx context.Context, db *database.DuckDb, tableName s

func migrateParquetFiles(ctx context.Context, db *database.DuckDb, tableName string, dirPath string, ts *schema.TableSchema, status *MigrationStatus, parquetFiles []string) error {
filesInLeaf := len(parquetFiles)
// Placeholder: validate schema (from 'ts') against parquet files if needed
slog.Info("Found leaf node with parquet files", "table", tableName, "dir", dirPath, "files", filesInLeaf)

// Begin transaction
tx, err := db.BeginTx(ctx, nil)
Expand Down Expand Up @@ -469,7 +480,7 @@ func migrateParquetFiles(ctx context.Context, db *database.DuckDb, tableName str
slog.Warn("Cleanup: could not remove migrated leaf directory", "table", tableName, "dir", dirPath, "error", err)
}
status.OnFilesMigrated(filesInLeaf)
slog.Info("Migrated leaf node", "table", tableName, "source", dirPath)
slog.Debug("Migrated leaf node", "table", tableName, "source", dirPath)
return nil
}

Expand Down