diff --git a/cmd/collect.go b/cmd/collect.go index 660ac17e..1d7229ae 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "strings" "time" @@ -19,6 +20,7 @@ import ( "github.com/turbot/pipe-fittings/parse" "github.com/turbot/tailpipe/internal/collector" "github.com/turbot/tailpipe/internal/config" + "github.com/turbot/tailpipe/internal/parquet" "github.com/turbot/tailpipe/internal/plugin_manager" ) @@ -106,6 +108,16 @@ func doCollect(ctx context.Context, args []string) error { // collect each partition serially var errList []error for _, partition := range partitions { + // if a from time is set, clear the partition data from that time forward + if !fromTime.IsZero() { + _, err := parquet.DeleteParquetFiles(partition, fromTime) + if err != nil { + slog.Warn("Failed to delete parquet files after the from time", "partition", partition.Name, "fromTime", fromTime, "error", err) + errList = append(errList, err) + continue + } + } + // do the collection err = collectPartition(ctx, partition, fromTime, pluginManager) if err != nil { errList = append(errList, err) @@ -127,14 +139,10 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime } defer c.Close() - // if there is a from time, add a filter to the partition - if !fromTime.IsZero() { - partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", fromTime.Format("2006-01-02T15:04:05"))) - } - if err = c.Collect(ctx, partition, fromTime); err != nil { return err } + // now wait for all collection to complete and close the collector c.WaitForCompletion(ctx) diff --git a/cmd/partition.go b/cmd/partition.go index 3336915a..8384a3d6 100644 --- a/cmd/partition.go +++ b/cmd/partition.go @@ -3,20 +3,28 @@ package cmd import ( "context" "fmt" + "log/slog" "strings" + "time" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/thediveo/enumflag/v2" "github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/cmdconfig" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/contexthelpers" "github.com/turbot/pipe-fittings/error_helpers" + "github.com/turbot/pipe-fittings/parse" "github.com/turbot/pipe-fittings/printers" "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/display" + "github.com/turbot/tailpipe/internal/filepaths" + "github.com/turbot/tailpipe/internal/parquet" + "github.com/turbot/tailpipe/internal/plugin_manager" ) func partitionCmd() *cobra.Command { @@ -39,7 +47,7 @@ Examples: cmd.AddCommand(partitionListCmd()) cmd.AddCommand(partitionShowCmd()) - + cmd.AddCommand(partitionDeleteCmd()) cmd.Flags().BoolP(pconstants.ArgHelp, "h", false, "Help for partition") return cmd @@ -130,7 +138,7 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) { // Get Resources partitionName := args[0] - resource, err := display.GetPartitionResource(ctx, partitionName) + resource, err := display.GetPartitionResource(partitionName) error_helpers.FailOnError(err) printableResource := display.NewPrintableResource(resource) @@ -145,3 +153,101 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) { exitCode = pconstants.ExitCodeUnknownErrorPanic } } + +// partitionDeleteCmd returns the 'partition delete' command, which deletes the collected data of a partition. +func partitionDeleteCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Args: cobra.ExactArgs(1), + Run: runPartitionDeleteCmd, + Short: "delete a partition for the specified period", + Long: `delete a partition for the specified period`, + } + + // arg `from` accepts: + // - ISO 8601 date (2024-01-01) + // - ISO 8601 datetime (2006-01-02T15:04:05) + // - ISO 8601 datetime with ms (2006-01-02T15:04:05.000) + // - RFC 3339 datetime with timezone (2006-01-02T15:04:05Z07:00) + // - relative time formats (T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M) + + cmdconfig.OnCmd(cmd).
+ AddStringFlag(pconstants.ArgFrom, "", "Specify the start time"). + AddBoolFlag(pconstants.ArgForce, false, "Force delete without confirmation") + + return cmd +} + +// runPartitionDeleteCmd deletes the parquet data of the named partition (limited to dates on or after the `from` arg, if set) and then updates the partition's collection state. +func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + + defer func() { + if r := recover(); r != nil { + exitCode = pconstants.ExitCodeUnknownErrorPanic + error_helpers.FailOnError(helpers.ToError(r)) + } + }() + + var from time.Time + var fromStr string + if viper.IsSet(pconstants.ArgFrom) { + fromArg := viper.GetString(pconstants.ArgFrom) + // parse the string as time.Time + // arg `from` accepts ISO 8601 date(2024-01-01), ISO 8601 datetime(2006-01-02T15:04:05), ISO 8601 datetime with ms(2006-01-02T15:04:05.000), + // RFC 3339 datetime with timezone(2006-01-02T15:04:05Z07:00) and relative time formats(T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M) + var err error + from, err = parse.ParseTime(fromArg, time.Now()) + + if err != nil { + error_helpers.FailOnError(fmt.Errorf("invalid date format for 'from': %s: %w", fromArg, err)) + } + + fromStr = fmt.Sprintf(" from %s", from.Format(time.RFC3339)) + } + + partitionName := args[0] + partition, ok := config.GlobalConfig.Partitions[partitionName] + if !ok { + error_helpers.FailOnError(fmt.Errorf("partition %s not found", partitionName)) + } + + if !viper.GetBool(pconstants.ArgForce) { + // confirm deletion - default to 'no', as deleted data cannot be recovered + msg := fmt.Sprintf("Are you sure you want to delete partition %s%s?", partitionName, fromStr) + if !utils.UserConfirmationWithDefault(msg, false) { + fmt.Println("Deletion cancelled") //nolint:forbidigo//expected output + return + } + } + + filesDeleted, err := parquet.DeleteParquetFiles(partition, from) + error_helpers.FailOnError(err) + + // update collection state + // start the plugin manager + pluginManager := plugin_manager.New() + defer pluginManager.Close() + + // build the collection state path + collectionStateDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionStatePath :=
filepaths.CollectionStatePath(collectionStateDir, partition.TableName, partition.ShortName) + + // tell the plugin manager to update the collection state + err = pluginManager.UpdateCollectionState(ctx, partition, from, collectionStatePath) + error_helpers.FailOnError(err) + + msg := buildStatusMessage(filesDeleted, partitionName, fromStr) + fmt.Println(msg) //nolint:forbidigo//expected output + slog.Info("Partition deleted", "partition", partitionName, "from", from) +} + +// buildStatusMessage returns the summary printed after a partition delete; fromStr is expected to be empty or to start with a space. +func buildStatusMessage(filesDeleted int, partition string, fromStr string) string { + var deletedStr string + if filesDeleted > 0 { + deletedStr = fmt.Sprintf(" (deleted %d parquet %s)", filesDeleted, utils.Pluralize("file", filesDeleted)) + } + + return fmt.Sprintf("\nDeleted partition '%s'%s%s.\n", partition, fromStr, deletedStr) +} diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 3a5f6afa..413f4769 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -7,15 +7,11 @@ import ( "fmt" "log/slog" "os" - "path/filepath" - "strconv" "strings" "time" tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" - - "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" "github.com/turbot/tailpipe-plugin-sdk/events" sdkfilepaths "github.com/turbot/tailpipe-plugin-sdk/filepaths" @@ -55,7 +51,7 @@ type Collector struct { func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { // get the temp data dir for this collection // - this is located in ~/.turbot/internal/collection// - collectionTempDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionTempDir := filepaths.GetCollectionTempDir() c := &Collector{ Events: make(chan *proto.Event, eventBufferSize), @@ -95,19 +91,22 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr return errors.New("collection already in progress") } - // cleanup the collection temp dir from
previous runs - c.cleanupCollectionDir() - // tell plugin to start collecting collectResponse, err := c.pluginManager.Collect(ctx, partition, fromTime, c.collectionTempDir) if err != nil { return fmt.Errorf("failed to collect: %w", err) } - c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime)) + resolvedFromTime := collectResponse.FromTime + c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *resolvedFromTime)) //nolint:errcheck // handle this later go c.app.Run() // TODO: #error handling of errors + // if there is a from time, add a filter to the partition - this will be used by the parquet writer + if !resolvedFromTime.Time.IsZero() { + partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", resolvedFromTime.Time.Format("2006-01-02T15:04:05"))) + } + executionId := collectResponse.ExecutionId // add the execution to the map c.execution = newExecution(executionId, partition) @@ -368,34 +367,6 @@ func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) c.execution.pluginTiming = events.TimingCollectionFromProto(timing) } -func (c *Collector) cleanupCollectionDir() { - // list all folders alongside our collection temp dir - parent := filepath.Dir(c.collectionTempDir) - files, err := os.ReadDir(parent) - if err != nil { - slog.Warn("failed to list files in collection dir", "error", err) - return - } - for _, file := range files { - // if the file is a directory and is not our collection temp dir, remove it - if file.IsDir() && file.Name() != filepath.Base(c.collectionTempDir) { - // the folder name is the PID - check whether that pid exists - // if it doesn't, remove the folder - // Attempt to find the process - // try to parse the directory name as a pid - pid, err := strconv.ParseInt(file.Name(), 10, 32) - if err == nil { - if utils.PidExists(int(pid)) { - slog.Info(fmt.Sprintf("cleanupCollectionDir skipping directory '%s' as process with PID %d exists", file.Name(), pid))
- continue - } - } - slog.Debug("removing directory", "dir", file.Name()) - _ = os.RemoveAll(filepath.Join(parent, file.Name())) - } - } -} - func (c *Collector) Compact(ctx context.Context) error { c.app.Send(AwaitingCompactionMsg{}) updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) { diff --git a/internal/config/tailpipe_config.go b/internal/config/tailpipe_config.go index d1e8fb58..e15d6354 100644 --- a/internal/config/tailpipe_config.go +++ b/internal/config/tailpipe_config.go @@ -71,5 +71,4 @@ func (c *TailpipeConfig) InitPartitions() { partition.Plugin = plugin.NewPlugin(partition.InferPluginName()) } } - } diff --git a/internal/database/tables.go b/internal/database/tables.go index a16cae8c..65f017d3 100644 --- a/internal/database/tables.go +++ b/internal/database/tables.go @@ -50,7 +50,7 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ... dataDir := config.GlobalWorkspaceProfile.GetDataDir() // Path to the Parquet directory // hive structure is /tp_table=/tp_partition=/tp_index=/tp_date=.parquet - parquetPath := GetParquetFileGlob(dataDir, tableName, "") + parquetPath := GetParquetFileGlobForTable(dataDir, tableName, "") // Step 1: Query the first Parquet file to infer columns columns, err := getColumnNames(ctx, parquetPath, db) @@ -102,11 +102,18 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ...
return nil } -func GetParquetFileGlob(dataDir, tableName, fileRoot string) string { +// GetParquetFileGlobForTable returns a glob matching the parquet files (optionally only those whose name starts with fileRoot) of every partition of a table. +func GetParquetFileGlobForTable(dataDir, tableName, fileRoot string) string { parquetPath := fmt.Sprintf("%s/tp_table=%s/*/*/*/%s*.parquet", dataDir, tableName, fileRoot) return parquetPath } +// GetParquetFileGlobForPartition returns a glob matching the parquet files (optionally only those whose name starts with fileRoot) of a single partition of a table. +func GetParquetFileGlobForPartition(dataDir, tableName, partitionName, fileRoot string) string { + parquetPath := fmt.Sprintf("%s/tp_table=%s/tp_partition=%s/*/*/%s*.parquet", dataDir, tableName, partitionName, fileRoot) + return parquetPath +} + // query the provided parquet path to get the columns func getColumnNames(ctx context.Context, parquetPath string, db *sql.DB) ([]string, error) { columnQuery := fmt.Sprintf("SELECT * FROM '%s' LIMIT 0", parquetPath) //nolint: gosec // this is a controlled query diff --git a/internal/display/partition.go b/internal/display/partition.go index 34525c6f..36759f60 100644 --- a/internal/display/partition.go +++ b/internal/display/partition.go @@ -68,27 +68,23 @@ func ListPartitionResources(ctx context.Context) ([]*PartitionResource, error) { return res, nil } -func GetPartitionResource(ctx context.Context, partitionName string) (*PartitionResource, error) { - partitions := config.GlobalConfig.Partitions - for _, p := range partitions { - name := fmt.Sprintf("%s.%s", p.TableName, p.ShortName) - if name == partitionName { - partition := &PartitionResource{ - Name: name, - Description: p.Description, - Plugin: p.Plugin.Alias, - } - - err := partition.setFileInformation() - if err != nil { - return nil, fmt.Errorf("error setting file information: %w", err) - } - - return partition, nil - } +func GetPartitionResource(partitionName string) (*PartitionResource, error) { + p, ok := config.GlobalConfig.Partitions[partitionName] + if !ok { + return nil, fmt.Errorf("partition '%s' not found", partitionName) + } + partition := &PartitionResource{ + Name: partitionName, + Description: p.Description, + Plugin: p.Plugin.Alias, + } + + err := partition.setFileInformation() + if err != nil {
+ return nil, fmt.Errorf("error setting file information: %w", err) } - return nil, fmt.Errorf("partition '%s' not found", partitionName) + return partition, nil } func (r *PartitionResource) setFileInformation() error { diff --git a/internal/filepaths/collection_temp_dir.go b/internal/filepaths/collection_temp_dir.go new file mode 100644 index 00000000..1f42fcf0 --- /dev/null +++ b/internal/filepaths/collection_temp_dir.go @@ -0,0 +1,59 @@ +package filepaths + +import ( + "fmt" + "log/slog" + "os" + "path/filepath" + "strconv" + + "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe/internal/config" +) + +// GetCollectionTempDir returns the collection temp dir for this process (<collection dir>/<pid>). +// Before returning, it removes any temp dirs left behind by collection processes which are no longer running. +func GetCollectionTempDir() string { + // get the collection directory for this workspace + collectionDir := config.GlobalWorkspaceProfile.GetCollectionDir() + // cleanup the collection temp dir from previous runs + cleanupCollectionTempDirs(collectionDir) + // add a PID directory to the collection directory + return filepath.Join(collectionDir, fmt.Sprintf("%d", os.Getpid())) +} + +// cleanupCollectionTempDirs removes every sub-directory of collectionDir whose name is not the PID of a running process. +func cleanupCollectionTempDirs(collectionDir string) { + files, err := os.ReadDir(collectionDir) + if err != nil { + // a missing collection dir just means there is nothing to clean up + if !os.IsNotExist(err) { + slog.Warn("failed to list files in collection dir", "error", err) + } + return + } + for _, file := range files { + // only directories are temp dirs - files (such as collection state files) are left in place + if file.IsDir() { + // the folder name is the PID - check whether that pid exists + // if it doesn't, remove the folder + // Attempt to find the process + // try to parse the directory name as a pid + pid, err := strconv.ParseInt(file.Name(), 10, 32) + if err == nil { + if utils.PidExists(int(pid)) { + slog.Info(fmt.Sprintf("cleanupCollectionTempDirs skipping directory '%s' as process with PID %d exists", file.Name(), pid)) + continue + } + } + slog.Debug("removing directory", "dir", file.Name()) + _ = os.RemoveAll(filepath.Join(collectionDir, file.Name())) + } + } +} + +// CollectionStatePath returns the path of the collection state file for the given table and partition within collectionFolder. +func CollectionStatePath(collectionFolder string, table, partition string) string { +
// return the path to the collection state file + return filepath.Join(collectionFolder, fmt.Sprintf("collection_state_%s_%s.json", table, partition)) +} diff --git a/internal/parquet/delete.go b/internal/parquet/delete.go new file mode 100644 index 00000000..bc724f29 --- /dev/null +++ b/internal/parquet/delete.go @@ -0,0 +1,84 @@ +package parquet + +import ( + "database/sql" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/turbot/tailpipe/internal/config" + "github.com/turbot/tailpipe/internal/database" +) + +// DeleteParquetFiles deletes the collected data of the given partition by removing its hive date folders +// (<data dir>/tp_table=<table>/tp_partition=<partition>/tp_index=<index>/tp_date=<date>). +// If from is non-zero, only folders whose tp_date is on or after the date of from are removed. +// NOTE: removal has whole-day granularity, so data earlier than from on that same date is removed too. +// It returns the number of date folders removed, plus a joined error for any folders that could not be removed. +func DeleteParquetFiles(partition *config.Partition, from time.Time) (int, error) { + dataDir := config.GlobalWorkspaceProfile.GetDataDir() + + fileGlob := database.GetParquetFileGlobForPartition(dataDir, partition.TableName, partition.ShortName, "") + + // these values are interpolated into the query, so escape them as SQL string literals + // TODO can we use query parameters instead + escape := func(s string) string { return strings.ReplaceAll(s, "'", "''") } + query := fmt.Sprintf(` + SELECT + DISTINCT '%s/tp_table=' || tp_table || '/tp_partition=' || tp_partition || '/tp_index=' || tp_index || '/tp_date=' || tp_date AS hive_path + FROM read_parquet('%s', hive_partitioning=true) + WHERE tp_partition = '%s' +`, escape(dataDir), escape(fileGlob), escape(partition.ShortName)) + + if !from.IsZero() { + // compare the tp_date hive key against the date part of from (YYYY-MM-DD); formatting the time.Time + // with %s would give "2006-01-02 15:04:05 -0700 MST", which does not compare correctly with a date + query += fmt.Sprintf(` + AND tp_date >= '%s'`, from.Format("2006-01-02")) + } + + db, err := sql.Open("duckdb", "") + if err != nil { + return 0, fmt.Errorf("failed to open DuckDB connection: %w", err) + } + + defer db.Close() + + rows, err := db.Query(query) + if err != nil { + // is this an error because there are no files?
+ if strings.HasPrefix(err.Error(), "IO Error: No files found") { + return 0, nil + } + return 0, fmt.Errorf("failed to query parquet folder names: %w", err) + } + defer rows.Close() + + var folders []string + // Iterate over the results + for rows.Next() { + var folder string + if err := rows.Scan(&folder); err != nil { + return 0, fmt.Errorf("failed to scan parquet folder name: %w", err) + } + folders = append(folders, folder) + } + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("failed to read parquet folder names: %w", err) + } + + // remove every folder, collecting failures rather than stopping at the first one + var deleted int + var errs []error + for _, folder := range folders { + if err := os.RemoveAll(folder); err != nil { + errs = append(errs, fmt.Errorf("failed to delete %s: %w", folder, err)) + continue + } + deleted++ + } + + return deleted, errors.Join(errs...) +} diff --git a/internal/parquet/parquet_worker.go b/internal/parquet/parquet_worker.go index 7b39ed07..e9710102 100644 --- a/internal/parquet/parquet_worker.go +++ b/internal/parquet/parquet_worker.go @@ -174,7 +174,7 @@ func (w *parquetConversionWorker) convertFile(jsonlFilePath string, partition *c func getRowCount(db *sql.DB, destDir, fileRoot, table string) (int64, error) { // Build the query - rowCountQuery := fmt.Sprintf(`SELECT SUM(num_rows) FROM parquet_file_metadata('%s')`, database.GetParquetFileGlob(destDir, table, fileRoot)) //nolint:gosec // fixed sql query + rowCountQuery := fmt.Sprintf(`SELECT SUM(num_rows) FROM parquet_file_metadata('%s')`, database.GetParquetFileGlobForTable(destDir, table, fileRoot)) //nolint:gosec // fixed sql query // Execute the query and scan the result directly var rowCount int64 diff --git a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go index f27fbd64..a2f97344 100644 --- a/internal/plugin_manager/plugin_manager.go +++ b/internal/plugin_manager/plugin_manager.go @@ -21,7 +21,7 @@ import ( "github.com/turbot/pipe-fittings/app_specific" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/error_helpers" - "github.com/turbot/pipe-fittings/filepaths" + pfilepaths "github.com/turbot/pipe-fittings/filepaths"
"github.com/turbot/pipe-fittings/installationstate" pociinstaller "github.com/turbot/pipe-fittings/ociinstaller" pplugin "github.com/turbot/pipe-fittings/plugin" @@ -31,6 +31,7 @@ import ( "github.com/turbot/tailpipe-plugin-sdk/grpc/shared" "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" + "github.com/turbot/tailpipe/internal/filepaths" "github.com/turbot/tailpipe/internal/ociinstaller" "github.com/turbot/tailpipe/internal/plugin" "google.golang.org/protobuf/types/known/timestamppb" @@ -105,18 +106,20 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition // the name of the collection state file contains the partition name // thus the collection state is shared between multiple successive collections - collectionStateDir := filepath.Dir(collectionTempDir) + // build the collection state path + collectionStateDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionStatePath := filepaths.CollectionStatePath(collectionStateDir, partition.TableName, partition.ShortName) // tell the plugin to start the collection req := &proto.CollectRequest{ - TableName: partition.TableName, - PartitionName: partition.ShortName, - ExecutionId: executionID, - CollectionTempDir: collectionTempDir, - CollectionStateDir: collectionStateDir, - SourceData: partition.Source.ToProto(), - SourcePlugin: sourcePluginReattach, - FromTime: timestamppb.New(fromTime), + TableName: partition.TableName, + PartitionName: partition.ShortName, + ExecutionId: executionID, + CollectionTempDir: collectionTempDir, + CollectionStatePath: collectionStatePath, + SourceData: partition.Source.ToProto(), + SourcePlugin: sourcePluginReattach, + FromTime: timestamppb.New(fromTime), } if partition.Source.Connection != nil { @@ -153,6 +156,41 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition return CollectResponseFromProto(collectResponse), nil } +// UpdateCollectionState starts (if needed) the plugin which provides the partition's source and asks it to update +// the collection state file at collectionStatePath for the given from time. +func (p *PluginManager) UpdateCollectionState(ctx
context.Context, partition *config.Partition, fromTime time.Time, collectionStatePath string) error { + // identify which plugin provides the source + sourcePlugin, err := p.determineSourcePlugin(partition) + if err != nil { + return fmt.Errorf("error determining source plugin for source %s: %w", partition.Source.Type, err) + } + + // start plugin if needed + pluginClient, err := p.getPlugin(ctx, sourcePlugin) + if err != nil { + return fmt.Errorf("error starting plugin %s: %w", sourcePlugin.Alias, err) + } + + executionID := getExecutionId() + + // reuse CollectRequest for UpdateCollectionState + req := &proto.CollectRequest{ + TableName: partition.TableName, + PartitionName: partition.ShortName, + ExecutionId: executionID, + CollectionStatePath: collectionStatePath, + SourceData: partition.Source.ToProto(), + FromTime: timestamppb.New(fromTime), + } + + _, err = pluginClient.UpdateCollectionState(req) + if err != nil { + return fmt.Errorf("error updating collection state for plugin %s: %w", pluginClient.Name, error_helpers.TransformErrorToSteampipe(err)) + } + + return nil +} + // Describe starts the plugin if needed, discovers the artifacts and download them for the given partition. func (p *PluginManager) Describe(ctx context.Context, pluginName string) (*PluginDescribeResponse, error) { // build plugin ref from the name @@ -222,10 +260,9 @@ func (p *PluginManager) getPlugin(ctx context.Context, pluginDef *pplugin.Plugin } func (p *PluginManager) startPlugin(tp *pplugin.Plugin) (*grpc.PluginClient, error) { - // TODO #plugin search in dest folder for any .plugin, as steampipe does https://github.com/turbot/tailpipe/issues/4 pluginName := tp.Alias - pluginPath, err := filepaths.GetPluginPath(tp.Plugin, tp.Alias) + pluginPath, err := pfilepaths.GetPluginPath(tp.Plugin, tp.Alias) if err != nil { return nil, fmt.Errorf("error getting plugin path for plugin '%s': %w", tp.Alias, err) }