32 changes: 31 additions & 1 deletion cmd/connect.go
@@ -5,11 +5,14 @@ import (
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/turbot/tailpipe/internal/filepaths"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/thediveo/enumflag/v2"
@@ -23,7 +26,7 @@ import (
"github.com/turbot/tailpipe/internal/config"
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/database"
error_helpers "github.com/turbot/tailpipe/internal/error_helpers"
"github.com/turbot/tailpipe/internal/error_helpers"
"golang.org/x/exp/maps"
)

@@ -97,6 +100,17 @@ func runConnectCmd(cmd *cobra.Command, _ []string) {
// use the signal-aware/cancelable context created upstream in preRunHook
ctx := cmd.Context()

// first - determine whether we still have unmigrated data - if so, run the legacy connect command
// (NOTE: if a migration is in progress, fail with an error)
// TODO - remove this check in a future release
legacyDb, err := checkForLegacyDb()
error_helpers.FailOnError(err)
if legacyDb {
slog.Warn("Legacy data detected - running legacy connect command. Data will be migrated to DuckLake when running any command except connect")
runLegacyConnectCmd(cmd, nil)
return
}

defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
@@ -124,6 +138,22 @@ func runConnectCmd(cmd *cobra.Command, _ []string) {

}

func checkForLegacyDb() (bool, error) {
legacyDbPath := filepaths.TailpipeLegacyDbFilePath()
_, err := os.Stat(legacyDbPath)
legacyDbExists := err == nil

// if we are in the middle of a migration, return an error
migratingDir := config.GlobalWorkspaceProfile.GetMigratingDir()
migratingDbPath := filepath.Join(migratingDir, "tailpipe.db")
_, err = os.Stat(migratingDbPath)
migrationInProgress := err == nil
if migrationInProgress {
return false, fmt.Errorf("a data migration is in progress - run any tailpipe command apart from 'connect' to complete migration")
}
return legacyDbExists, nil
}
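A note on the existence probes above: testing err == nil after os.Stat treats any Stat failure, including a permission error, as "file absent". For illustration only (not part of this change), a stricter probe would distinguish fs.ErrNotExist and surface everything else:

	// fileExists is a hypothetical stricter variant: true when Stat succeeds,
	// false when the file genuinely does not exist, and an error otherwise
	// (e.g. permission denied). Requires the "errors", "io/fs" and "os" imports.
	func fileExists(path string) (bool, error) {
		_, err := os.Stat(path)
		if err == nil {
			return true, nil
		}
		if errors.Is(err, fs.ErrNotExist) {
			return false, nil
		}
		return false, err
	}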

func generateInitFile(ctx context.Context) (string, error) {
// cleanup the old db files if not in use
err := cleanupOldInitFiles()
201 changes: 201 additions & 0 deletions cmd/connect_legacy.go
@@ -0,0 +1,201 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/v2/connection"
pconstants "github.com/turbot/pipe-fittings/v2/constants"
"github.com/turbot/pipe-fittings/v2/error_helpers"
pfilepaths "github.com/turbot/pipe-fittings/v2/filepaths"
"github.com/turbot/tailpipe/internal/config"
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/database"
"github.com/turbot/tailpipe/internal/filepaths"
)

func runLegacyConnectCmd(cmd *cobra.Command, _ []string) {
var err error
var databaseFilePath string
ctx := cmd.Context()

defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
setExitCodeForConnectError(err)
displayOutputLegacy(ctx, databaseFilePath, err)
}()

databaseFilePath, err = generateDbFile(ctx)

// we are done - the defer block will print either the filepath (if successful) or the error (if not)
}

func generateDbFile(ctx context.Context) (string, error) {
databaseFilePath := generateTempDBFilename(config.GlobalWorkspaceProfile.GetDataDir())

// cleanup the old db files if not in use
err := cleanupOldDbFiles()
if err != nil {
return "", err
}

// first build the filters
filters, err := getFilters()
if err != nil {
return "", fmt.Errorf("error building filters: %w", err)
}

// if there are no filters, just copy the db file
if len(filters) == 0 {
err = copyDBFile(filepaths.TailpipeLegacyDbFilePath(), databaseFilePath)
return databaseFilePath, err
}

// Open a DuckDB connection (creates the file if it doesn't exist)
db, err := database.NewDuckDb(database.WithDbFile(databaseFilePath))

if err != nil {
return "", fmt.Errorf("failed to open DuckDB connection: %w", err)
}
defer db.Close()

err = database.AddTableViews(ctx, db, filters...)
return databaseFilePath, err
}

func displayOutputLegacy(ctx context.Context, databaseFilePath string, err error) {
switch viper.GetString(pconstants.ArgOutput) {
case pconstants.OutputFormatText:
if err == nil {
// output the filepath
fmt.Println(databaseFilePath) //nolint:forbidigo // ui output
} else {
error_helpers.ShowError(ctx, err)
}
case pconstants.OutputFormatJSON:
res := connection.TailpipeConnectResponse{
DatabaseFilepath: databaseFilePath,
}
if err != nil {
res.Error = err.Error()
}
b, err := json.Marshal(res)
if err == nil {
fmt.Println(string(b)) //nolint:forbidigo // ui output
} else {
fmt.Printf(`{"error": "failed to marshal response: %s"}`, err) //nolint:forbidigo // ui output
}

default:
// unexpected - cobra's validation should prevent this
error_helpers.ShowError(ctx, fmt.Errorf("unsupported output format %q", viper.GetString(pconstants.ArgOutput)))
}
}

// generateTempDBFilename generates a temporary filename with a timestamp
func generateTempDBFilename(dataDir string) string {
timestamp := time.Now().Format("20060102150405") // e.g., 20241031103000
return filepath.Join(dataDir, fmt.Sprintf("tailpipe_%s.db", timestamp))
}
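For readers unfamiliar with Go time layouts: format strings are spelled in terms of the reference time Mon Jan 2 15:04:05 MST 2006, so "20060102150405" is that reference moment with the separators removed, rendering as a compact yyyymmddHHMMSS stamp. A self-contained check of the example in the comment above:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// "20060102150405" = year, month, day, hour, minute, second of the
		// Go reference time, so Format produces a sortable timestamp.
		t := time.Date(2024, time.October, 31, 10, 30, 0, 0, time.UTC)
		fmt.Println(t.Format("20060102150405")) // prints 20241031103000
	}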

// copyDBFile copies the source database file to the destination
func copyDBFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()

destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()

_, err = io.Copy(destFile, sourceFile)
return err
}
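One subtlety in the copy helper: the deferred destFile.Close() discards its error, so a write that only fails at close/flush time goes unreported. A variant that surfaces it, shown purely as an illustrative sketch rather than a suggested change:

	// copyFileChecked is a hypothetical variant of copyDBFile that reports
	// the destination Close error, which can carry a deferred write failure.
	// Requires the "io" and "os" imports.
	func copyFileChecked(src, dst string) (err error) {
		sourceFile, err := os.Open(src)
		if err != nil {
			return err
		}
		defer sourceFile.Close()

		destFile, err := os.Create(dst)
		if err != nil {
			return err
		}
		defer func() {
			// only surface the Close error if the copy itself succeeded
			if closeErr := destFile.Close(); err == nil {
				err = closeErr
			}
		}()

		_, err = io.Copy(destFile, sourceFile)
		return err
	}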

// cleanupOldDbFiles deletes old db files (older than a day) that are not in use
func cleanupOldDbFiles() error {
baseDir := pfilepaths.GetDataDir()
log.Printf("[INFO] Cleaning up old db files in %s\n", baseDir)
cutoffTime := time.Now().Add(-constants.LegacyDbFileMaxAge) // Files older than 1 day

// The baseDir ("$TAILPIPE_INSTALL_DIR/data") is expected to have subdirectories for different workspace
// profiles (default, work, etc.). Each subdirectory may contain multiple .db files.
// Example structure:
// data/
// ├── default/
// │ ├── tailpipe_20250115182129.db
// │ ├── tailpipe_20250115193816.db
// │ ├── tailpipe.db
// │ └── ...
// ├── work/
// │ ├── tailpipe_20250115182129.db
// │ ├── tailpipe_20250115193816.db
// │ ├── tailpipe.db
// │ └── ...
// So we traverse all these subdirectories for each workspace and process the relevant files.
err := filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error accessing path %s: %v", path, err)
}

// skip directories and non-`.db` files
if info.IsDir() || !strings.HasSuffix(info.Name(), ".db") {
return nil
}

// skip `tailpipe.db` file
if info.Name() == "tailpipe.db" {
return nil
}

// only process `tailpipe_*.db` files
if !strings.HasPrefix(info.Name(), "tailpipe_") {
return nil
}

// check if the file is older than the cutoff time
if info.ModTime().After(cutoffTime) {
log.Printf("[DEBUG] Skipping deleting file %s(%s) as it is not older than %s\n", path, info.ModTime().String(), cutoffTime)
return nil
}

// check for a lock on the file by attempting to open it
db, err := database.NewDuckDb(database.WithDbFile(path))
if err != nil {
log.Printf("[INFO] Skipping deletion of file %s due to error: %v\n", path, err)
return nil
}
defer db.Close()

// if no lock, delete the file
err = os.Remove(path)
if err != nil {
log.Printf("[INFO] Failed to delete db file %s: %v", path, err)
} else {
log.Printf("[DEBUG] Cleaned up old unused db file: %s\n", path)
}

return nil
})

if err != nil {
return err
}
return nil
}
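As an aside, on Go 1.16+ filepath.WalkDir avoids the per-entry os.Stat that filepath.Walk performs. A minimal sketch of the same traversal skeleton, illustrative only and not part of this change:

	// walkDbFiles visits every .db file under baseDir using the cheaper
	// fs.DirEntry-based walker; callers supply the per-file logic.
	// Requires the "io/fs", "path/filepath" and "strings" imports.
	func walkDbFiles(baseDir string, visit func(path string, d fs.DirEntry) error) error {
		return filepath.WalkDir(baseDir, func(path string, d fs.DirEntry, err error) error {
			if err != nil {
				return err
			}
			// skip directories and non-.db files
			if d.IsDir() || !strings.HasSuffix(d.Name(), ".db") {
				return nil
			}
			return visit(path, d)
		})
	}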
19 changes: 19 additions & 0 deletions internal/cmdconfig/cmd_hooks.go
@@ -68,6 +68,15 @@ func preRunHook(cmd *cobra.Command, args []string) error {
contexthelpers.StartCancelHandler(cancel)
cmd.SetContext(ctx)

// skip migration for the 'connect' command (and its subcommands)
// this is necessary since connect is used by powerpipe to connect to the tailpipe
// db, and migration could take a long time if there are a large number of rows and tables.
// that would leave powerpipe with no feedback while it waits to connect to the tailpipe db,
// so to avoid this scenario we skip migration for the 'connect' command
if isConnectCommand(cmd) {
return nil
}

// migrate legacy data to DuckLake:
// Prior to Tailpipe v0.7.0 we stored data as native Parquet files alongside a tailpipe.db
// (DuckDB) that defined SQL views. From v0.7.0 onward Tailpipe uses DuckLake, which
@@ -128,6 +137,16 @@ func setMemoryLimit() {
}
}

// isConnectCommand returns true if the current command or any of its parents is 'connect'
func isConnectCommand(cmd *cobra.Command) bool {
for c := cmd; c != nil; c = c.Parent() {
if c.Name() == "connect" {
return true
}
}
return false
}
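To see why walking Parent() matters, consider a hypothetical status subcommand registered under connect - the loop still reports true because cobra links each command to its parent via AddCommand:

	package main

	import (
		"fmt"

		"github.com/spf13/cobra"
	)

	func main() {
		root := &cobra.Command{Use: "tailpipe"}
		connect := &cobra.Command{Use: "connect"}
		status := &cobra.Command{Use: "status"} // hypothetical subcommand
		connect.AddCommand(status)
		root.AddCommand(connect)

		// the same walk isConnectCommand performs, from leaf to root
		for c := status; c != nil; c = c.Parent() {
			fmt.Println(c.Name()) // status, then connect, then tailpipe
		}
	}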

// runScheduledTasks runs the task runner and returns a channel which is closed when
// task run is complete
//
8 changes: 8 additions & 0 deletions internal/constants/legacy_database.go
@@ -0,0 +1,8 @@
package constants

import "time"

const (
TailpipeLegacyDbName = "tailpipe.db"
LegacyDbFileMaxAge = 24 * time.Hour
)
4 changes: 2 additions & 2 deletions internal/database/compaction_types.go
@@ -17,7 +17,7 @@ func getTimeRangesToReorder(ctx context.Context, db *DuckDb, pk *partitionKey, r
if reindex {
rm, err := newReorderMetadata(ctx, db, pk)
if err != nil {
return nil, fmt.Errorf("failed to retiever stats for partition key: %w", err)
return nil, fmt.Errorf("failed to retrieve stats for partition key: %w", err)
}

// make a single time range
@@ -52,7 +52,7 @@
// get stats for the partition key
rm, err := newReorderMetadata(ctx, db, pk)
if err != nil {
return nil, fmt.Errorf("failed to retiever stats for partition key: %w", err)
return nil, fmt.Errorf("failed to retrieve stats for partition key: %w", err)
}
rm.unorderedRanges = unorderedRanges
return rm, nil