diff --git a/cmd/connect.go b/cmd/connect.go
index 7b83a59d..dafb73ca 100644
--- a/cmd/connect.go
+++ b/cmd/connect.go
@@ -5,11 +5,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"log/slog"
 	"os"
 	"path/filepath"
 	"strings"
 	"time"
 
+	"github.com/turbot/tailpipe/internal/filepaths"
+
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 	"github.com/thediveo/enumflag/v2"
@@ -23,7 +26,7 @@ import (
 	"github.com/turbot/tailpipe/internal/config"
 	"github.com/turbot/tailpipe/internal/constants"
 	"github.com/turbot/tailpipe/internal/database"
-	error_helpers "github.com/turbot/tailpipe/internal/error_helpers"
+	"github.com/turbot/tailpipe/internal/error_helpers"
 	"golang.org/x/exp/maps"
 )
 
@@ -97,6 +100,17 @@ func runConnectCmd(cmd *cobra.Command, _ []string) {
 	// use the signal-aware/cancelable context created upstream in preRunHook
 	ctx := cmd.Context()
 
+	// first - determine whether we still have unmigrated data - if so, run the legacy connect command
+	// (NOTE: if a migration is in progress, fail with an error)
+	// TODO - remove this check in a future release
+	legacyDb, err := checkForLegacyDb()
+	error_helpers.FailOnError(err)
+	if legacyDb {
+		slog.Warn("Legacy data detected - running legacy connect command. Data will be migrated to DuckLake when running any command except connect")
+		runLegacyConnectCmd(cmd, nil)
+		return
+	}
+
 	defer func() {
 		if r := recover(); r != nil {
 			err = helpers.ToError(r)
@@ -124,6 +138,22 @@ func runConnectCmd(cmd *cobra.Command, _ []string) {
 
 }
 
+func checkForLegacyDb() (bool, error) {
+	legacyDbPath := filepaths.TailpipeLegacyDbFilePath()
+	_, err := os.Stat(legacyDbPath)
+	legacyDbExists := err == nil
+
+	// if we are in the middle of a migration, return an error
+	migratingDir := config.GlobalWorkspaceProfile.GetMigratingDir()
+	migratingDbPath := filepath.Join(migratingDir, "tailpipe.db")
+	_, err = os.Stat(migratingDbPath)
+	migrationInProgress := err == nil
+	if migrationInProgress {
+		return false, fmt.Errorf("a data migration is in progress - run any tailpipe command apart from 'connect' to complete migration")
+	}
+	return legacyDbExists, nil
+}
+
 func generateInitFile(ctx context.Context) (string, error) {
 	// cleanup the old db files if not in use
 	err := cleanupOldInitFiles()
diff --git a/cmd/connect_legacy.go b/cmd/connect_legacy.go
new file mode 100644
index 00000000..9a9ebbbb
--- /dev/null
+++ b/cmd/connect_legacy.go
@@ -0,0 +1,201 @@
+package cmd
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/spf13/cobra"
+	"github.com/turbot/go-kit/helpers"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/spf13/viper"
+	"github.com/turbot/pipe-fittings/v2/connection"
+	pconstants "github.com/turbot/pipe-fittings/v2/constants"
+	"github.com/turbot/pipe-fittings/v2/error_helpers"
+	pfilepaths "github.com/turbot/pipe-fittings/v2/filepaths"
+	"github.com/turbot/tailpipe/internal/config"
+	"github.com/turbot/tailpipe/internal/constants"
+	"github.com/turbot/tailpipe/internal/database"
+	"github.com/turbot/tailpipe/internal/filepaths"
+)
+
+func runLegacyConnectCmd(cmd *cobra.Command, _ []string) {
+	var err error
+	var databaseFilePath string
+	ctx := cmd.Context()
+
+	defer func() {
+		if r := recover(); r != nil {
+			err = helpers.ToError(r)
+		}
+		setExitCodeForConnectError(err)
+		displayOutputLegacy(ctx, databaseFilePath, err)
+	}()
+
+	databaseFilePath, err = generateDbFile(ctx)
+
+	// we are done - the defer block will print either the filepath (if successful) or the error (if not)
+}
+
+func generateDbFile(ctx context.Context) (string, error) {
+	databaseFilePath := generateTempDBFilename(config.GlobalWorkspaceProfile.GetDataDir())
+
+	// cleanup the old db files if not in use
+	err := cleanupOldDbFiles()
+	if err != nil {
+		return "", err
+	}
+
+	// first build the filters
+	filters, err := getFilters()
+	if err != nil {
+		return "", fmt.Errorf("error building filters: %w", err)
+	}
+
+	// if there are no filters, just copy the db file
+	if len(filters) == 0 {
+		err = copyDBFile(filepaths.TailpipeLegacyDbFilePath(), databaseFilePath)
+		return databaseFilePath, err
+	}
+
+	// Open a DuckDB connection (creates the file if it doesn't exist)
+	db, err := database.NewDuckDb(database.WithDbFile(databaseFilePath))
+
+	if err != nil {
+		return "", fmt.Errorf("failed to open DuckDB connection: %w", err)
+	}
+	defer db.Close()
+
+	err = database.AddTableViews(ctx, db, filters...)
+	return databaseFilePath, err
+}
+
+func displayOutputLegacy(ctx context.Context, databaseFilePath string, err error) {
+	switch viper.GetString(pconstants.ArgOutput) {
+	case pconstants.OutputFormatText:
+		if err == nil {
+			// output the filepath
+			fmt.Println(databaseFilePath) //nolint:forbidigo // ui output
+		} else {
+			error_helpers.ShowError(ctx, err)
+		}
+	case pconstants.OutputFormatJSON:
+		res := connection.TailpipeConnectResponse{
+			DatabaseFilepath: databaseFilePath,
+		}
+		if err != nil {
+			res.Error = err.Error()
+		}
+		b, err := json.Marshal(res)
+		if err == nil {
+			fmt.Println(string(b)) //nolint:forbidigo // ui output
+		} else {
+			fmt.Printf(`{"error": "failed to marshal response: %s"}`, err) //nolint:forbidigo // ui output
+		}
+
+	default:
+		// unexpected - cobra's validation should prevent this
+		error_helpers.ShowError(ctx, fmt.Errorf("unsupported output format %q", viper.GetString(pconstants.ArgOutput)))
+	}
+}
+
+// generateTempDBFilename generates a temporary filename with a timestamp
+func generateTempDBFilename(dataDir string) string {
+	timestamp := time.Now().Format("20060102150405") // e.g., 20241031103000
+	return filepath.Join(dataDir, fmt.Sprintf("tailpipe_%s.db", timestamp))
+}
+
+// copyDBFile copies the source database file to the destination
+func copyDBFile(src, dst string) error {
+	sourceFile, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer sourceFile.Close()
+
+	destFile, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+	defer destFile.Close()
+
+	_, err = io.Copy(destFile, sourceFile)
+	return err
+}
+
+// cleanupOldDbFiles deletes old db files (older than a day) that are not in use
+func cleanupOldDbFiles() error {
+	baseDir := pfilepaths.GetDataDir()
+	log.Printf("[INFO] Cleaning up old db files in %s\n", baseDir)
+	cutoffTime := time.Now().Add(-constants.LegacyDbFileMaxAge) // Files older than 1 day
+
+	// The baseDir ("$TAILPIPE_INSTALL_DIR/data") is expected to have subdirectories for different workspace
+	// profiles (default, work, etc.). Each subdirectory may contain multiple .db files.
+	// Example structure:
+	// data/
+	// ├── default/
+	// │   ├── tailpipe_20250115182129.db
+	// │   ├── tailpipe_20250115193816.db
+	// │   ├── tailpipe.db
+	// │   └── ...
+	// ├── work/
+	// │   ├── tailpipe_20250115182129.db
+	// │   ├── tailpipe_20250115193816.db
+	// │   ├── tailpipe.db
+	// │   └── ...
+	// So we traverse all these subdirectories for each workspace and process the relevant files.
+	err := filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return fmt.Errorf("error accessing path %s: %v", path, err)
+		}
+
+		// skip directories and non-`.db` files
+		if info.IsDir() || !strings.HasSuffix(info.Name(), ".db") {
+			return nil
+		}
+
+		// skip the `tailpipe.db` file
+		if info.Name() == "tailpipe.db" {
+			return nil
+		}
+
+		// only process `tailpipe_*.db` files
+		if !strings.HasPrefix(info.Name(), "tailpipe_") {
+			return nil
+		}
+
+		// check if the file is older than the cutoff time
+		if info.ModTime().After(cutoffTime) {
+			log.Printf("[DEBUG] Skipping deletion of file %s (%s) as it is not older than %s\n", path, info.ModTime().String(), cutoffTime)
+			return nil
+		}
+
+		// check for a lock on the file
+		db, err := database.NewDuckDb(database.WithDbFile(path))
+		if err != nil {
+			log.Printf("[INFO] Skipping deletion of file %s due to error: %v\n", path, err)
+			return nil
+		}
+		defer db.Close()
+
+		// if no lock, delete the file
+		err = os.Remove(path)
+		if err != nil {
+			log.Printf("[INFO] Failed to delete db file %s: %v", path, err)
+		} else {
+			log.Printf("[DEBUG] Cleaned up old unused db file: %s\n", path)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return err
+	}
+	return nil
+}
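
For reference, a minimal standalone Go sketch (not part of the patch; the data dir and mod time are hypothetical) of how the timestamped filename produced by generateTempDBFilename relates to the 24-hour cutoff that cleanupOldDbFiles applies via constants.LegacyDbFileMaxAge:

package main

import (
	"fmt"
	"path/filepath"
	"time"
)

// mirrors constants.LegacyDbFileMaxAge introduced in this change
const legacyDbFileMaxAge = 24 * time.Hour

func main() {
	// same layout as generateTempDBFilename: tailpipe_<yyyymmddhhmmss>.db inside the data dir
	dataDir := "/home/user/.tailpipe/data/default" // hypothetical data dir
	name := filepath.Join(dataDir, fmt.Sprintf("tailpipe_%s.db", time.Now().Format("20060102150405")))
	fmt.Println(name) // e.g. /home/user/.tailpipe/data/default/tailpipe_20250115182129.db

	// same cutoff test as cleanupOldDbFiles: files modified within the last 24h are kept
	cutoff := time.Now().Add(-legacyDbFileMaxAge)
	modTime := time.Now().Add(-2 * time.Hour) // hypothetical mod time of a recent connect db
	fmt.Println("would delete:", !modTime.After(cutoff)) // prints false - still inside the 24h window
}
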
diff --git a/internal/cmdconfig/cmd_hooks.go b/internal/cmdconfig/cmd_hooks.go
index c07cfac7..72e1a793 100644
--- a/internal/cmdconfig/cmd_hooks.go
+++ b/internal/cmdconfig/cmd_hooks.go
@@ -68,6 +68,15 @@ func preRunHook(cmd *cobra.Command, args []string) error {
 	contexthelpers.StartCancelHandler(cancel)
 	cmd.SetContext(ctx)
 
+	// skip migration for the 'connect' command (and its subcommands)
+	// this is necessary since connect is used by powerpipe to connect to the tailpipe
+	// db, and migration could take a long time if there are a large number of rows and tables.
+	// this can leave powerpipe with no feedback while it is trying to connect to the tailpipe db.
+	// to avoid this scenario we skip migration for the 'connect' command
+	if isConnectCommand(cmd) {
+		return nil
+	}
+
 	// migrate legacy data to DuckLake:
 	// Prior to Tailpipe v0.7.0 we stored data as native Parquet files alongside a tailpipe.db
 	// (DuckDB) that defined SQL views. From v0.7.0 onward Tailpipe uses DuckLake, which
@@ -128,6 +137,16 @@ func setMemoryLimit() {
 	}
 }
 
+// isConnectCommand returns true if the current command or any of its parents is 'connect'
+func isConnectCommand(cmd *cobra.Command) bool {
+	for c := cmd; c != nil; c = c.Parent() {
+		if c.Name() == "connect" {
+			return true
+		}
+	}
+	return false
+}
+
 // runScheduledTasks runs the task runner and returns a channel which is closed when
 // task run is complete
 //
diff --git a/internal/constants/legacy_database.go b/internal/constants/legacy_database.go
new file mode 100644
index 00000000..4850ec4f
--- /dev/null
+++ b/internal/constants/legacy_database.go
@@ -0,0 +1,8 @@
+package constants
+
+import "time"
+
+const (
+	TailpipeLegacyDbName = "tailpipe.db"
+	LegacyDbFileMaxAge   = 24 * time.Hour
+)
diff --git a/internal/database/compaction_types.go b/internal/database/compaction_types.go
index 4a2f98e1..bf052f7e 100644
--- a/internal/database/compaction_types.go
+++ b/internal/database/compaction_types.go
@@ -17,7 +17,7 @@ func getTimeRangesToReorder(ctx context.Context, db *DuckDb, pk *partitionKey, r
 	if reindex {
 		rm, err := newReorderMetadata(ctx, db, pk)
 		if err != nil {
-			return nil, fmt.Errorf("failed to retiever stats for partition key: %w", err)
+			return nil, fmt.Errorf("failed to retrieve stats for partition key: %w", err)
 		}
 
 		// make a single time range
@@ -52,7 +52,7 @@ func getTimeRangesToReorder(ctx context.Context, db *DuckDb, pk *partitionKey, r
 	// get stats for the partition key
 	rm, err := newReorderMetadata(ctx, db, pk)
 	if err != nil {
-		return nil, fmt.Errorf("failed to retiever stats for partition key: %w", err)
+		return nil, fmt.Errorf("failed to retrieve stats for partition key: %w", err)
 	}
 	rm.unorderedRanges = unorderedRanges
 	return rm, nil
diff --git a/internal/database/legacy_tables.go b/internal/database/legacy_tables.go
new file mode 100644
index 00000000..436f8760
--- /dev/null
+++ b/internal/database/legacy_tables.go
@@ -0,0 +1,147 @@
+package database
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"strings"
+
+	"github.com/turbot/pipe-fittings/v2/error_helpers"
+	"github.com/turbot/tailpipe-plugin-sdk/helpers"
+	"github.com/turbot/tailpipe/internal/config"
+	"github.com/turbot/tailpipe/internal/filepaths"
+)
+
+// AddTableViews creates a view for each table in the data directory, applying the provided DuckDB filters to the view query
+func AddTableViews(ctx context.Context, db *DuckDb, filters ...string) error {
+	tables, err := getDirNames(config.GlobalWorkspaceProfile.GetDataDir())
+	if err != nil {
+		return fmt.Errorf("failed to get tables: %w", err)
+	}
+
+	// optimisation - it seems the first time DuckDB creates a view which inspects the file system it is slow;
+	// creating an empty view first and then dropping it seems to speed up the process
+	createAndDropEmptyView(ctx, db)
+
+	// create a view for each table
+	for _, tableFolder := range tables {
+		// create a view for the table
+		// the table folder is a hive partition folder so will have the format tp_table=table_name
+		table := strings.TrimPrefix(tableFolder, "tp_table=")
+		err = AddTableView(ctx, table, db, filters...)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// NOTE: tactical optimisation - it seems the first time DuckDB creates a view which inspects the file system it is slow;
+// creating an empty view first and then dropping it seems to speed up the process
+func createAndDropEmptyView(ctx context.Context, db *DuckDb) {
+	_ = AddTableView(ctx, "empty", db)
+	// drop again
+	_, _ = db.ExecContext(ctx, "DROP VIEW empty")
+}
+
+func AddTableView(ctx context.Context, tableName string, db *DuckDb, filters ...string) error {
+	slog.Info("creating view", "table", tableName, "filters", filters)
+
+	dataDir := config.GlobalWorkspaceProfile.GetDataDir()
+	// Path to the Parquet directory
+	// hive structure is <dataDir>/tp_table=<table>/tp_partition=<partition>/tp_index=<index>/tp_date=<date>/<file>.parquet
+	parquetPath := filepaths.GetParquetFileGlobForTable(dataDir, tableName, "")
+
+	// Step 1: Query the first Parquet file to infer columns
+	columns, err := getColumnNames(ctx, parquetPath, db)
+	if err != nil {
+		// if this is because no parquet files match, suppress the error
+		if strings.Contains(err.Error(), "IO Error: No files found that match the pattern") || error_helpers.IsCancelledError(err) {
+			return nil
+		}
+		return err
+	}
+
+	// Step 2: Build the select clause - cast tp_index as string
+	// (this is necessary as duckdb infers the type from the partition column name -
+	// if the index looks like a number, it will infer the column as an int)
+	var typeOverrides = map[string]string{
+		"tp_partition": "varchar",
+		"tp_index":     "varchar",
+		"tp_date":      "date",
+	}
+	var selectClauses []string
+	for _, col := range columns {
+		wrappedCol := fmt.Sprintf(`"%s"`, col)
+		if overrideType, ok := typeOverrides[col]; ok {
+			// Apply the override with casting
+			selectClauses = append(selectClauses, fmt.Sprintf("cast(%s as %s) as %s", col, overrideType, wrappedCol))
+		} else {
+			// Add the column as-is
+			selectClauses = append(selectClauses, wrappedCol)
+		}
+	}
+	selectClause := strings.Join(selectClauses, ", ")
+
+	// Step 3: Build the where clause
+	filterString := ""
+	if len(filters) > 0 {
+		filterString = fmt.Sprintf(" where %s", strings.Join(filters, " and "))
+	}
+
+	// Step 4: Construct the final query
+	query := fmt.Sprintf(
+		"create or replace view %s as select %s from '%s'%s",
+		tableName, selectClause, parquetPath, filterString,
+	)
+
+	// Execute the query
+	_, err = db.ExecContext(ctx, query)
+	if err != nil {
+		slog.Warn("failed to create view", "table", tableName, "error", err)
+		return fmt.Errorf("failed to create view: %w", err)
+	}
+	slog.Info("created view", "table", tableName)
+	return nil
+}
+
+// query the provided parquet path to get the columns
+func getColumnNames(ctx context.Context, parquetPath string, db *DuckDb) ([]string, error) {
+	columnQuery := fmt.Sprintf("select * from '%s' limit 0", parquetPath)
+	rows, err := db.QueryContext(ctx, columnQuery)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	// Retrieve column names
+	columns, err := rows.Columns()
+	if err != nil {
+		return nil, err
+	}
+
+	// Sort column names alphabetically but with tp_ fields on the end
+	columns = helpers.SortColumnsAlphabetically(columns)
+
+	return columns, nil
+}
+
+func getDirNames(folderPath string) ([]string, error) {
+	var dirNames []string
+
+	// Read the directory contents
+	files, err := os.ReadDir(folderPath)
+	if err != nil {
+		return nil, err
+	}
+
+	// Loop through the contents and add directories to dirNames
+	for _, file := range files {
+		if file.IsDir() {
+			dirNames = append(dirNames, file.Name())
+		}
+	}
+
+	return dirNames, nil
+}
diff --git a/internal/filepaths/legacy_database.go b/internal/filepaths/legacy_database.go
new file mode 100644
index 00000000..66f46d4b
--- /dev/null
+++ b/internal/filepaths/legacy_database.go
@@ -0,0 +1,42 @@
+package filepaths
+
+import (
+	"fmt"
+	"github.com/turbot/tailpipe/internal/config"
+	"github.com/turbot/tailpipe/internal/constants"
+	"path/filepath"
+)
+
+func TailpipeLegacyDbFilePath() string {
+	dataDir := config.GlobalWorkspaceProfile.GetDataDir()
+	return filepath.Join(dataDir, constants.TailpipeLegacyDbName)
+}
+
+//const TempParquetExtension = ".parquet.tmp"
+
+func GetParquetFileGlobForTable(dataDir, tableName, fileRoot string) string {
+	return filepath.Join(dataDir, fmt.Sprintf("tp_table=%s/*/*/*/%s*.parquet", tableName, fileRoot))
+}
+
+//
+//func GetParquetFileGlobForPartition(dataDir, tableName, partitionName, fileRoot string) string {
+//	return filepath.Join(dataDir, fmt.Sprintf("tp_table=%s/tp_partition=%s/*/*/%s*.parquet", tableName, partitionName, fileRoot))
+//}
+//
+//func GetTempParquetFileGlobForPartition(dataDir, tableName, partitionName, fileRoot string) string {
+//	return filepath.Join(dataDir, fmt.Sprintf("tp_table=%s/tp_partition=%s/*/*/%s*%s", tableName, partitionName, fileRoot, TempParquetExtension))
+//}
+//
+//// GetTempAndInvalidParquetFileGlobForPartition returns a glob pattern for invalid and temporary parquet files for a partition
+//func GetTempAndInvalidParquetFileGlobForPartition(dataDir, tableName, partitionName string) string {
+//	base := filepath.Join(dataDir, fmt.Sprintf("tp_table=%s/tp_partition=%s", tableName, partitionName))
+//	return filepath.Join(base, "*.parquet.*")
+//}
+//
+//func GetParquetPartitionPath(dataDir, tableName, partitionName string) string {
+//	return filepath.Join(dataDir, fmt.Sprintf("tp_table=%s/tp_partition=%s", tableName, partitionName))
+//}
+//
+//func InvalidParquetFilePath() string {
+//	return filepath.Join(pfilepaths.EnsureInternalDir(), "invalid_parquet.json")
+//}
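
For reference, a minimal standalone Go sketch of the view statement that AddTableView (internal/database/legacy_tables.go, Steps 2-4) assembles. The table name, column list, filter and parquet glob below are hypothetical; the real code infers the columns from the parquet files and builds the glob via filepaths.GetParquetFileGlobForTable:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// hypothetical inputs
	tableName := "aws_cloudtrail_log"
	columns := []string{"event_name", "tp_date", "tp_index", "tp_partition"}
	filters := []string{"tp_partition = 'prod'"}
	parquetPath := "data/default/tp_table=aws_cloudtrail_log/*/*/*/*.parquet"

	// same overrides as AddTableView: partition columns are cast so duckdb does not infer tp_index as an int
	typeOverrides := map[string]string{
		"tp_partition": "varchar",
		"tp_index":     "varchar",
		"tp_date":      "date",
	}

	var selectClauses []string
	for _, col := range columns {
		wrappedCol := fmt.Sprintf(`"%s"`, col)
		if overrideType, ok := typeOverrides[col]; ok {
			selectClauses = append(selectClauses, fmt.Sprintf("cast(%s as %s) as %s", col, overrideType, wrappedCol))
		} else {
			selectClauses = append(selectClauses, wrappedCol)
		}
	}

	filterString := ""
	if len(filters) > 0 {
		filterString = fmt.Sprintf(" where %s", strings.Join(filters, " and "))
	}

	// prints roughly:
	// create or replace view aws_cloudtrail_log as select "event_name", cast(tp_date as date) as "tp_date",
	//   cast(tp_index as varchar) as "tp_index", cast(tp_partition as varchar) as "tp_partition"
	//   from 'data/default/tp_table=aws_cloudtrail_log/*/*/*/*.parquet' where tp_partition = 'prod'
	query := fmt.Sprintf("create or replace view %s as select %s from '%s'%s",
		tableName, strings.Join(selectClauses, ", "), parquetPath, filterString)
	fmt.Println(query)
}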