这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions cmd/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/turbot/pipe-fittings/parse"
"github.com/turbot/tailpipe/internal/collector"
"github.com/turbot/tailpipe/internal/config"
"github.com/turbot/tailpipe/internal/parquet"
"github.com/turbot/tailpipe/internal/plugin_manager"
)

Expand Down Expand Up @@ -106,6 +108,17 @@ func doCollect(ctx context.Context, args []string) error {
// collect each partition serially
var errList []error
for _, partition := range partitions {
// if a from time is set, clear the partition data from that time forward
if !fromTime.IsZero() {
_, err := parquet.DeleteParquetFiles(partition, fromTime)
if err != nil {
slog.Warn("Failed to delete parquet files after the from time", "partition", partition.Name, "fromTime", fromTime, "error", err)
errList = append(errList, err)
continue
}
error_helpers.FailOnError(err)
}
// do the collection
err = collectPartition(ctx, partition, fromTime, pluginManager)
if err != nil {
errList = append(errList, err)
Expand All @@ -127,14 +140,10 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime
}
defer c.Close()

// if there is a from time, add a filter to the partition
if !fromTime.IsZero() {
partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", fromTime.Format("2006-01-02T15:04:05")))
}

if err = c.Collect(ctx, partition, fromTime); err != nil {
return err
}

// now wait for all collection to complete and close the collector
c.WaitForCompletion(ctx)

Expand Down
108 changes: 105 additions & 3 deletions cmd/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@ package cmd
import (
"context"
"fmt"
"log/slog"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/thediveo/enumflag/v2"

"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/cmdconfig"
pconstants "github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/contexthelpers"
"github.com/turbot/pipe-fittings/error_helpers"
"github.com/turbot/pipe-fittings/parse"
"github.com/turbot/pipe-fittings/printers"
"github.com/turbot/pipe-fittings/utils"
"github.com/turbot/tailpipe/internal/config"
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/display"
"github.com/turbot/tailpipe/internal/filepaths"
"github.com/turbot/tailpipe/internal/parquet"
"github.com/turbot/tailpipe/internal/plugin_manager"
)

func partitionCmd() *cobra.Command {
Expand All @@ -39,7 +46,7 @@ Examples:

cmd.AddCommand(partitionListCmd())
cmd.AddCommand(partitionShowCmd())

cmd.AddCommand(partitionDeleteCmd())
cmd.Flags().BoolP(pconstants.ArgHelp, "h", false, "Help for partition")

return cmd
Expand Down Expand Up @@ -130,7 +137,7 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) {

// Get Resources
partitionName := args[0]
resource, err := display.GetPartitionResource(ctx, partitionName)
resource, err := display.GetPartitionResource(partitionName)
error_helpers.FailOnError(err)
printableResource := display.NewPrintableResource(resource)

Expand All @@ -145,3 +152,98 @@ func runPartitionShowCmd(cmd *cobra.Command, args []string) {
exitCode = pconstants.ExitCodeUnknownErrorPanic
}
}

// partitionDeleteCmd builds the `partition delete <partition>` subcommand.
//
// The command takes exactly one positional argument (the partition name) and
// registers two flags:
//   - --from:  only delete data from this time onwards
//   - --force: skip the interactive confirmation prompt
func partitionDeleteCmd() *cobra.Command {
	cmd := &cobra.Command{
		// cobra uses the first word of Use as the command name; the rest is
		// shown in help output, so advertise the required argument here.
		Use:   "delete <partition>",
		Args:  cobra.ExactArgs(1),
		Run:   runPartitionDeleteCmd,
		Short: "delete a partition for the specified period",
		Long:  `delete a partition for the specified period`,
	}

	// arg `from` accepts:
	// - ISO 8601 date (2024-01-01)
	// - ISO 8601 datetime (2006-01-02T15:04:05)
	// - ISO 8601 datetime with ms (2006-01-02T15:04:05.000)
	// - RFC 3339 datetime with timezone (2006-01-02T15:04:05Z07:00)
	// - relative time formats (T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M)

	cmdconfig.OnCmd(cmd).
		AddStringFlag(pconstants.ArgFrom, "", "Specify the start time").
		AddBoolFlag(pconstants.ArgForce, false, "Force delete without confirmation")

	return cmd
}

// runPartitionDeleteCmd implements `partition delete <partition>`.
//
// Steps:
//  1. Parse --from, if set, into a time.
//  2. Look up the named partition in config.GlobalConfig.Partitions.
//  3. Unless --force is set, ask the user to confirm.
//  4. Delete the partition's parquet files via parquet.DeleteParquetFiles.
//  5. Ask the plugin manager to update the partition's collection state.
//  6. Print a summary.
//
// Every error is reported through error_helpers.FailOnError. Panics are
// recovered, converted to an error and reported the same way.
func runPartitionDeleteCmd(cmd *cobra.Command, args []string) {
	ctx := cmd.Context()

	defer func() {
		if r := recover(); r != nil {
			exitCode = pconstants.ExitCodeUnknownErrorPanic
			error_helpers.FailOnError(helpers.ToError(r))
		}
	}()

	// from stays the zero time when --from is not given.
	// NOTE(review): presumably a zero from means "delete all data for the
	// partition" - confirm against parquet.DeleteParquetFiles.
	var from time.Time
	// fromStr is spliced into user-facing messages; it is either empty or has
	// a leading space, e.g. " from 2024-01-01T00:00:00Z".
	var fromStr string
	if viper.IsSet(pconstants.ArgFrom) {
		fromArg := viper.GetString(pconstants.ArgFrom)
		// Parse the string as time.Time. Arg `from` accepts:
		//   - ISO 8601 date (2024-01-01)
		//   - ISO 8601 datetime (2006-01-02T15:04:05)
		//   - ISO 8601 datetime with ms (2006-01-02T15:04:05.000)
		//   - RFC 3339 datetime with timezone (2006-01-02T15:04:05Z07:00)
		//   - relative time formats (T-2Y, T-10m, T-10W, T-180d, T-9H, T-10M)
		var err error
		from, err = parse.ParseTime(fromArg, time.Now())
		if err != nil {
			// keep the underlying parse error so the user can see why it was rejected
			error_helpers.FailOnError(fmt.Errorf("invalid date format for 'from': %s: %w", fromArg, err))
		}

		fromStr = fmt.Sprintf(" from %s", from.Format(time.RFC3339))
	}

	partitionName := args[0]
	partition, ok := config.GlobalConfig.Partitions[partitionName]
	if !ok {
		error_helpers.FailOnError(fmt.Errorf("partition %s not found", partitionName))
	}

	if !viper.GetBool(pconstants.ArgForce) {
		// confirm deletion
		// NOTE(review): the second argument is passed as true here. If it means
		// "default to yes", pressing enter confirms this destructive action -
		// consider defaulting to no.
		msg := fmt.Sprintf("Are you sure you want to delete partition %s%s?", partitionName, fromStr)
		if !utils.UserConfirmationWithDefault(msg, true) {
			fmt.Println("Deletion cancelled") //nolint:forbidigo//expected output
			return
		}
	}

	filesDeleted, err := parquet.DeleteParquetFiles(partition, from)
	error_helpers.FailOnError(err)

	// Update the collection state so it is consistent with the deleted data.
	// Start the plugin manager first.
	pluginManager := plugin_manager.New()
	defer pluginManager.Close()

	// build the collection state path
	collectionStateDir := config.GlobalWorkspaceProfile.GetCollectionDir()
	collectionStatePath := filepaths.CollectionStatePath(collectionStateDir, partition.TableName, partition.ShortName)

	// tell the plugin manager to update the collection state
	err = pluginManager.UpdateCollectionState(ctx, partition, from, collectionStatePath)
	error_helpers.FailOnError(err)

	msg := buildStatusMessage(filesDeleted, partitionName, fromStr)
	fmt.Println(msg) //nolint:forbidigo//expected output
	slog.Info("Partition deleted", "partition", partitionName, "from", from)
}

// buildStatusMessage returns the summary printed after a partition delete.
//
// Parameters:
//   - filesDeleted: the number of parquet files removed. The
//     " (deleted N parquet file(s))" suffix is only added when it is > 0.
//   - partition: the partition name, shown in single quotes.
//   - fromStr: either empty or a fragment that already starts with a space,
//     e.g. " from 2024-01-01T00:00:00Z". No separator is inserted before it.
func buildStatusMessage(filesDeleted int, partition string, fromStr string) string {
	var deletedStr string
	if filesDeleted > 0 {
		// simple English plural: 1 file, N files
		noun := "files"
		if filesDeleted == 1 {
			noun = "file"
		}
		deletedStr = fmt.Sprintf(" (deleted %d parquet %s)", filesDeleted, noun)
	}

	// fromStr carries its own leading space, so none is added here; this
	// avoids "'x'  from ..." and "'x' ." in the output.
	return fmt.Sprintf("\nDeleted partition '%s'%s%s.\n", partition, fromStr, deletedStr)
}
45 changes: 8 additions & 37 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@ import (
"fmt"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"time"

tea "github.com/charmbracelet/bubbletea"
"github.com/sethvargo/go-retry"

"github.com/turbot/pipe-fittings/utils"
"github.com/turbot/tailpipe-plugin-sdk/constants"
"github.com/turbot/tailpipe-plugin-sdk/events"
sdkfilepaths "github.com/turbot/tailpipe-plugin-sdk/filepaths"
Expand Down Expand Up @@ -55,7 +51,7 @@ type Collector struct {
func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) {
// get the temp data dir for this collection
// - this is located in ~/.turbot/internal/collection/<profile_name>/<pid>
collectionTempDir := config.GlobalWorkspaceProfile.GetCollectionDir()
collectionTempDir := filepaths.GetCollectionTempDir()

c := &Collector{
Events: make(chan *proto.Event, eventBufferSize),
Expand Down Expand Up @@ -95,19 +91,22 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr
return errors.New("collection already in progress")
}

// cleanup the collection temp dir from previous runs
c.cleanupCollectionDir()

// tell plugin to start collecting
collectResponse, err := c.pluginManager.Collect(ctx, partition, fromTime, c.collectionTempDir)
if err != nil {
return fmt.Errorf("failed to collect: %w", err)
}

c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime))
resolvedFromTime := collectResponse.FromTime
c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *resolvedFromTime))
//nolint:errcheck // handle this later
go c.app.Run() // TODO: #error handling of errors

// if there is a from time, add a filter to the partition - this will be used by the parquet writer
if !resolvedFromTime.Time.IsZero() {
partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", resolvedFromTime.Time.Format("2006-01-02T15:04:05")))
}

executionId := collectResponse.ExecutionId
// add the execution to the map
c.execution = newExecution(executionId, partition)
Expand Down Expand Up @@ -368,34 +367,6 @@ func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing)
c.execution.pluginTiming = events.TimingCollectionFromProto(timing)
}

// cleanupCollectionDir removes leftover sibling directories of
// c.collectionTempDir from previous runs.
//
// Sibling directory names are treated as PIDs. A sibling is kept only if its
// name parses as a PID and utils.PidExists reports that process exists. Every
// other sibling directory is removed, including ones whose names are not
// numeric. Non-directory entries and our own temp dir are never touched.
// Cleanup is best-effort: listing or removal failures are logged, not returned.
func (c *Collector) cleanupCollectionDir() {
	parent := filepath.Dir(c.collectionTempDir)
	ownDir := filepath.Base(c.collectionTempDir)

	// list all entries alongside our collection temp dir
	entries, err := os.ReadDir(parent)
	if err != nil {
		slog.Warn("failed to list files in collection dir", "error", err)
		return
	}

	for _, entry := range entries {
		// only directories other than our own collection temp dir are candidates
		if !entry.IsDir() || entry.Name() == ownDir {
			continue
		}

		// the folder name is the PID - keep the folder if that process still exists
		if pid, err := strconv.ParseInt(entry.Name(), 10, 32); err == nil && utils.PidExists(int(pid)) {
			slog.Info(fmt.Sprintf("cleanupCollectionDir skipping directory '%s' as process with PID %d exists", entry.Name(), pid))
			continue
		}

		dir := filepath.Join(parent, entry.Name())
		slog.Debug("removing directory", "dir", entry.Name())
		if err := os.RemoveAll(dir); err != nil {
			// best-effort: a stale directory we cannot remove should not abort collection
			slog.Warn("failed to remove stale collection directory", "dir", dir, "error", err)
		}
	}
}

func (c *Collector) Compact(ctx context.Context) error {
c.app.Send(AwaitingCompactionMsg{})
updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) {
Expand Down
1 change: 0 additions & 1 deletion internal/config/tailpipe_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,4 @@ func (c *TailpipeConfig) InitPartitions() {
partition.Plugin = plugin.NewPlugin(partition.InferPluginName())
}
}

}
9 changes: 7 additions & 2 deletions internal/database/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ...
dataDir := config.GlobalWorkspaceProfile.GetDataDir()
// Path to the Parquet directory
// hive structure is <workspace>/tp_table=<table_name>/tp_partition=<partition>/tp_index=<index>/tp_date=<date>.parquet
parquetPath := GetParquetFileGlob(dataDir, tableName, "")
parquetPath := GetParquetFileGlobForTable(dataDir, tableName, "")

// Step 1: Query the first Parquet file to infer columns
columns, err := getColumnNames(ctx, parquetPath, db)
Expand Down Expand Up @@ -102,11 +102,16 @@ func AddTableView(ctx context.Context, tableName string, db *sql.DB, filters ...
return nil
}

func GetParquetFileGlob(dataDir, tableName, fileRoot string) string {
// GetParquetFileGlobForTable returns a glob that matches parquet files for
// tableName under dataDir, across every partition.
//
// The pattern has three wildcard directory levels below
// "<dataDir>/tp_table=<tableName>". Only files whose names start with fileRoot
// match; pass "" to match all parquet files.
func GetParquetFileGlobForTable(dataDir, tableName, fileRoot string) string {
	const tableGlobLayout = "%s/tp_table=%s/*/*/*/%s*.parquet"
	return fmt.Sprintf(tableGlobLayout, dataDir, tableName, fileRoot)
}

// GetParquetFileGlobForPartition returns a glob that matches the parquet files
// of a single partition of tableName under dataDir.
//
// The pattern has two wildcard directory levels below
// "<dataDir>/tp_table=<tableName>/tp_partition=<partitionName>". Only files
// whose names start with fileRoot match; pass "" to match all parquet files.
func GetParquetFileGlobForPartition(dataDir, tableName, partitionName, fileRoot string) string {
	return fmt.Sprintf(
		"%s/tp_table=%s/tp_partition=%s/*/*/%s*.parquet",
		dataDir, tableName, partitionName, fileRoot,
	)
}

// query the provided parquet path to get the columns
func getColumnNames(ctx context.Context, parquetPath string, db *sql.DB) ([]string, error) {
columnQuery := fmt.Sprintf("SELECT * FROM '%s' LIMIT 0", parquetPath) //nolint: gosec // this is a controlled query
Expand Down
34 changes: 15 additions & 19 deletions internal/display/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,23 @@ func ListPartitionResources(ctx context.Context) ([]*PartitionResource, error) {
return res, nil
}

func GetPartitionResource(ctx context.Context, partitionName string) (*PartitionResource, error) {
partitions := config.GlobalConfig.Partitions
for _, p := range partitions {
name := fmt.Sprintf("%s.%s", p.TableName, p.ShortName)
if name == partitionName {
partition := &PartitionResource{
Name: name,
Description: p.Description,
Plugin: p.Plugin.Alias,
}

err := partition.setFileInformation()
if err != nil {
return nil, fmt.Errorf("error setting file information: %w", err)
}

return partition, nil
}
func GetPartitionResource(partitionName string) (*PartitionResource, error) {
p, ok := config.GlobalConfig.Partitions[partitionName]
if !ok {
return nil, fmt.Errorf("no partitions found")
}
partition := &PartitionResource{
Name: partitionName,
Description: p.Description,
Plugin: p.Plugin.Alias,
}

err := partition.setFileInformation()
if err != nil {
return nil, fmt.Errorf("error setting file information: %w", err)
}

return nil, fmt.Errorf("partition '%s' not found", partitionName)
return partition, nil
}

func (r *PartitionResource) setFileInformation() error {
Expand Down
Loading
Loading