这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/turbot/go-kit v1.3.0
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2
github.com/turbot/tailpipe-plugin-sdk v0.9.3
github.com/zclconf/go-cty v1.16.3
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1308,8 +1308,8 @@ github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0=
github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ=
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1:zs87uA6QZsYLk4RRxDOIxt8ro/B2V6HzoMWm05Lo7ao=
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1 h1:4Y/51FNwJqavbz/O8T8NQkpp6+roiyoT7BrD/GLR2FU=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.1/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2 h1:FfKLkfbNmwxyPQIqDCd7m6o9bmtPB7D8a5txbVzjZp4=
github.com/turbot/pipe-fittings/v2 v2.7.0-rc.2/go.mod h1:V619+tgfLaqoEXFDNzA2p24TBZVf4IkDL9FDLQecMnE=
github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U=
github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
github.com/turbot/tailpipe-plugin-core v0.2.10 h1:2+B7W4hzyS/pBr1y5ns9w84piWGq/x+WdCUjyPaPreQ=
Expand Down
11 changes: 11 additions & 0 deletions internal/error_helpers/error_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func ShowWarning(warning string) {
fmt.Fprintf(opStream, "%s: %v\n", constants.ColoredWarn, warning)
}

// ShowInfo prints a non-critical info message to the appropriate output stream.
// Behaves like ShowWarning but with a calmer label (Note) to avoid alarming users
// for successful outcomes or informational messages.
func ShowInfo(info string) {
if len(info) == 0 {
return
}
opStream := GetWarningOutputStream()
fmt.Fprintf(opStream, "%s: %v\n", color.YellowString("Note"), info)
}

// PrefixError builds a new error whose message is "<prefix>: <message>"
// followed by a newline, where <message> is the text of err after it has been
// passed through TransformErrorToTailpipe.
//
// The result is formatted with %s rather than %w, so it does not wrap err:
// errors.Is / errors.As on the returned value will not reach the original.
// NOTE(review): behaviour for a nil err depends on TransformErrorToTailpipe,
// which is not visible here - confirm before calling with nil.
func PrefixError(err error, prefix string) error {
	message := TransformErrorToTailpipe(err).Error()
	return fmt.Errorf("%s: %s\n", prefix, message)
}
Expand Down
33 changes: 33 additions & 0 deletions internal/migration/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package migration

import (
"fmt"
)

// MigrationError is an aggregate error that collects the individual failures
// encountered during a migration.
//
// The aggregate is kept flat: appending another *MigrationError merges that
// aggregate's children instead of nesting it. This matters because directory
// migration is recursive and each level returns its own aggregate; without
// flattening, Len and the Error summary would count only direct children and
// under-report the real number of failures.
type MigrationError struct {
	errors []error
}

// NewMigrationError returns an empty aggregate ready to receive errors via Append.
func NewMigrationError() *MigrationError {
	return &MigrationError{errors: make([]error, 0)}
}

// Append records err in the aggregate.
//
// A nil err is ignored. If err is itself a *MigrationError, its child errors
// are merged into this aggregate (a nil or empty aggregate therefore adds
// nothing, and appending an aggregate to itself is a no-op). Any other error
// is stored as a single child.
func (m *MigrationError) Append(err error) {
	if err == nil {
		return
	}
	if child, ok := err.(*MigrationError); ok {
		// Guard against a typed-nil pointer and against self-append, which
		// would otherwise dereference nil or duplicate every child.
		if child == nil || child == m {
			return
		}
		m.errors = append(m.errors, child.errors...)
		return
	}
	m.errors = append(m.errors, err)
}

// Len reports how many errors have been collected.
func (m *MigrationError) Len() int { return len(m.errors) }

// Error returns a compact summary containing only the number of collected
// errors; use Unwrap (or errors.Is / errors.As) to inspect the children.
func (m *MigrationError) Error() string {
	return fmt.Sprintf("%d error(s) occurred during migration", len(m.errors))
}

// Unwrap returns the collected child errors so that errors.Is and errors.As
// can walk them (multi-error Unwrap is supported since Go 1.20).
// The returned slice is the aggregate's own storage; callers should treat it
// as read-only.
func (m *MigrationError) Unwrap() []error { return m.errors }
33 changes: 20 additions & 13 deletions internal/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package migration
import (
"context"
"database/sql"
"errors"
"fmt"
"io/fs"
"log/slog"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/turbot/tailpipe-plugin-sdk/schema"
"github.com/turbot/tailpipe/internal/config"
"github.com/turbot/tailpipe/internal/database"
"github.com/turbot/tailpipe/internal/error_helpers"
"github.com/turbot/tailpipe/internal/filepaths"
)

Expand Down Expand Up @@ -133,9 +133,12 @@ func MigrateDataToDucklake(ctx context.Context) error {
err = doMigration(ctx, matchedTableDirs, schemas, status, updateStatus)
sp.Stop()

logPath := filepath.Join(config.GlobalWorkspaceProfile.GetMigrationDir(), "migration.log")
// If cancellation arrived after doMigration returned, prefer the CANCELLED outcome
if perr.IsContextCancelledError(ctx.Err()) {
_ = onCancelled(status)
status.Finish("CANCELLED")
_ = status.WriteStatusToFile()
perr.ShowWarning(fmt.Sprintf("Migration cancelled. It will automatically resume next time you run Tailpipe.\nFor details, see %s\n", logPath))
cancelledHandled = true
return ctx.Err()
}
Expand All @@ -145,10 +148,12 @@ func MigrateDataToDucklake(ctx context.Context) error {
if err := onFailed(status); err != nil {
return err
}
perr.ShowWarning(fmt.Sprintf("Your data has been migrated to DuckLake with issues.\nFor details, see %s\n", logPath))
} else {
if err := onSuccessful(status); err != nil {
return err
}
error_helpers.ShowInfo(fmt.Sprintf("Your data has been migrated to DuckLake.\nFor details, see %s\n", logPath))
}

return err
Expand Down Expand Up @@ -241,20 +246,19 @@ func migrateTableDirectory(ctx context.Context, db *database.DuckDb, tableName s
}

var parquetFiles []string
var errList []error
aggErr := NewMigrationError()
for _, entry := range entries {
// early exit on cancellation
if ctx.Err() != nil {
errList = append(errList, ctx.Err())
// TODO format better
return errors.Join(errList...)
aggErr.Append(ctx.Err())
return aggErr
}

if entry.IsDir() {
subDir := filepath.Join(dirPath, entry.Name())
if err := migrateTableDirectory(ctx, db, tableName, subDir, ts, status); err != nil {
// just add to error list and continue with other entries
errList = append(errList, err)
aggErr.Append(err)
}
}

Expand All @@ -267,12 +271,15 @@ func migrateTableDirectory(ctx context.Context, db *database.DuckDb, tableName s
if len(parquetFiles) > 0 {
err = migrateParquetFiles(ctx, db, tableName, dirPath, ts, status, parquetFiles)
if err != nil {
errList = append(errList, err)
aggErr.Append(err)
status.AddError(fmt.Errorf("failed migrating parquet files for table '%s' at '%s': %w", tableName, dirPath, err))
}
}

// TODO format better
return errors.Join(errList...)
if aggErr.Len() == 0 {
return nil
}
return aggErr
}

func migrateParquetFiles(ctx context.Context, db *database.DuckDb, tableName string, dirPath string, ts *schema.TableSchema, status *MigrationStatus, parquetFiles []string) error {
Expand Down Expand Up @@ -479,7 +486,7 @@ func onSuccessful(status *MigrationStatus) error {
return fmt.Errorf("failed to prune empty directories in migrating: %w", err)
}
status.Finish("SUCCESS")
perr.ShowWarning(status.StatusMessage())
_ = status.WriteStatusToFile()
return nil
}

Expand All @@ -488,7 +495,7 @@ func onCancelled(status *MigrationStatus) error {
// Do not move db; just prune empties so tree is clean
_ = filepaths.PruneTree(config.GlobalWorkspaceProfile.GetMigratingDir())
status.Finish("CANCELLED")
perr.ShowWarning(status.StatusMessage())
_ = status.WriteStatusToFile()
return nil
}

Expand All @@ -506,6 +513,6 @@ func onFailed(status *MigrationStatus) error {
}
_ = filepaths.PruneTree(config.GlobalWorkspaceProfile.GetMigratingDir())
status.Finish("INCOMPLETE")
perr.ShowWarning(status.StatusMessage())
_ = status.WriteStatusToFile()
return nil
}
34 changes: 33 additions & 1 deletion internal/migration/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package migration

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

Expand All @@ -24,6 +26,8 @@ type MigrationStatus struct {
FailedTables []string `json:"failed_tables,omitempty"`
StartTime time.Time `json:"start_time"`
Duration time.Duration `json:"duration"`

Errors []string `json:"errors,omitempty"`
}

func NewMigrationStatus(total int) *MigrationStatus {
Expand Down Expand Up @@ -57,6 +61,13 @@ func (s *MigrationStatus) OnFilesFailed(n int) {
s.updateFiles()
}

// AddError appends the message text of err to the status's Errors list.
// A nil err is ignored, so callers may pass results through unconditionally.
func (s *MigrationStatus) AddError(err error) {
	if err != nil {
		s.Errors = append(s.Errors, err.Error())
	}
}

func (s *MigrationStatus) update() {
s.Remaining = s.Total - s.Migrated - s.Failed
if s.Total > 0 {
Expand Down Expand Up @@ -106,7 +117,7 @@ func (s *MigrationStatus) StatusMessage() string {
if len(s.FailedTables) > 0 {
failedList = strings.Join(s.FailedTables, ", ")
}
return fmt.Sprintf(
base := fmt.Sprintf(
"DuckLake migration completed with issues.\n"+
"- Tables: %d/%d migrated (failed: %d, remaining: %d)\n"+
"- Parquet files: %d/%d migrated (failed: %d, remaining: %d)\n"+
Expand All @@ -119,7 +130,28 @@ func (s *MigrationStatus) StatusMessage() string {
failedDir,
migratedDir,
)
if len(s.Errors) > 0 {
base += fmt.Sprintf("\nErrors: %d error(s) occurred during migration\n", len(s.Errors))
base += "Details:\n"
for _, e := range s.Errors {
base += "- " + e + "\n"
}
}
return base
default:
return "DuckLake migration status unknown"
}
}

// WriteStatusToFile renders the current status message and saves it as
// "migration.log" in the workspace migration directory
// (e.g. ~/.tailpipe/migration/migration.log).
//
// The directory is created if it does not exist. Any existing file is
// replaced, so a resumed run overwrites the previous report. The first error
// from creating the directory or writing the file is returned.
func (s *MigrationStatus) WriteStatusToFile() error {
	dir := config.GlobalWorkspaceProfile.GetMigrationDir()
	logFile := filepath.Join(dir, "migration.log")
	contents := []byte(s.StatusMessage())

	// Ensure the target directory exists before attempting the write.
	mkErr := os.MkdirAll(dir, 0755)
	if mkErr != nil {
		return mkErr
	}
	return os.WriteFile(logFile, contents, 0600)
}