这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ func doCollect(ctx context.Context, cancel context.CancelFunc, args []string) er
partitionNames = append(partitionNames, partition.FullName)
}
slog.Info("Starting collection", "partition(s)", partitionNames, "from", fromTime, "to", toTime)

// Create backup of metadata database before starting collection
if err := database.BackupDucklakeMetadata(); err != nil {
slog.Warn("Failed to backup metadata database", "error", err)
// Continue with collection - backup failure shouldn't block the operation
}

// now we have the partitions, we can start collecting

// start the plugin manager
Expand Down
6 changes: 6 additions & 0 deletions cmd/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func runCompactCmd(cmd *cobra.Command, args []string) {
patterns, err := database.GetPartitionPatternsForArgs(maps.Keys(config.GlobalConfig.Partitions), args...)
error_helpers.FailOnErrorWithMessage(err, "failed to get partition patterns")

// Create backup of metadata database before starting compaction
if err := database.BackupDucklakeMetadata(); err != nil {
slog.Warn("Failed to backup metadata database", "error", err)
// Continue with compaction - backup failure shouldn't block the operation
}

// do the compaction

status, err := doCompaction(ctx, db, patterns)
Expand Down
7 changes: 7 additions & 0 deletions cmd/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,13 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) {
error_helpers.FailOnError(err)
defer db.Close()

// Create backup before deletion
slog.Info("Creating backup before partition deletion", "partition", partitionName)
if err := database.BackupDucklakeMetadata(); err != nil {
slog.Warn("Failed to create backup before partition deletion", "error", err)
// Continue with deletion - backup failure should not prevent deletion
}

// show spinner while deleting the partition
spinner := statushooks.NewStatusSpinnerHook()
spinner.SetStatus(fmt.Sprintf("Deleting partition %s", partition.TableName))
Expand Down
138 changes: 138 additions & 0 deletions internal/database/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package database

import (
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/turbot/pipe-fittings/v2/utils"
"github.com/turbot/tailpipe/internal/config"
)

// BackupDucklakeMetadata creates a timestamped backup of the DuckLake metadata database.
// It creates backup files with format: metadata.sqlite.backup.YYYYMMDDHHMMSS
// and also backs up the WAL file if it exists:
// - metadata.sqlite-wal.backup.YYYYMMDDHHMMSS
// It removes any existing backup files to maintain only the most recent backup.
//
// The backup is created in the same directory as the original database file.
// If the database file doesn't exist, no backup is created and no error is returned.
//
// Returns an error if the backup operation fails.
func BackupDucklakeMetadata() error {
// Get the path to the DuckLake metadata database
dbPath := config.GlobalWorkspaceProfile.GetDucklakeDbPath()

// Check if the database file exists
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
slog.Debug("DuckLake metadata database does not exist, skipping backup", "path", dbPath)
return nil
} else if err != nil {
return fmt.Errorf("failed to check if database exists: %w", err)
}

// Generate timestamp for backup filename
timestamp := time.Now().Format("20060102150405") // YYYYMMDDHHMMSS format

// Create backup filenames
dbDir := filepath.Dir(dbPath)
mainBackupFilename := fmt.Sprintf("metadata.sqlite.backup.%s", timestamp)
mainBackupPath := filepath.Join(dbDir, mainBackupFilename)

// Also prepare paths for WAL file
walPath := dbPath + "-wal"
walBackupFilename := fmt.Sprintf("metadata.sqlite-wal.backup.%s", timestamp)
walBackupPath := filepath.Join(dbDir, walBackupFilename)

slog.Info("Creating backup of DuckLake metadata database", "source", dbPath, "backup", mainBackupPath)

// Create the main database backup first
if err := utils.CopyFile(dbPath, mainBackupPath); err != nil {
return fmt.Errorf("failed to create main database backup: %w", err)
}

// Backup WAL file if it exists
if _, err := os.Stat(walPath); err == nil {
if err := utils.CopyFile(walPath, walBackupPath); err != nil {
slog.Warn("Failed to backup WAL file", "source", walPath, "error", err)
// Continue - WAL backup failure is not critical
} else {
slog.Debug("Successfully backed up WAL file", "backup", walBackupPath)
}
}

slog.Info("Successfully created backup of DuckLake metadata database", "backup", mainBackupPath)

// Clean up old backup files after successfully creating the new one
if err := cleanupOldBackups(dbDir, timestamp); err != nil {
slog.Warn("Failed to clean up old backup files", "error", err)
// Don't return error - the backup was successful, cleanup is just housekeeping
}
return nil
}

// isBackupFile checks if a filename matches any of the backup patterns
func isBackupFile(filename string) bool {
backupPrefixes := []string{
"metadata.sqlite.backup.",
"metadata.sqlite-wal.backup.",
}

for _, prefix := range backupPrefixes {
if strings.HasPrefix(filename, prefix) {
return true
}
}
return false
}

// shouldRemoveBackup determines if a backup file should be removed
func shouldRemoveBackup(filename, excludeTimestamp string) bool {
if !isBackupFile(filename) {
return false
}
// Don't remove files with the current timestamp
return !strings.HasSuffix(filename, "."+excludeTimestamp)
}

// cleanupOldBackups removes all existing backup files in the specified directory,
// except for the newly created backup files with the given timestamp.
// Backup files are identified by the patterns:
// - metadata.sqlite.backup.*
// - metadata.sqlite-wal.backup.*
func cleanupOldBackups(dir, excludeTimestamp string) error {
entries, err := os.ReadDir(dir)
if err != nil {
return fmt.Errorf("failed to read directory: %w", err)
}

var deletedCount int
for _, entry := range entries {
if entry.IsDir() {
continue
}

filename := entry.Name()
if !shouldRemoveBackup(filename, excludeTimestamp) {
continue
}

backupPath := filepath.Join(dir, filename)
if err := os.Remove(backupPath); err != nil {
slog.Warn("Failed to remove old backup file", "file", backupPath, "error", err)
// Continue removing other files even if one fails
} else {
slog.Debug("Removed old backup file", "file", backupPath)
deletedCount++
}
}

if deletedCount > 0 {
slog.Debug("Cleaned up old backup files", "count", deletedCount)
}

return nil
}