From 04dd1df417af082500d8b75336d9cc63abdd7087 Mon Sep 17 00:00:00 2001 From: kai Date: Sun, 19 Jan 2025 11:43:33 +0000 Subject: [PATCH 1/8] Add partition deletion command. Add update collection state gRPC --- cmd/collect.go | 13 +++++ cmd/partition.go | 68 ++++++++++++++++++++++- internal/config/tailpipe_config.go | 1 - internal/display/partition.go | 34 +++++------- internal/parquet/delete.go | 56 +++++++++++++++++++ internal/plugin_manager/plugin_manager.go | 28 ++++++++++ 6 files changed, 177 insertions(+), 23 deletions(-) create mode 100644 internal/parquet/delete.go diff --git a/cmd/collect.go b/cmd/collect.go index 660ac17e..79871b48 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -19,6 +19,7 @@ import ( "github.com/turbot/pipe-fittings/parse" "github.com/turbot/tailpipe/internal/collector" "github.com/turbot/tailpipe/internal/config" + "github.com/turbot/tailpipe/internal/parquet" "github.com/turbot/tailpipe/internal/plugin_manager" ) @@ -106,6 +107,18 @@ func doCollect(ctx context.Context, args []string) error { // collect each partition serially var errList []error for _, partition := range partitions { + // if a from time is set, clear the partition data from that time forward + if !fromTime.IsZero() { + err := parquet.DeleteParquetFiles(partition, fromTime) + if err != nil { + slog.Warn("Failed to delete parquet files after the from time", "partition", partition.Name, "fromTime", fromTime, "error", err) + errList = append(errList, err) + continue + } + error_helpers.FailOnError(err) + } + // do the collection + err = collectPartition(ctx, partition, fromTime, pluginManager) err = collectPartition(ctx, partition, fromTime, pluginManager) if err != nil { errList = append(errList, err) diff --git a/cmd/partition.go b/cmd/partition.go index 3336915a..58025c94 100644 --- a/cmd/partition.go +++ b/cmd/partition.go @@ -4,19 +4,23 @@ import ( "context" "fmt" "strings" + "time" "github.com/spf13/cobra" + "github.com/spf13/viper"
"github.com/thediveo/enumflag/v2" - "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/cmdconfig" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/contexthelpers" "github.com/turbot/pipe-fittings/error_helpers" + "github.com/turbot/pipe-fittings/parse" "github.com/turbot/pipe-fittings/printers" "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/display" + "github.com/turbot/tailpipe/internal/parquet" ) func partitionCmd() *cobra.Command { @@ -39,7 +43,7 @@ Examples: cmd.AddCommand(partitionListCmd()) cmd.AddCommand(partitionShowCmd()) - + cmd.AddCommand(partitionDeleteCmd()) cmd.Flags().BoolP(pconstants.ArgHelp, "h", false, "Help for partition") return cmd @@ -130,7 +134,7 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) { // Get Resources partitionName := args[0] - resource, err := display.GetPartitionResource(ctx, partitionName) + resource, err := display.GetPartitionResource(partitionName) error_helpers.FailOnError(err) printableResource := display.NewPrintableResource(resource) @@ -145,3 +149,61 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) { exitCode = pconstants.ExitCodeUnknownErrorPanic } } + +func partitionDeleteCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Args: cobra.ExactArgs(1), + Run: runPartitionDeleteCmd, + Short: "delete a partition for the specified period", + Long: `delete a partition for the specified period`, + } + + // args `from` and `to` accept: + // - ISO 8601 date (2024-01-01) + // - ISO 8601 datetime (2006-01-02T15:04:05) + // - ISO 8601 datetime with ms (2006-01-02T15:04:05.000) + // - RFC 3339 datetime with timezone (2006-01-02T15:04:05Z07:00) + // - relative time formats (T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M) + + cmdconfig.OnCmd(cmd). 
+ AddStringFlag(pconstants.ArgFrom, "", "Specify the start time") + + return cmd +} + +func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { + //ctx := cmd.Context() + + defer func() { + if r := recover(); r != nil { + exitCode = pconstants.ExitCodeUnknownErrorPanic + error_helpers.FailOnError(helpers.ToError(r)) + } + }() + + var from time.Time + if viper.IsSet(pconstants.ArgFrom) { + fromArg := viper.GetString(pconstants.ArgFrom) + // parse the string as time.Time + // arg `from` accepts ISO 8601 date(2024-01-01), ISO 8601 datetime(2006-01-02T15:04:05), ISO 8601 datetime with ms(2006-01-02T15:04:05.000), + // RFC 3339 datetime with timezone(2006-01-02T15:04:05Z07:00) and relative time formats(T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M) + var err error + from, err = parse.ParseTime(fromArg, time.Now()) + + if err != nil { + error_helpers.FailOnError(fmt.Errorf("invalid date format for 'from': %s", fromArg)) + } + } + + partitionName := args[0] + partition, ok := config.GlobalConfig.Partitions[partitionName] + if !ok { + error_helpers.FailOnError(fmt.Errorf("partition %s found", partitionName)) + } + + // TODO CONFIRM + + err := parquet.DeleteParquetFiles(partition, from) + error_helpers.FailOnError(err) +} diff --git a/internal/config/tailpipe_config.go b/internal/config/tailpipe_config.go index d1e8fb58..e15d6354 100644 --- a/internal/config/tailpipe_config.go +++ b/internal/config/tailpipe_config.go @@ -71,5 +71,4 @@ func (c *TailpipeConfig) InitPartitions() { partition.Plugin = plugin.NewPlugin(partition.InferPluginName()) } } - } diff --git a/internal/display/partition.go b/internal/display/partition.go index 34525c6f..36759f60 100644 --- a/internal/display/partition.go +++ b/internal/display/partition.go @@ -68,27 +68,23 @@ func ListPartitionResources(ctx context.Context) ([]*PartitionResource, error) { return res, nil } -func GetPartitionResource(ctx context.Context, partitionName string) (*PartitionResource, error) { - partitions := 
config.GlobalConfig.Partitions - for _, p := range partitions { - name := fmt.Sprintf("%s.%s", p.TableName, p.ShortName) - if name == partitionName { - partition := &PartitionResource{ - Name: name, - Description: p.Description, - Plugin: p.Plugin.Alias, - } - - err := partition.setFileInformation() - if err != nil { - return nil, fmt.Errorf("error setting file information: %w", err) - } - - return partition, nil - } +func GetPartitionResource(partitionName string) (*PartitionResource, error) { + p, ok := config.GlobalConfig.Partitions[partitionName] + if !ok { + return nil, fmt.Errorf("no partitions found") + } + partition := &PartitionResource{ + Name: partitionName, + Description: p.Description, + Plugin: p.Plugin.Alias, + } + + err := partition.setFileInformation() + if err != nil { + return nil, fmt.Errorf("error setting file information: %w", err) } - return nil, fmt.Errorf("partition '%s' not found", partitionName) + return partition, nil } func (r *PartitionResource) setFileInformation() error { diff --git a/internal/parquet/delete.go b/internal/parquet/delete.go new file mode 100644 index 00000000..89f83ada --- /dev/null +++ b/internal/parquet/delete.go @@ -0,0 +1,56 @@ +package parquet + +import ( + "database/sql" + "fmt" + "github.com/turbot/tailpipe/internal/config" + "github.com/turbot/tailpipe/internal/database" + "os" + "time" +) + +func DeleteParquetFiles(partition *config.Partition, from time.Time) error { + dataDir := config.GlobalWorkspaceProfile.GetDataDir() + + fileGlob := database.GetParquetFileGlob(dataDir, partition.TableName, "") + + // TODO verify for SQL injection - c an we use params + query := fmt.Sprintf(` + SELECT DISTINCT regexp_replace(filename, '/[^/]+$', '') AS folder_path + FROM read_parquet('%s',filename=true) + WHERE tp_partition = '%s' + AND tp_date > '%s'; +`, fileGlob, partition.ShortName, from) + + db, err := sql.Open("duckdb", "") + if err != nil { + return fmt.Errorf("failed to open DuckDB connection: %w", err) + } + + 
defer db.Close() + + rows, err := db.Query(query) + if err != nil { + return fmt.Errorf("failed to query parquet folder names: %w", err) + } + defer rows.Close() + + var folders []string + // Iterate over the results + for rows.Next() { + var date string + if err := rows.Scan(&date); err != nil { + return fmt.Errorf("failed to scan parquet folder name: %w", err) + } + folders = append(folders, date) + } + + var errors = make(map[string]error) + for _, folder := range folders { + if err := os.RemoveAll(folder); err != nil { + errors[folder] = err + } + } + + return nil +} diff --git a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go index f27fbd64..c7267733 100644 --- a/internal/plugin_manager/plugin_manager.go +++ b/internal/plugin_manager/plugin_manager.go @@ -153,6 +153,34 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition return CollectResponseFromProto(collectResponse), nil } +func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *config.Partition, fromTime time.Time, collectionTempDir string) error { + // start plugin if needed + pluginClient, err := p.getPlugin(ctx, partition.Plugin) + if err != nil { + return fmt.Errorf("error starting plugin %s: %w", partition.Plugin.Alias, err) + } + collectionStateDir := filepath.Dir(collectionTempDir) + executionID := getExecutionId() + // reuse CollectRequest for UpdateCollectionState + req := &proto.CollectRequest{ + TableName: partition.TableName, + PartitionName: partition.ShortName, + ExecutionId: executionID, + CollectionTempDir: collectionTempDir, + CollectionStateDir: collectionStateDir, + SourceData: partition.Source.ToProto(), + FromTime: timestamppb.New(fromTime), + } + + _, err = pluginClient.UpdateCollectionState(req) + if err != nil { + return fmt.Errorf("error updating collection state for plugin %s: %w", pluginClient.Name, error_helpers.TransformErrorToSteampipe(err)) + } + + // just return - the observer is 
responsible for waiting for completion + return err +} + // Describe starts the plugin if needed, discovers the artifacts and download them for the given partition. func (p *PluginManager) Describe(ctx context.Context, pluginName string) (*PluginDescribeResponse, error) { // build plugin ref from the name From 5d3f75ed4259c658a11a4c69cab84857d21af8a7 Mon Sep 17 00:00:00 2001 From: kai Date: Mon, 20 Jan 2025 11:32:02 +0000 Subject: [PATCH 2/8] put back UpdateCollectionStateRequest --- internal/plugin_manager/plugin_manager.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go index c7267733..a9f3bf22 100644 --- a/internal/plugin_manager/plugin_manager.go +++ b/internal/plugin_manager/plugin_manager.go @@ -154,18 +154,23 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition } func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *config.Partition, fromTime time.Time, collectionTempDir string) error { + + // identify which plugin provides the source + sourcePlugin, err := p.determineSourcePlugin(partition) + if err != nil { + return fmt.Errorf("error determining source plugin for source %s: %w", partition.Source.Type, err) + } + // start plugin if needed - pluginClient, err := p.getPlugin(ctx, partition.Plugin) + pluginClient, err := p.getPlugin(ctx, sourcePlugin) if err != nil { return fmt.Errorf("error starting plugin %s: %w", partition.Plugin.Alias, err) } collectionStateDir := filepath.Dir(collectionTempDir) - executionID := getExecutionId() - // reuse CollectRequest for UpdateCollectionState - req := &proto.CollectRequest{ + + req := &proto.UpdateCollectionStateRequest{ TableName: partition.TableName, PartitionName: partition.ShortName, - ExecutionId: executionID, CollectionTempDir: collectionTempDir, CollectionStateDir: collectionStateDir, SourceData: partition.Source.ToProto(), From 
47b92570df73e7091702f1e589cbcdf13658f86e Mon Sep 17 00:00:00 2001 From: kai Date: Mon, 20 Jan 2025 18:48:12 +0000 Subject: [PATCH 3/8] parquet deletion works --- cmd/collect.go | 8 +--- cmd/partition.go | 40 ++++++++++++++++++-- internal/collector/collector.go | 43 ++++----------------- internal/database/tables.go | 9 ++++- internal/filepaths/collection_temp_dir.go | 46 +++++++++++++++++++++++ internal/parquet/delete.go | 38 ++++++++++++------- internal/parquet/parquet_worker.go | 3 +- internal/plugin_manager/plugin_manager.go | 6 ++- 8 files changed, 130 insertions(+), 63 deletions(-) create mode 100644 internal/filepaths/collection_temp_dir.go diff --git a/cmd/collect.go b/cmd/collect.go index 79871b48..42819539 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -109,7 +109,7 @@ func doCollect(ctx context.Context, args []string) error { for _, partition := range partitions { // if a from time is set, clear the partition data from that time forward if !fromTime.IsZero() { - err := parquet.DeleteParquetFiles(partition, fromTime) + _, err := parquet.DeleteParquetFiles(partition, fromTime) if err != nil { slog.Warn("Failed to delete parquet files after the from time", "partition", partition.Name, "fromTime", fromTime, "error", err) errList = append(errList, err) @@ -140,14 +140,10 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime } defer c.Close() - // if there is a from time, add a filter to the partition - if !fromTime.IsZero() { - partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", fromTime.Format("2006-01-02T15:04:05"))) - } - if err = c.Collect(ctx, partition, fromTime); err != nil { return err } + // now wait for all collection to complete and close the collector c.WaitForCompletion(ctx) diff --git a/cmd/partition.go b/cmd/partition.go index 58025c94..50ef8d61 100644 --- a/cmd/partition.go +++ b/cmd/partition.go @@ -20,7 +20,9 @@ import ( "github.com/turbot/tailpipe/internal/config" 
"github.com/turbot/tailpipe/internal/constants" "github.com/turbot/tailpipe/internal/display" + "github.com/turbot/tailpipe/internal/filepaths" "github.com/turbot/tailpipe/internal/parquet" + "github.com/turbot/tailpipe/internal/plugin_manager" ) func partitionCmd() *cobra.Command { @@ -167,13 +169,14 @@ func partitionDeleteCmd() *cobra.Command { // - relative time formats (T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M) cmdconfig.OnCmd(cmd). - AddStringFlag(pconstants.ArgFrom, "", "Specify the start time") + AddStringFlag(pconstants.ArgFrom, "", "Specify the start time"). + AddBoolFlag(pconstants.ArgForce, false, "Force delete without confirmation") return cmd } func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { - //ctx := cmd.Context() + ctx := cmd.Context() defer func() { if r := recover(); r != nil { @@ -183,6 +186,7 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { }() var from time.Time + var fromStr string if viper.IsSet(pconstants.ArgFrom) { fromArg := viper.GetString(pconstants.ArgFrom) // parse the string as time.Time @@ -194,6 +198,8 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { if err != nil { error_helpers.FailOnError(fmt.Errorf("invalid date format for 'from': %s", fromArg)) } + + fromStr = fmt.Sprintf(" from %s", from.Format(time.RFC3339)) } partitionName := args[0] @@ -202,8 +208,34 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { error_helpers.FailOnError(fmt.Errorf("partition %s found", partitionName)) } - // TODO CONFIRM + if !viper.GetBool(pconstants.ArgForce) { + // confirm deletion + msg := fmt.Sprintf("Are you sure you want to delete partition %s%s?", partitionName, fromStr) + if !utils.UserConfirmationWithDefault(msg, false) { + fmt.Println("Deletion cancelled") //nolint:forbidigo//expected output + return + } + } + + filesDeleted, err := parquet.DeleteParquetFiles(partition, from) + error_helpers.FailOnError(err) + + if filesDeleted == 0 { + fmt.Println("No parquet files 
deleted") //nolint:forbidigo//expected output + } else { + fmt.Printf("Deleted %d parquet %s\n", filesDeleted, utils.Pluralize("file", filesDeleted)) //nolint:forbidigo//expected output + } + + // update collection state + // start the plugin manager + pluginManager := plugin_manager.New() + defer pluginManager.Close() + + // get the temp data dir for this collection + // - this is located in ~/.turbot/internal/collection// + collectionTempDir := filepaths.GetCollectionTempDir() - err := parquet.DeleteParquetFiles(partition, from) + // tell the plugin manager to update the collection state + err = pluginManager.UpdateCollectionState(ctx, partition, from, collectionTempDir) error_helpers.FailOnError(err) } diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 3a5f6afa..ae5a99ae 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -7,14 +7,11 @@ import ( "fmt" "log/slog" "os" - "path/filepath" - "strconv" "strings" "time" tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" - "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" "github.com/turbot/tailpipe-plugin-sdk/events" @@ -55,7 +52,7 @@ type Collector struct { func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { // get the temp data dir for this collection // - this is located in ~/.turbot/internal/collection// - collectionTempDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionTempDir := filepaths.GetCollectionTempDir() c := &Collector{ Events: make(chan *proto.Event, eventBufferSize), @@ -95,19 +92,22 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr return errors.New("collection already in progress") } - // cleanup the collection temp dir from previous runs - c.cleanupCollectionDir() - // tell plugin to start collecting collectResponse, err := c.pluginManager.Collect(ctx, partition, fromTime, c.collectionTempDir) if 
err != nil { return fmt.Errorf("failed to collect: %w", err) } - c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime)) + resolvedFromTime := collectResponse.FromTime + c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *resolvedFromTime)) //nolint:errcheck // handle this later go c.app.Run() // TODO: #error handling of errors + // if there is a from time, add a filter to the partition - this will be used by the parquet writer + if !resolvedFromTime.Time.IsZero() { + partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", resolvedFromTime.Time.Format("2006-01-02T15:04:05"))) + } + executionId := collectResponse.ExecutionId // add the execution to the map c.execution = newExecution(executionId, partition) @@ -368,33 +368,6 @@ func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) c.execution.pluginTiming = events.TimingCollectionFromProto(timing) } -func (c *Collector) cleanupCollectionDir() { - // list all folders alongside our collection temp dir - parent := filepath.Dir(c.collectionTempDir) - files, err := os.ReadDir(parent) - if err != nil { - slog.Warn("failed to list files in collection dir", "error", err) - return - } - for _, file := range files { - // if the file is a directory and is not our collection temp dir, remove it - if file.IsDir() && file.Name() != filepath.Base(c.collectionTempDir) { - // the folder name is the PID - check whether that pid exists - // if it doesn't, remove the folder - // Attempt to find the process - // try to parse the directory name as a pid - pid, err := strconv.ParseInt(file.Name(), 10, 32) - if err == nil { - if utils.PidExists(int(pid)) { - slog.Info(fmt.Sprintf("cleanupCollectionDir skipping directory '%s' as process with PID %d exists", file.Name(), pid)) - continue - } - } - slog.Debug("removing directory", "dir", file.Name()) - _ = os.RemoveAll(filepath.Join(parent, file.Name())) - } - } -} func (c *Collector) Compact(ctx 
context.Context) error { c.app.Send(AwaitingCompactionMsg{}) diff --git a/internal/database/tables.go b/internal/database/tables.go index a16cae8c..65f017d3 100644 --- a/internal/database/tables.go +++ b/internal/database/tables.go @@ -50,7 +50,7 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ... dataDir := config.GlobalWorkspaceProfile.GetDataDir() // Path to the Parquet directory // hive structure is /tp_table=/tp_partition=/tp_index=/tp_date=.parquet - parquetPath := GetParquetFileGlob(dataDir, tableName, "") + parquetPath := GetParquetFileGlobForTable(dataDir, tableName, "") // Step 1: Query the first Parquet file to infer columns columns, err := getColumnNames(ctx, parquetPath, db) @@ -102,11 +102,16 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ... return nil } -func GetParquetFileGlob(dataDir, tableName, fileRoot string) string { +func GetParquetFileGlobForTable(dataDir, tableName, fileRoot string) string { parquetPath := fmt.Sprintf("%s/tp_table=%s/*/*/*/%s*.parquet", dataDir, tableName, fileRoot) return parquetPath } +func GetParquetFileGlobForPartition(dataDir, tableName, partitionName, fileRoot string) string { + parquetPath := fmt.Sprintf("%s/tp_table=%s/tp_partition=%s/*/*/%s*.parquet", dataDir, tableName, partitionName, fileRoot) + return parquetPath +} + // query the provided parquet path to get the columns func getColumnNames(ctx context.Context, parquetPath string, db *sql.DB) ([]string, error) { columnQuery := fmt.Sprintf("SELECT * FROM '%s' LIMIT 0", parquetPath) //nolint: gosec // this is a controlled query diff --git a/internal/filepaths/collection_temp_dir.go b/internal/filepaths/collection_temp_dir.go new file mode 100644 index 00000000..011e23bb --- /dev/null +++ b/internal/filepaths/collection_temp_dir.go @@ -0,0 +1,46 @@ +package filepaths + +import ( + "fmt" + "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe/internal/config" + "log/slog" + "os" + 
"path/filepath" + "strconv" +) + +func GetCollectionTempDir() string { + collectionDir := config.GlobalWorkspaceProfile.GetCollectionDir() + // cleanup the collection temp dir from previous runs + cleanupCollectionTempDirs(collectionDir) + + return filepath.Join(collectionDir, fmt.Sprintf("%d", os.Getpid())) +} +func cleanupCollectionTempDirs(collectionTempDir string) { + // list all folders alongside our collection temp dir + parent := filepath.Dir(collectionTempDir) + files, err := os.ReadDir(parent) + if err != nil { + slog.Warn("failed to list files in collection dir", "error", err) + return + } + for _, file := range files { + // if the file is a directory and is not our collection temp dir, remove it + if file.IsDir() { + // the folder name is the PID - check whether that pid exists + // if it doesn't, remove the folder + // Attempt to find the process + // try to parse the directory name as a pid + pid, err := strconv.ParseInt(file.Name(), 10, 32) + if err == nil { + if utils.PidExists(int(pid)) { + slog.Info(fmt.Sprintf("cleanupCollectionTempDirs skipping directory '%s' as process with PID %d exists", file.Name(), pid)) + continue + } + } + slog.Debug("removing directory", "dir", file.Name()) + _ = os.RemoveAll(filepath.Join(parent, file.Name())) + } + } +} diff --git a/internal/parquet/delete.go b/internal/parquet/delete.go index 89f83ada..e2226f85 100644 --- a/internal/parquet/delete.go +++ b/internal/parquet/delete.go @@ -6,43 +6,53 @@ import ( "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/database" "os" + "strings" "time" ) -func DeleteParquetFiles(partition *config.Partition, from time.Time) error { +func DeleteParquetFiles(partition *config.Partition, from time.Time) (int, error) { dataDir := config.GlobalWorkspaceProfile.GetDataDir() - fileGlob := database.GetParquetFileGlob(dataDir, partition.TableName, "") + fileGlob := database.GetParquetFileGlobForPartition(dataDir, partition.TableName, partition.ShortName, "") 
// TODO verify for SQL injection - c an we use params query := fmt.Sprintf(` - SELECT DISTINCT regexp_replace(filename, '/[^/]+$', '') AS folder_path - FROM read_parquet('%s',filename=true) - WHERE tp_partition = '%s' - AND tp_date > '%s'; -`, fileGlob, partition.ShortName, from) + SELECT + DISTINCT '%s/tp_table=' || tp_table || '/tp_partition=' || tp_partition || '/tp_index=' || tp_index || '/tp_date=' || tp_date AS hive_path + FROM read_parquet('%s', hive_partitioning=true) + WHERE tp_partition = '%s'; +`, dataDir, fileGlob, partition.ShortName) + + if !from.IsZero() { + query += fmt.Sprintf(` + AND tp_date >= '%s'`, from) + } db, err := sql.Open("duckdb", "") if err != nil { - return fmt.Errorf("failed to open DuckDB connection: %w", err) + return 0, fmt.Errorf("failed to open DuckDB connection: %w", err) } defer db.Close() rows, err := db.Query(query) if err != nil { - return fmt.Errorf("failed to query parquet folder names: %w", err) + // is this an error because there are no files? 
+ if strings.HasPrefix(err.Error(), "IO Error: No files found") { + return 0, nil + } + return 0, fmt.Errorf("failed to query parquet folder names: %w", err) } defer rows.Close() var folders []string // Iterate over the results for rows.Next() { - var date string - if err := rows.Scan(&date); err != nil { - return fmt.Errorf("failed to scan parquet folder name: %w", err) + var folder string + if err := rows.Scan(&folder); err != nil { + return 0, fmt.Errorf("failed to scan parquet folder name: %w", err) } - folders = append(folders, date) + folders = append(folders, folder) } var errors = make(map[string]error) @@ -52,5 +62,5 @@ func DeleteParquetFiles(partition *config.Partition, from time.Time) error { } } - return nil + return len(folders), nil } diff --git a/internal/parquet/parquet_worker.go b/internal/parquet/parquet_worker.go index 7b39ed07..5ecb8fb8 100644 --- a/internal/parquet/parquet_worker.go +++ b/internal/parquet/parquet_worker.go @@ -35,6 +35,7 @@ type parquetConversionWorker struct { // helper struct which provides unique filename roots fileRootProvider *FileRootProvider db *duckDb + fromTime time.Time } func newParquetConversionWorker(jobChan chan parquetJob, errorChan chan parquetJobError, sourceDir, destDir string, fileRootProvider *FileRootProvider) (*parquetConversionWorker, error) { @@ -174,7 +175,7 @@ func (w *parquetConversionWorker) convertFile(jsonlFilePath string, partition *c func getRowCount(db *sql.DB, destDir, fileRoot, table string) (int64, error) { // Build the query - rowCountQuery := fmt.Sprintf(`SELECT SUM(num_rows) FROM parquet_file_metadata('%s')`, database.GetParquetFileGlob(destDir, table, fileRoot)) //nolint:gosec // fixed sql query + rowCountQuery := fmt.Sprintf(`SELECT SUM(num_rows) FROM parquet_file_metadata('%s')`, database.GetParquetFileGlobForTable(destDir, table, fileRoot)) //nolint:gosec // fixed sql query // Execute the query and scan the result directly var rowCount int64 diff --git 
a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go index a9f3bf22..741ada7e 100644 --- a/internal/plugin_manager/plugin_manager.go +++ b/internal/plugin_manager/plugin_manager.go @@ -168,9 +168,13 @@ func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *co } collectionStateDir := filepath.Dir(collectionTempDir) - req := &proto.UpdateCollectionStateRequest{ + executionID := getExecutionId() + + // reuse CollectRequest for UpdateCollectionState + req := &proto.CollectRequest{ TableName: partition.TableName, PartitionName: partition.ShortName, + ExecutionId: executionID, CollectionTempDir: collectionTempDir, CollectionStateDir: collectionStateDir, SourceData: partition.Source.ToProto(), From e97502b6a4fa7ce181df9a5791a1fafcb7bf7d8d Mon Sep 17 00:00:00 2001 From: kai Date: Tue, 21 Jan 2025 16:55:31 +0000 Subject: [PATCH 4/8] pass full collection state path not dir fix collect from - fix deletion fix partition delete - (collection state needs work) --- cmd/partition.go | 30 ++++++++++------ internal/filepaths/collection_temp_dir.go | 15 +++++--- internal/parquet/delete.go | 2 +- internal/plugin_manager/plugin_manager.go | 43 +++++++++++------------ 4 files changed, 51 insertions(+), 39 deletions(-) diff --git a/cmd/partition.go b/cmd/partition.go index 50ef8d61..8384a3d6 100644 --- a/cmd/partition.go +++ b/cmd/partition.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "log/slog" "strings" "time" @@ -211,7 +212,7 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) { if !viper.GetBool(pconstants.ArgForce) { // confirm deletion msg := fmt.Sprintf("Are you sure you want to delete partition %s%s?", partitionName, fromStr) - if !utils.UserConfirmationWithDefault(msg, false) { + if !utils.UserConfirmationWithDefault(msg, true) { fmt.Println("Deletion cancelled") //nolint:forbidigo//expected output return } @@ -220,22 +221,29 @@ func runPartitionDeleteCmd(cmd *cobra.Command, args []string) {
filesDeleted, err := parquet.DeleteParquetFiles(partition, from) error_helpers.FailOnError(err) - if filesDeleted == 0 { - fmt.Println("No parquet files deleted") //nolint:forbidigo//expected output - } else { - fmt.Printf("Deleted %d parquet %s\n", filesDeleted, utils.Pluralize("file", filesDeleted)) //nolint:forbidigo//expected output - } - // update collection state // start the plugin manager pluginManager := plugin_manager.New() defer pluginManager.Close() - // get the temp data dir for this collection - // - this is located in ~/.turbot/internal/collection// - collectionTempDir := filepaths.GetCollectionTempDir() + // build the collection state path + collectionStateDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionStatePath := filepaths.CollectionStatePath(collectionStateDir, partition.TableName, partition.ShortName) // tell the plugin manager to update the collection state - err = pluginManager.UpdateCollectionState(ctx, partition, from, collectionTempDir) + err = pluginManager.UpdateCollectionState(ctx, partition, from, collectionStatePath) error_helpers.FailOnError(err) + + msg := buildStatusMessage(filesDeleted, partitionName, fromStr) + fmt.Println(msg) //nolint:forbidigo//expected output + slog.Info("Partition deleted", "partition", partitionName, "from", from) +} + +func buildStatusMessage(filesDeleted int, partition string, fromStr string) interface{} { + var deletedStr string + if filesDeleted > 0 { + deletedStr = fmt.Sprintf(" (deleted %d parquet %s)", filesDeleted, utils.Pluralize("file", filesDeleted)) + } + + return fmt.Sprintf("\nDeleted partition '%s' %s%s.\n", partition, fromStr, deletedStr) } diff --git a/internal/filepaths/collection_temp_dir.go b/internal/filepaths/collection_temp_dir.go index 011e23bb..1f42fcf0 100644 --- a/internal/filepaths/collection_temp_dir.go +++ b/internal/filepaths/collection_temp_dir.go @@ -11,16 +11,16 @@ import ( ) func GetCollectionTempDir() string { + // get the collection directory for this 
workspace collectionDir := config.GlobalWorkspaceProfile.GetCollectionDir() // cleanup the collection temp dir from previous runs cleanupCollectionTempDirs(collectionDir) - + // add a PID directory to the collection directory return filepath.Join(collectionDir, fmt.Sprintf("%d", os.Getpid())) } + func cleanupCollectionTempDirs(collectionTempDir string) { - // list all folders alongside our collection temp dir - parent := filepath.Dir(collectionTempDir) - files, err := os.ReadDir(parent) + files, err := os.ReadDir(collectionTempDir) if err != nil { slog.Warn("failed to list files in collection dir", "error", err) return @@ -40,7 +40,12 @@ func cleanupCollectionTempDirs(collectionTempDir string) { } } slog.Debug("removing directory", "dir", file.Name()) - _ = os.RemoveAll(filepath.Join(parent, file.Name())) + _ = os.RemoveAll(filepath.Join(collectionTempDir, file.Name())) } } } + +func CollectionStatePath(collectionFolder string, table, partition string) string { + // return the path to the collection state file + return filepath.Join(collectionFolder, fmt.Sprintf("collection_state_%s_%s.json", table, partition)) +} diff --git a/internal/parquet/delete.go b/internal/parquet/delete.go index e2226f85..bc724f29 100644 --- a/internal/parquet/delete.go +++ b/internal/parquet/delete.go @@ -20,7 +20,7 @@ func DeleteParquetFiles(partition *config.Partition, from time.Time) (int, error SELECT DISTINCT '%s/tp_table=' || tp_table || '/tp_partition=' || tp_partition || '/tp_index=' || tp_index || '/tp_date=' || tp_date AS hive_path FROM read_parquet('%s', hive_partitioning=true) - WHERE tp_partition = '%s'; + WHERE tp_partition = '%s' `, dataDir, fileGlob, partition.ShortName) if !from.IsZero() { diff --git a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go index 741ada7e..a2f97344 100644 --- a/internal/plugin_manager/plugin_manager.go +++ b/internal/plugin_manager/plugin_manager.go @@ -21,7 +21,7 @@ import ( 
"github.com/turbot/pipe-fittings/app_specific" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/error_helpers" - "github.com/turbot/pipe-fittings/filepaths" + pfilepaths "github.com/turbot/pipe-fittings/filepaths" "github.com/turbot/pipe-fittings/installationstate" pociinstaller "github.com/turbot/pipe-fittings/ociinstaller" pplugin "github.com/turbot/pipe-fittings/plugin" @@ -31,6 +31,7 @@ import ( "github.com/turbot/tailpipe-plugin-sdk/grpc/shared" "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/constants" + "github.com/turbot/tailpipe/internal/filepaths" "github.com/turbot/tailpipe/internal/ociinstaller" "github.com/turbot/tailpipe/internal/plugin" "google.golang.org/protobuf/types/known/timestamppb" @@ -105,18 +106,20 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition // the name of the collection state file contains the partition name // thus the collection state is shared between multiple successive collections - collectionStateDir := filepath.Dir(collectionTempDir) + // build the collection state path + collectionStateDir := config.GlobalWorkspaceProfile.GetCollectionDir() + collectionStatePath := filepaths.CollectionStatePath(collectionStateDir, partition.TableName, partition.ShortName) // tell the plugin to start the collection req := &proto.CollectRequest{ - TableName: partition.TableName, - PartitionName: partition.ShortName, - ExecutionId: executionID, - CollectionTempDir: collectionTempDir, - CollectionStateDir: collectionStateDir, - SourceData: partition.Source.ToProto(), - SourcePlugin: sourcePluginReattach, - FromTime: timestamppb.New(fromTime), + TableName: partition.TableName, + PartitionName: partition.ShortName, + ExecutionId: executionID, + CollectionTempDir: collectionTempDir, + CollectionStatePath: collectionStatePath, + SourceData: partition.Source.ToProto(), + SourcePlugin: sourcePluginReattach, + FromTime: timestamppb.New(fromTime), } 
if partition.Source.Connection != nil { @@ -153,8 +156,7 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition return CollectResponseFromProto(collectResponse), nil } -func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *config.Partition, fromTime time.Time, collectionTempDir string) error { - +func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *config.Partition, fromTime time.Time, collectionStatePath string) error { // identify which plugin provides the source sourcePlugin, err := p.determineSourcePlugin(partition) if err != nil { @@ -166,19 +168,17 @@ func (p *PluginManager) UpdateCollectionState(ctx context.Context, partition *co if err != nil { return fmt.Errorf("error starting plugin %s: %w", partition.Plugin.Alias, err) } - collectionStateDir := filepath.Dir(collectionTempDir) executionID := getExecutionId() // reuse CollectRequest for UpdateCollectionState req := &proto.CollectRequest{ - TableName: partition.TableName, - PartitionName: partition.ShortName, - ExecutionId: executionID, - CollectionTempDir: collectionTempDir, - CollectionStateDir: collectionStateDir, - SourceData: partition.Source.ToProto(), - FromTime: timestamppb.New(fromTime), + TableName: partition.TableName, + PartitionName: partition.ShortName, + ExecutionId: executionID, + CollectionStatePath: collectionStatePath, + SourceData: partition.Source.ToProto(), + FromTime: timestamppb.New(fromTime), } _, err = pluginClient.UpdateCollectionState(req) @@ -259,10 +259,9 @@ func (p *PluginManager) getPlugin(ctx context.Context, pluginDef *pplugin.Plugin } func (p *PluginManager) startPlugin(tp *pplugin.Plugin) (*grpc.PluginClient, error) { - // TODO #plugin search in dest folder for any .plugin, as steampipe does https://github.com/turbot/tailpipe/issues/4 pluginName := tp.Alias - pluginPath, err := filepaths.GetPluginPath(tp.Plugin, tp.Alias) + pluginPath, err := pfilepaths.GetPluginPath(tp.Plugin, tp.Alias) if err 
!= nil { return nil, fmt.Errorf("error getting plugin path for plugin '%s': %w", tp.Alias, err) } From 997cdcf50517706149b099cb5995d733d828069c Mon Sep 17 00:00:00 2001 From: kai Date: Tue, 21 Jan 2025 17:34:34 +0000 Subject: [PATCH 5/8] rebase fix --- cmd/collect.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/collect.go b/cmd/collect.go index 42819539..58e9f6c4 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log/slog" "strings" "time" From ec804f425020744cc9423cf2e11ec81ae8f7c19a Mon Sep 17 00:00:00 2001 From: kai Date: Tue, 21 Jan 2025 17:40:39 +0000 Subject: [PATCH 6/8] rebase fix --- internal/collector/collector.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index ae5a99ae..413f4769 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -12,7 +12,6 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" - "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" "github.com/turbot/tailpipe-plugin-sdk/events" sdkfilepaths "github.com/turbot/tailpipe-plugin-sdk/filepaths" @@ -368,7 +367,6 @@ func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) c.execution.pluginTiming = events.TimingCollectionFromProto(timing) } - func (c *Collector) Compact(ctx context.Context) error { c.app.Send(AwaitingCompactionMsg{}) updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) { From 14e34cfa5db41a6eb4156804e174036f5b51494a Mon Sep 17 00:00:00 2001 From: kai Date: Tue, 21 Jan 2025 17:58:59 +0000 Subject: [PATCH 7/8] rebase fix --- cmd/collect.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/collect.go b/cmd/collect.go index 58e9f6c4..1d7229ae 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -120,7 +120,6 @@ func doCollect(ctx context.Context, args []string) error { } // do the collection err = 
collectPartition(ctx, partition, fromTime, pluginManager) - err = collectPartition(ctx, partition, fromTime, pluginManager) if err != nil { errList = append(errList, err) } From b4b2fd8ad78da1518c93c8a45fd1f04a2bb183c8 Mon Sep 17 00:00:00 2001 From: kai Date: Tue, 21 Jan 2025 22:29:13 +0000 Subject: [PATCH 8/8] remove unused field --- internal/parquet/parquet_worker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/parquet/parquet_worker.go b/internal/parquet/parquet_worker.go index 5ecb8fb8..e9710102 100644 --- a/internal/parquet/parquet_worker.go +++ b/internal/parquet/parquet_worker.go @@ -35,7 +35,6 @@ type parquetConversionWorker struct { // helper struct which provides unique filename roots fileRootProvider *FileRootProvider db *duckDb - fromTime time.Time } func newParquetConversionWorker(jobChan chan parquetJob, errorChan chan parquetJobError, sourceDir, destDir string, fileRootProvider *FileRootProvider) (*parquetConversionWorker, error) {