diff --git a/cmd/collect.go b/cmd/collect.go index e7db750e..660ac17e 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -10,16 +10,16 @@ import ( "github.com/danwakefield/fnmatch" "github.com/spf13/cobra" "github.com/spf13/viper" + "golang.org/x/exp/maps" + "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/cmdconfig" - "github.com/turbot/pipe-fittings/constants" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/error_helpers" "github.com/turbot/pipe-fittings/parse" "github.com/turbot/tailpipe/internal/collector" "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/plugin_manager" - "golang.org/x/exp/maps" ) // NOTE: the hard coded config that was previously defined here has been moved to hcl in the file tailpipe/internal/parse/test_data/configs/resources.tpc @@ -74,54 +74,27 @@ func runCollectCmd(cmd *cobra.Command, args []string) { func collectAndCompact(ctx context.Context, args []string) error { // collect the data - statusStrings, timingStrings, err := doCollect(ctx, args) + err := doCollect(ctx, args) if err != nil { return err } - // compact the data - var compactStatusString string - if viper.GetBool(pconstants.ArgCompact) { - compactStatus, err := doCompaction(ctx) - // if the context was cancelled, we don't want to return an error - if err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("compaction error: %w", err) - } - compactStatusString = compactStatus.BriefString() - if ctx.Err() != nil { - // instead show the status as cancelled - compactStatusString = "Compaction cancelled: " + compactStatusString - } - } - - // now show the result - for i, statusString := range statusStrings { - fmt.Println(statusString) //nolint:forbidigo // ui output - // show timing if requested - if len(timingStrings) > i && shouldShowCollectTiming() { - fmt.Println(timingStrings[i]) //nolint:forbidigo // ui output - } - } - if compactStatusString != "" { - fmt.Println(compactStatusString) //nolint:forbidigo // ui output - } - return nil } -func doCollect(ctx context.Context, args []string) ([]string, []string, error) { +func doCollect(ctx context.Context, args []string) error { var fromTime time.Time if viper.GetString(pconstants.ArgFrom) != "" { var err error fromTime, err = parse.ParseTime(viper.GetString(pconstants.ArgFrom), time.Now()) if err != nil { - return nil, nil, fmt.Errorf("failed to parse 'from' argument: %w", err) + return fmt.Errorf("failed to parse 'from' argument: %w", err) } } partitions, err := getPartitions(args) if err != nil { - return nil, nil, fmt.Errorf("failed to get partition config: %w", err) + return fmt.Errorf("failed to get partition config: %w", err) } // now we have the partitions, we can start collecting @@ -131,31 +104,26 @@ func doCollect(ctx context.Context, args []string) ([]string, []string, error) { defer pluginManager.Close() // collect each partition serially - statusStrings := make([]string, 0, len(partitions)) - timingStrings := make([]string, 0, len(partitions)) var errList []error for _, partition := range partitions { - statusString, timingString, err := collectPartition(ctx, partition, fromTime, pluginManager) + err = collectPartition(ctx, partition, fromTime, pluginManager) if err != nil { errList = append(errList, err) - } else { - statusStrings = append(statusStrings, statusString) - timingStrings = append(timingStrings, timingString) } } if len(errList) > 0 { err = errors.Join(errList...) - return nil, nil, fmt.Errorf("collection error: %w", err) + return fmt.Errorf("collection error: %w", err) } - return statusStrings, timingStrings, nil + return nil } -func collectPartition(ctx context.Context, partition *config.Partition, fromTime time.Time, pluginManager *plugin_manager.PluginManager) (string, string, error) { +func collectPartition(ctx context.Context, partition *config.Partition, fromTime time.Time, pluginManager *plugin_manager.PluginManager) error { c, err := collector.New(pluginManager) if err != nil { - return "", "", fmt.Errorf("failed to create collector: %w", err) + return fmt.Errorf("failed to create collector: %w", err) } defer c.Close() @@ -164,12 +132,18 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", fromTime.Format("2006-01-02T15:04:05"))) } - if err := c.Collect(ctx, partition, fromTime); err != nil { - return "", "", err + if err = c.Collect(ctx, partition, fromTime); err != nil { + return err } // now wait for all collection to complete and close the collector c.WaitForCompletion(ctx) - return c.StatusString(), c.TimingString(), nil + + err = c.Compact(ctx) + if err != nil { + return err + } + + return nil } func getPartitions(args []string) ([]*config.Partition, error) { @@ -267,7 +241,3 @@ func setExitCodeForCollectError(err error) { // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106 exitCode = 1 } - -func shouldShowCollectTiming() bool { - return viper.GetBool(constants.ArgTiming) -} diff --git a/go.mod b/go.mod index f110e6de..be4c739f 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( github.com/alecthomas/chroma v0.10.0 github.com/briandowns/spinner v1.23.0 github.com/c-bata/go-prompt v0.0.0-00010101000000-000000000000 + github.com/charmbracelet/bubbles v0.20.0 + github.com/charmbracelet/bubbletea v1.2.4 github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 github.com/dustin/go-humanize v1.0.1 github.com/fsnotify/fsnotify v1.7.0 @@ -82,11 +84,16 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect github.com/aws/smithy-go v1.20.3 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.0 // indirect github.com/bmatcuk/doublestar v1.3.4 // indirect github.com/btubbs/datetime v0.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/charmbracelet/harmonica v0.2.0 // indirect + github.com/charmbracelet/lipgloss v1.0.0 // indirect + github.com/charmbracelet/x/ansi v0.4.5 // indirect + github.com/charmbracelet/x/term v0.2.1 // indirect github.com/containerd/containerd v1.7.18 // indirect github.com/containerd/errdefs v0.1.0 // indirect github.com/containerd/log v0.1.0 // indirect @@ -95,6 +102,7 @@ require ( github.com/dgraph-io/ristretto v0.2.0 // indirect github.com/dlclark/regexp2 v1.4.0 // indirect github.com/elastic/go-grok v0.3.1 // indirect + github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect @@ -144,9 +152,11 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-tty v0.0.3 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect @@ -155,6 +165,9 @@ require ( github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/locker v1.0.1 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/termenv v0.15.2 // indirect github.com/oklog/run v1.0.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8bc9fb44..37d03719 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudr github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ= github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ00z/TKoufEY6K/a0k6AhaJrQKdFe6OfVXsa4= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= @@ -265,6 +267,18 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/bubbles v0.20.0 h1:jSZu6qD8cRQ6k9OMfR1WlM+ruM8fkPWkHvQWD9LIutE= +github.com/charmbracelet/bubbles v0.20.0/go.mod h1:39slydyswPy+uVOHZ5x/GjwVAFkCsV8IIVy+4MhzwwU= +github.com/charmbracelet/bubbletea v1.2.4 h1:KN8aCViA0eps9SCOThb2/XPIlea3ANJLUkv3KnQRNCE= +github.com/charmbracelet/bubbletea v1.2.4/go.mod h1:Qr6fVQw+wX7JkWWkVyXYk/ZUQ92a6XNekLXa3rR18MM= +github.com/charmbracelet/harmonica v0.2.0 h1:8NxJWRWg/bzKqqEaaeFNipOu77YR5t8aSwG4pgaUBiQ= +github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao= +github.com/charmbracelet/lipgloss v1.0.0 h1:O7VkGDvqEdGi93X+DeqsQ7PKHDgtQfF8j8/O2qFMQNg= +github.com/charmbracelet/lipgloss v1.0.0/go.mod h1:U5fy9Z+C38obMs+T+tJqst9VGzlOYGj4ri9reL3qUlo= +github.com/charmbracelet/x/ansi v0.4.5 h1:LqK4vwBNaXw2AyGIICa5/29Sbdq58GbGdFngSexTdRM= +github.com/charmbracelet/x/ansi v0.4.5/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= +github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= +github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -320,6 +334,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= @@ -573,6 +589,8 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -594,6 +612,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -619,6 +639,12 @@ github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= @@ -963,6 +989,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/collector/collector.go b/internal/collector/collector.go index bb2ef2c8..3a5f6afa 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -12,8 +12,9 @@ import ( "strings" "time" - "github.com/briandowns/spinner" + tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" + "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" "github.com/turbot/tailpipe-plugin-sdk/events" @@ -37,7 +38,7 @@ type Collector struct { execution *execution parquetWriter *parquet.ParquetJobPool - spinner *spinner.Spinner + // the current plugin status - used to update the spinner status status @@ -46,6 +47,9 @@ type Collector struct { collectionTempDir string sourcePath string + + // bubble tea app + app *tea.Program } func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { @@ -69,20 +73,18 @@ func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { } c.sourcePath = sourcePath - // TODO #ui temp - c.spinner = spinner.New( - spinner.CharSets[14], - 100*time.Millisecond, - spinner.WithHiddenCursor(true), - spinner.WithWriter(os.Stdout), - ) - return c, nil } func (c *Collector) Close() { close(c.Events) - c.spinner.Stop() + + c.parquetWriter.Close() + + c.app.Send(CollectionFinishedMsg{}) + + // if inbox path is empty, remove it (ignore errors) + _ = os.Remove(c.sourcePath) // delete the collection temp dir _ = os.RemoveAll(c.collectionTempDir) @@ -101,9 +103,10 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr if err != nil { return fmt.Errorf("failed to collect: %w", err) } - fmt.Printf("Collecting partition '%s' from %s (%s)\n", partition.Name(), collectResponse.FromTime.Time.Format(time.DateTime), collectResponse.FromTime.Source) //nolint:forbidigo//UI output - c.spinner.Start() - c.spinner.Suffix = " Collecting logs" + + c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime)) + //nolint:errcheck // handle this later + go c.app.Run() // TODO: #error handling of errors executionId := collectResponse.ExecutionId // add the execution to the map @@ -127,7 +130,7 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr rowCount, err := c.parquetWriter.GetRowCount() if err == nil { c.status.SetRowsConverted(rowCount) - c.setStatusMessage() + c.app.Send(c.status) } } } @@ -155,7 +158,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { c.execution.state = ExecutionState_STARTED case *proto.Event_StatusEvent: c.status.UpdateWithPluginStatus(e.GetStatusEvent()) - c.setStatusMessage() + c.app.Send(c.status) case *proto.Event_ChunkWrittenEvent: ev := e.GetChunkWrittenEvent() @@ -219,10 +222,6 @@ func (c *Collector) WaitForCompletion(ctx context.Context) { slog.Error("error waiting for execution to complete", "error", err) } - c.parquetWriter.Close() - - // if inbox path is empty, remove it (ignore errors) - _ = os.Remove(c.sourcePath) } func (c *Collector) StatusString() string { @@ -261,7 +260,7 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl // TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1 executionTimeout := executionMaxDuration - retryInterval := 5 * time.Second + retryInterval := 200 * time.Millisecond c.execution.totalRows = ce.RowCount c.execution.chunkCount = ce.ChunkCount @@ -298,15 +297,8 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl // not all chunks have been written return retry.RetryableError(fmt.Errorf("not all chunks have been written")) } - // so we are done writing chunks - now update the db to add a view to this data - // Open a DuckDB connection - db, err := sql.Open("duckdb", filepaths.TailpipeDbFilePath()) - if err != nil { - return err - } - defer db.Close() - return database.AddTableView(ctx, c.execution.table, db) + return nil }) slog.Debug("waitForConversions - all chunks written", "execution", c.execution.id) @@ -319,6 +311,18 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl return err } + // so we are done writing chunks - now update the db to add a view to this data + // Open a DuckDB connection + db, err := sql.Open("duckdb", filepaths.TailpipeDbFilePath()) + if err != nil { + return err + } + defer db.Close() + + err = database.AddTableView(ctx, c.execution.table, db) + if err != nil { + return err + } // notify the writer that the collection is complete return c.parquetWriter.JobGroupComplete(ce.ExecutionId) } @@ -327,7 +331,9 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl func (c *Collector) waitForExecution(ctx context.Context) error { // TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1 executionTimeout := executionMaxDuration - retryInterval := 500 * time.Millisecond + retryInterval := 100 * time.Millisecond + + slog.Error("waiting for execution") err := retry.Do(ctx, retry.WithMaxDuration(executionTimeout, retry.NewConstant(retryInterval)), func(ctx context.Context) error { switch c.execution.state { @@ -345,6 +351,7 @@ func (c *Collector) waitForExecution(ctx context.Context) error { } return err } + slog.Error("done waiting for execution") return nil } @@ -357,10 +364,6 @@ func (c *Collector) listenToEventsAsync(ctx context.Context) { }() } -func (c *Collector) setStatusMessage() { - c.spinner.Suffix = " " + c.status.String() -} - func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) { c.execution.pluginTiming = events.TimingCollectionFromProto(timing) } @@ -392,3 +395,15 @@ func (c *Collector) cleanupCollectionDir() { } } } + +func (c *Collector) Compact(ctx context.Context) error { + c.app.Send(AwaitingCompactionMsg{}) + updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) { + c.app.Send(CompactionStatusUpdateMsg{status: &compactionStatus}) + } + err := parquet.CompactDataFiles(ctx, updateAppCompactionFunc) + if err != nil { + return fmt.Errorf("failed to compact data files: %w", err) + } + return nil +} diff --git a/internal/collector/status.go b/internal/collector/status.go index f0393d3b..19a48236 100644 --- a/internal/collector/status.go +++ b/internal/collector/status.go @@ -6,19 +6,27 @@ import ( ) type status struct { - ArtifactsDiscovered int64 - ArtifactsDownloaded int64 - ArtifactsExtracted int64 - RowsEnriched int64 - RowsConverted int64 - Errors int32 + LatestArtifactPath string + ArtifactsDiscovered int64 + ArtifactsDownloaded int64 + ArtifactsDownloadedBytes int64 + ArtifactsExtracted int64 + ArtifactErrors int64 + RowsReceived int64 + RowsEnriched int64 + RowsConverted int64 + Errors int64 } // UpdateWithPluginStatus updates the status with the values from the plugin status event func (s *status) UpdateWithPluginStatus(event *proto.EventStatus) { + s.LatestArtifactPath = event.LatestArtifactPath s.ArtifactsDiscovered = event.ArtifactsDiscovered s.ArtifactsDownloaded = event.ArtifactsDownloaded + s.ArtifactsDownloadedBytes = event.ArtifactsDownloadedBytes s.ArtifactsExtracted = event.ArtifactsExtracted + s.ArtifactErrors = event.ArtifactErrors + s.RowsReceived = event.RowsReceived s.RowsEnriched = event.RowsEnriched s.Errors = event.Errors } diff --git a/internal/collector/tui.go b/internal/collector/tui.go new file mode 100644 index 00000000..31c72db8 --- /dev/null +++ b/internal/collector/tui.go @@ -0,0 +1,182 @@ +package collector + +import ( + "fmt" + "strings" + "time" + + "github.com/charmbracelet/bubbletea" + "github.com/dustin/go-humanize" + + "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe/internal/parquet" +) + +type collectionModel struct { + partitionName string + fromTime row_source.ResolvedFromTime + + // artifacts + path string + discovered int64 + downloaded int64 + downloadedBytes int64 + extracted int64 + errors int64 + + // rows + rowsReceived int64 + rowsEnriched int64 + rowsConverted int64 + rowsErrors int64 + + complete bool + initiated time.Time + + // compaction + compactionStatus *parquet.CompactionStatus +} + +type CollectionFinishedMsg struct{} + +type AwaitingCompactionMsg struct{} + +type CompactionStatusUpdateMsg struct { + status *parquet.CompactionStatus +} + +func newCollectionModel(partitionName string, fromTime row_source.ResolvedFromTime) collectionModel { + return collectionModel{ + partitionName: partitionName, + fromTime: fromTime, + initiated: time.Now(), + compactionStatus: nil, + } +} + +func (c collectionModel) Init() tea.Cmd { + return nil +} + +func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch t := msg.(type) { + case tea.KeyMsg: + switch t.String() { + case "ctrl+c": + // TODO: Handle graceful exit + return c, tea.Quit + } + case CollectionFinishedMsg: + return c, tea.Quit + case status: + c.path = t.LatestArtifactPath + c.discovered = t.ArtifactsDiscovered + c.downloaded = t.ArtifactsDownloaded + c.downloadedBytes = t.ArtifactsDownloadedBytes + c.extracted = t.ArtifactsExtracted + c.errors = t.ArtifactErrors + c.rowsReceived = t.RowsReceived + c.rowsEnriched = t.RowsEnriched + c.rowsConverted = t.RowsConverted + c.rowsErrors = t.Errors + return c, nil + case AwaitingCompactionMsg: + cs := parquet.CompactionStatus{} + c.complete = true + c.compactionStatus = &cs + return c, nil + case CompactionStatusUpdateMsg: + if c.compactionStatus == nil { + c.compactionStatus = t.status + return c, nil + } else { + c.compactionStatus.Update(*t.status) + return c, nil + } + } + return c, nil +} + +func (c collectionModel) View() string { + var b strings.Builder + var countLength int = 5 + var descriptionLength int = 12 + var downloadedDisplay string + + countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) + countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) + if c.downloadedBytes < 0 { + downloadedDisplay = "0 B" // Handle negative values gracefully + } else { + downloadedDisplay = humanize.Bytes(uint64(c.downloadedBytes)) + } + if countArtifactsDisplayLen > countLength { + countLength = countArtifactsDisplayLen + } + if countRowsDisplayLen > countLength { + countLength = countRowsDisplayLen + descriptionLength = 11 + } + + collectionComplete := c.complete || c.compactionStatus != nil + displayPath := c.path + timeLabel := "Time:" + + if collectionComplete { + // TODO: #tactical we should clear path in event once complete + displayPath = "" + timeLabel = "Completed:" + } + + // header + b.WriteString(fmt.Sprintf("\nCollecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) + + // artifacts + if c.path != "" || c.discovered > 0 { + if strings.Contains(displayPath, "/") { + displayPath = displayPath[strings.LastIndex(displayPath, "/")+1:] + } + + b.WriteString("Artifacts:\n") + b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &displayPath)) + b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) + b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) + if c.errors > 0 { + b.WriteString(writeCountLine("Errors:", descriptionLength, c.errors, countLength, nil)) + } + b.WriteString("\n") + } + + // rows + b.WriteString("Rows:\n") + b.WriteString(writeCountLine("Received:", descriptionLength, c.rowsReceived, countLength, nil)) + b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) + b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) + if c.rowsErrors > 0 { + b.WriteString(writeCountLine("Errors:", descriptionLength, c.rowsErrors, countLength, nil)) + } + b.WriteString("\n") + + // compaction + if c.compactionStatus != nil { + b.WriteString("File Compaction:\n") + b.WriteString(fmt.Sprintf(" Compacted: %d => %d\n", c.compactionStatus.Source, c.compactionStatus.Dest)) + b.WriteString(fmt.Sprintf(" Skipped: %d\n", c.compactionStatus.Uncompacted)) + b.WriteString("\n") + } + + // run time + duration := time.Since(c.initiated) + b.WriteString(fmt.Sprintf("%s %s\n", timeLabel, utils.HumanizeDuration(duration))) + + return b.String() +} + +func writeCountLine(desc string, descLen int, count int64, maxCountLen int, suffix *string) string { + s := "" + if suffix != nil { + s = *suffix + } + return fmt.Sprintf(" %-*s%*s %s\n", descLen, desc, maxCountLen, humanize.Comma(count), s) +} diff --git a/internal/display/plugin.go b/internal/display/plugin.go index d3c11400..dbd5d77d 100644 --- a/internal/display/plugin.go +++ b/internal/display/plugin.go @@ -89,7 +89,7 @@ func GetPluginResource(ctx context.Context, name string) (*PluginResource, error slices.Sort(sources) var tables []string - for table, _ := range desc.TableSchemas { + for table := range desc.TableSchemas { tables = append(tables, table) } slices.Sort(tables)