From 155991cb0b67a02f7ee89bc5d0025e0251130bd5 Mon Sep 17 00:00:00 2001 From: kai Date: Fri, 17 Jan 2025 16:44:18 +0000 Subject: [PATCH 01/13] add missing status fields remove spinner from collector --- internal/collector/collector.go | 24 ++++++++---------------- internal/collector/status.go | 16 ++++++++++------ 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index bb2ef2c8..84d61ed8 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -12,7 +12,6 @@ import ( "strings" "time" - "github.com/briandowns/spinner" "github.com/sethvargo/go-retry" "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" @@ -37,7 +36,7 @@ type Collector struct { execution *execution parquetWriter *parquet.ParquetJobPool - spinner *spinner.Spinner + // the current plugin status - used to update the spinner status status @@ -69,20 +68,14 @@ func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { } c.sourcePath = sourcePath - // TODO #ui temp - c.spinner = spinner.New( - spinner.CharSets[14], - 100*time.Millisecond, - spinner.WithHiddenCursor(true), - spinner.WithWriter(os.Stdout), - ) + // create bubbletea app + //c.app = tea.NewProgram(model.NewModel(), tea.WithAltScreen()) return c, nil } func (c *Collector) Close() { close(c.Events) - c.spinner.Stop() // delete the collection temp dir _ = os.RemoveAll(c.collectionTempDir) @@ -102,8 +95,6 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr return fmt.Errorf("failed to collect: %w", err) } fmt.Printf("Collecting partition '%s' from %s (%s)\n", partition.Name(), collectResponse.FromTime.Time.Format(time.DateTime), collectResponse.FromTime.Source) //nolint:forbidigo//UI output - c.spinner.Start() - c.spinner.Suffix = " Collecting logs" executionId := collectResponse.ExecutionId // add the execution to the map @@ -127,7 +118,7 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr rowCount, err := c.parquetWriter.GetRowCount() if err == nil { c.status.SetRowsConverted(rowCount) - c.setStatusMessage() + c.updateUI() } } } @@ -155,7 +146,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { c.execution.state = ExecutionState_STARTED case *proto.Event_StatusEvent: c.status.UpdateWithPluginStatus(e.GetStatusEvent()) - c.setStatusMessage() + c.updateUI() case *proto.Event_ChunkWrittenEvent: ev := e.GetChunkWrittenEvent() @@ -357,8 +348,9 @@ func (c *Collector) listenToEventsAsync(ctx context.Context) { }() } -func (c *Collector) setStatusMessage() { - c.spinner.Suffix = " " + c.status.String() +func (c *Collector) updateUI() { + // updatye bubble teas app + // c.app.Update(model.NewModel(c.status.String(), c.execution.getTiming().String())) } func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) { diff --git a/internal/collector/status.go b/internal/collector/status.go index f0393d3b..32df224f 100644 --- a/internal/collector/status.go +++ b/internal/collector/status.go @@ -6,12 +6,16 @@ import ( ) type status struct { - ArtifactsDiscovered int64 - ArtifactsDownloaded int64 - ArtifactsExtracted int64 - RowsEnriched int64 - RowsConverted int64 - Errors int32 + LatestPath string + ArtifactsDiscovered int64 + ArtifactsDownloaded int64 + ArtifactsDownloadedBytes uint64 + ArtifactsExtracted int64 + ArtifactErrors int64 + RowsReceived int64 + RowsEnriched int64 + RowsConverted int64 + Errors int32 } // UpdateWithPluginStatus updates the status with the values from the plugin status event From 6e20ea3559327c1736753746b665d99714f72d9f Mon Sep 17 00:00:00 2001 From: Graza Date: Mon, 20 Jan 2025 11:33:36 +0000 Subject: [PATCH 02/13] feat(wip): collect ui --- go.mod | 13 +++ go.sum | 27 ++++++ internal/collector/collector.go | 21 ++--- internal/collector/status.go | 4 +- internal/collector/tui.go | 159 ++++++++++++++++++++++++++++++++ 5 files changed, 211 insertions(+), 13 deletions(-) create mode 100644 internal/collector/tui.go diff --git a/go.mod b/go.mod index f110e6de..be4c739f 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( github.com/alecthomas/chroma v0.10.0 github.com/briandowns/spinner v1.23.0 github.com/c-bata/go-prompt v0.0.0-00010101000000-000000000000 + github.com/charmbracelet/bubbles v0.20.0 + github.com/charmbracelet/bubbletea v1.2.4 github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 github.com/dustin/go-humanize v1.0.1 github.com/fsnotify/fsnotify v1.7.0 @@ -82,11 +84,16 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect github.com/aws/smithy-go v1.20.3 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.0 // indirect github.com/bmatcuk/doublestar v1.3.4 // indirect github.com/btubbs/datetime v0.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/charmbracelet/harmonica v0.2.0 // indirect + github.com/charmbracelet/lipgloss v1.0.0 // indirect + github.com/charmbracelet/x/ansi v0.4.5 // indirect + github.com/charmbracelet/x/term v0.2.1 // indirect github.com/containerd/containerd v1.7.18 // indirect github.com/containerd/errdefs v0.1.0 // indirect github.com/containerd/log v0.1.0 // indirect @@ -95,6 +102,7 @@ require ( github.com/dgraph-io/ristretto v0.2.0 // indirect github.com/dlclark/regexp2 v1.4.0 // indirect github.com/elastic/go-grok v0.3.1 // indirect + github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect @@ -144,9 +152,11 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-tty v0.0.3 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect @@ -155,6 +165,9 @@ require ( github.com/mitchellh/go-wordwrap v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/locker v1.0.1 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/termenv v0.15.2 // indirect github.com/oklog/run v1.0.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8bc9fb44..37d03719 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudr github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ= github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ00z/TKoufEY6K/a0k6AhaJrQKdFe6OfVXsa4= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= @@ -265,6 +267,18 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/bubbles v0.20.0 h1:jSZu6qD8cRQ6k9OMfR1WlM+ruM8fkPWkHvQWD9LIutE= +github.com/charmbracelet/bubbles v0.20.0/go.mod h1:39slydyswPy+uVOHZ5x/GjwVAFkCsV8IIVy+4MhzwwU= +github.com/charmbracelet/bubbletea v1.2.4 h1:KN8aCViA0eps9SCOThb2/XPIlea3ANJLUkv3KnQRNCE= +github.com/charmbracelet/bubbletea v1.2.4/go.mod h1:Qr6fVQw+wX7JkWWkVyXYk/ZUQ92a6XNekLXa3rR18MM= +github.com/charmbracelet/harmonica v0.2.0 h1:8NxJWRWg/bzKqqEaaeFNipOu77YR5t8aSwG4pgaUBiQ= +github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao= +github.com/charmbracelet/lipgloss v1.0.0 h1:O7VkGDvqEdGi93X+DeqsQ7PKHDgtQfF8j8/O2qFMQNg= +github.com/charmbracelet/lipgloss v1.0.0/go.mod h1:U5fy9Z+C38obMs+T+tJqst9VGzlOYGj4ri9reL3qUlo= +github.com/charmbracelet/x/ansi v0.4.5 h1:LqK4vwBNaXw2AyGIICa5/29Sbdq58GbGdFngSexTdRM= +github.com/charmbracelet/x/ansi v0.4.5/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= +github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= +github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -320,6 +334,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= @@ -573,6 +589,8 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -594,6 +612,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -619,6 +639,12 @@ github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= @@ -963,6 +989,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 84d61ed8..7d1d6d1f 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -12,6 +12,7 @@ import ( "strings" "time" + tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" @@ -45,6 +46,9 @@ type Collector struct { collectionTempDir string sourcePath string + + // bubble tea app + app *tea.Program } func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { @@ -68,9 +72,6 @@ func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { } c.sourcePath = sourcePath - // create bubbletea app - //c.app = tea.NewProgram(model.NewModel(), tea.WithAltScreen()) - return c, nil } @@ -94,7 +95,10 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr if err != nil { return fmt.Errorf("failed to collect: %w", err) } - fmt.Printf("Collecting partition '%s' from %s (%s)\n", partition.Name(), collectResponse.FromTime.Time.Format(time.DateTime), collectResponse.FromTime.Source) //nolint:forbidigo//UI output + //fmt.Printf("Collecting partition '%s' from %s (%s)\n", partition.Name(), collectResponse.FromTime.Time.Format(time.DateTime), collectResponse.FromTime.Source) //nolint:forbidigo//UI output + + c.app = tea.NewProgram(newCollectionModel(partition.FullName, *collectResponse.FromTime)) + go c.app.Run() // TODO: #error handling of errors executionId := collectResponse.ExecutionId // add the execution to the map @@ -118,7 +122,7 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr rowCount, err := c.parquetWriter.GetRowCount() if err == nil { c.status.SetRowsConverted(rowCount) - c.updateUI() + c.app.Send(c.status) } } } @@ -146,7 +150,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { c.execution.state = ExecutionState_STARTED case *proto.Event_StatusEvent: c.status.UpdateWithPluginStatus(e.GetStatusEvent()) - c.updateUI() + c.app.Send(c.status) case *proto.Event_ChunkWrittenEvent: ev := e.GetChunkWrittenEvent() @@ -348,11 +352,6 @@ func (c *Collector) listenToEventsAsync(ctx context.Context) { }() } -func (c *Collector) updateUI() { - // updatye bubble teas app - // c.app.Update(model.NewModel(c.status.String(), c.execution.getTiming().String())) -} - func (c *Collector) setPluginTiming(executionId string, timing []*proto.Timing) { c.execution.pluginTiming = events.TimingCollectionFromProto(timing) } diff --git a/internal/collector/status.go b/internal/collector/status.go index 32df224f..0b8a7d97 100644 --- a/internal/collector/status.go +++ b/internal/collector/status.go @@ -15,7 +15,7 @@ type status struct { RowsReceived int64 RowsEnriched int64 RowsConverted int64 - Errors int32 + Errors int64 } // UpdateWithPluginStatus updates the status with the values from the plugin status event @@ -24,7 +24,7 @@ func (s *status) UpdateWithPluginStatus(event *proto.EventStatus) { s.ArtifactsDownloaded = event.ArtifactsDownloaded s.ArtifactsExtracted = event.ArtifactsExtracted s.RowsEnriched = event.RowsEnriched - s.Errors = event.Errors + s.Errors = int64(event.Errors) } func (s *status) SetRowsConverted(rowsConverted int64) { diff --git a/internal/collector/tui.go b/internal/collector/tui.go new file mode 100644 index 00000000..387e7816 --- /dev/null +++ b/internal/collector/tui.go @@ -0,0 +1,159 @@ +package collector + +import ( + "fmt" + "math" + "strings" + "time" + + "github.com/charmbracelet/bubbles/progress" + "github.com/charmbracelet/bubbletea" + "github.com/dustin/go-humanize" + + "github.com/turbot/pipe-fittings/utils" + "github.com/turbot/tailpipe-plugin-sdk/row_source" +) + +type collectionModel struct { + partitionName string + fromTime row_source.ResolvedFromTime + + // artifacts + path string + discovered int64 + downloaded int64 + downloadedBytes uint64 + extracted int64 + errors int64 + + // rows + rowsReceived int64 + rowsEnriched int64 + rowsConverted int64 + rowsErrors int64 + + complete bool + terminalWidth int + initiated time.Time + progressBarConfig progress.Model +} + +type collectionCompleteMsg struct{} + +func newCollectionModel(partitionName string, fromTime row_source.ResolvedFromTime) collectionModel { + return collectionModel{ + partitionName: partitionName, + fromTime: fromTime, + initiated: time.Now(), + progressBarConfig: progress.New(progress.WithWidth(20), progress.WithFillCharacters('=', '-'), progress.WithColorProfile(3), progress.WithoutPercentage()), + } +} + +func (c collectionModel) Init() tea.Cmd { + return nil +} + +func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch t := msg.(type) { + case tea.KeyMsg: + switch t.String() { + case "ctrl+c": + // TODO: Handle graceful exit + return c, tea.Quit + } + case collectionCompleteMsg: + c.complete = true + return c, nil + case status: + c.path = t.LatestPath + c.discovered = t.ArtifactsDiscovered + c.downloaded = t.ArtifactsDownloaded + c.downloadedBytes = t.ArtifactsDownloadedBytes + c.extracted = t.ArtifactsExtracted + c.errors = t.ArtifactErrors + c.rowsReceived = t.RowsReceived + c.rowsEnriched = t.RowsEnriched + c.rowsConverted = t.RowsConverted + c.rowsErrors = t.Errors + return c, nil + } + return c, nil +} + +func (c collectionModel) View() string { + var b strings.Builder + var countLength int = 5 + var descriptionLength int = 12 + countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) + countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) + downloadedDisplay := fmt.Sprintf("(%s)", humanize.Bytes(c.downloadedBytes)) + if c.rowsReceived > c.discovered { + descriptionLength = 11 + } + if countArtifactsDisplayLen > countLength { + countLength = countArtifactsDisplayLen + } + if countRowsDisplayLen > countLength { + countLength = countRowsDisplayLen + } + + // header + b.WriteString(fmt.Sprintf("Collecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) + + // artifacts + if c.path != "" || c.discovered > 0 { + b.WriteString("Artifacts:\n") + b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &c.path)) + if c.complete { + b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) + b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) + } else { + b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig)) + b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig)) + } + if c.errors > 0 { + b.WriteString(writeCountLine("Errors:", descriptionLength, c.errors, countLength, nil)) + } + b.WriteString("\n") + } + + // rows + b.WriteString("Rows:\n") + b.WriteString(writeCountLine("Received:", descriptionLength, c.rowsReceived, countLength, nil)) + if c.complete { + b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) + b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) + } else { + b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig)) + b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig)) + } + if c.rowsErrors > 0 { + b.WriteString(writeCountLine("Errors:", descriptionLength, c.rowsErrors, countLength, nil)) + } + b.WriteString("\n") + + // run time + duration := time.Since(c.initiated) + b.WriteString(fmt.Sprintf("Time: %s\n", utils.HumanizeDuration(duration))) + + return b.String() +} + +func writeCountLine(desc string, descLen int, count int64, maxCountLen int, suffix *string) string { + s := "" + if suffix != nil { + s = *suffix + } + return fmt.Sprintf(" %-*s%*s %s\n", descLen, desc, maxCountLen, humanize.Comma(count), s) +} + +func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, percent float64, suffix *string, pb *progress.Model) string { + s := "" + if suffix != nil { + s = *suffix + } + if math.IsNaN(percent) { + percent = 0 + } + return fmt.Sprintf(" %-*s%*s [%s] %3.0f%% %s\n", descLen, desc, maxCountLen, humanize.Comma(count), pb.ViewAs(percent), percent*100, s) +} From 6319b64e5fda069b67265f194dc791441fcd294f Mon Sep 17 00:00:00 2001 From: Graza Date: Mon, 20 Jan 2025 11:40:26 +0000 Subject: [PATCH 03/13] feat(wip): collect ui - floor percentages --- internal/collector/tui.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/collector/tui.go b/internal/collector/tui.go index 387e7816..d00ceb15 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -155,5 +155,5 @@ func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, p if math.IsNaN(percent) { percent = 0 } - return fmt.Sprintf(" %-*s%*s [%s] %3.0f%% %s\n", descLen, desc, maxCountLen, humanize.Comma(count), pb.ViewAs(percent), percent*100, s) + return fmt.Sprintf(" %-*s%*s [%s] %3.0f%% %s\n", descLen, desc, maxCountLen, humanize.Comma(count), pb.ViewAs(percent), math.Floor(percent*100), s) } From ce594792bbb72ad7449aa40708147116645a1cdc Mon Sep 17 00:00:00 2001 From: Graza Date: Mon, 20 Jan 2025 11:53:12 +0000 Subject: [PATCH 04/13] feat(wip): collect ui - send completition event --- internal/collector/collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 7d1d6d1f..ea9b5169 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -215,6 +215,7 @@ func (c *Collector) WaitForCompletion(ctx context.Context) { } c.parquetWriter.Close() + c.app.Send(collectionCompleteMsg{}) // if inbox path is empty, remove it (ignore errors) _ = os.Remove(c.sourcePath) From 46e82b26848ef90b087959cf1315a10b5cbe1818 Mon Sep 17 00:00:00 2001 From: kai Date: Mon, 20 Jan 2025 14:14:38 +0000 Subject: [PATCH 05/13] Add DownloadedArtifactInfo with LocalName and size - this is what Loaders and Extractors accept as arg now Rename ArtifactInfo OriginalName to Name Pass latest artifact path, downloaded bytes and rows received in status messages --- internal/collector/status.go | 10 +++++++--- internal/collector/tui.go | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/collector/status.go b/internal/collector/status.go index 0b8a7d97..19a48236 100644 --- a/internal/collector/status.go +++ b/internal/collector/status.go @@ -6,10 +6,10 @@ import ( ) type status struct { - LatestPath string + LatestArtifactPath string ArtifactsDiscovered int64 ArtifactsDownloaded int64 - ArtifactsDownloadedBytes uint64 + ArtifactsDownloadedBytes int64 ArtifactsExtracted int64 ArtifactErrors int64 RowsReceived int64 @@ -20,11 +20,15 @@ type status struct { // UpdateWithPluginStatus updates the status with the values from the plugin status event func (s *status) UpdateWithPluginStatus(event *proto.EventStatus) { + s.LatestArtifactPath = event.LatestArtifactPath s.ArtifactsDiscovered = event.ArtifactsDiscovered s.ArtifactsDownloaded = event.ArtifactsDownloaded + s.ArtifactsDownloadedBytes = event.ArtifactsDownloadedBytes s.ArtifactsExtracted = event.ArtifactsExtracted + s.ArtifactErrors = event.ArtifactErrors + s.RowsReceived = event.RowsReceived s.RowsEnriched = event.RowsEnriched - s.Errors = int64(event.Errors) + s.Errors = event.Errors } func (s *status) SetRowsConverted(rowsConverted int64) { diff --git a/internal/collector/tui.go b/internal/collector/tui.go index d00ceb15..19cfc33a 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -22,7 +22,7 @@ type collectionModel struct { path string discovered int64 downloaded int64 - downloadedBytes uint64 + downloadedBytes int64 extracted int64 errors int64 @@ -65,7 +65,7 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { c.complete = true return c, nil case status: - c.path = t.LatestPath + c.path = t.LatestArtifactPath c.discovered = t.ArtifactsDiscovered c.downloaded = t.ArtifactsDownloaded c.downloadedBytes = t.ArtifactsDownloadedBytes @@ -86,7 +86,7 @@ func (c collectionModel) View() string { var descriptionLength int = 12 countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) - downloadedDisplay := fmt.Sprintf("(%s)", humanize.Bytes(c.downloadedBytes)) + downloadedDisplay := fmt.Sprintf("(%s)", humanize.Bytes((uint64)(c.downloadedBytes))) if c.rowsReceived > c.discovered { descriptionLength = 11 } From c5252984803aed3c5dc3b7b3cb38a3e0bb27e58e Mon Sep 17 00:00:00 2001 From: Graza Date: Mon, 20 Jan 2025 15:06:34 +0000 Subject: [PATCH 06/13] feat(wip): collect ui - progress bars shouldn't hit 100% if collection isn't completed --- internal/collector/tui.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/collector/tui.go b/internal/collector/tui.go index 19cfc33a..df2a42b3 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -87,14 +87,12 @@ func (c collectionModel) View() string { countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) downloadedDisplay := fmt.Sprintf("(%s)", humanize.Bytes((uint64)(c.downloadedBytes))) - if c.rowsReceived > c.discovered { - descriptionLength = 11 - } if countArtifactsDisplayLen > countLength { countLength = countArtifactsDisplayLen } if countRowsDisplayLen > countLength { countLength = countRowsDisplayLen + descriptionLength = 11 } // header @@ -102,14 +100,18 @@ func (c collectionModel) View() string { // artifacts if c.path != "" || c.discovered > 0 { + if c.complete { + // TODO: #tactical we should clear path in event once complete + c.path = "" + } b.WriteString("Artifacts:\n") b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &c.path)) if c.complete { b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) } else { - b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig)) - b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig)) + b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig, c.complete)) + b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig, c.complete)) } if c.errors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.errors, countLength, nil)) @@ -124,8 +126,8 @@ func (c collectionModel) View() string { b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) } else { - b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig)) - b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig)) + b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig, c.complete)) + b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig, c.complete)) } if c.rowsErrors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.rowsErrors, countLength, nil)) @@ -147,7 +149,7 @@ func writeCountLine(desc string, descLen int, count int64, maxCountLen int, suff return fmt.Sprintf(" %-*s%*s %s\n", descLen, desc, maxCountLen, humanize.Comma(count), s) } -func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, percent float64, suffix *string, pb *progress.Model) string { +func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, percent float64, suffix *string, pb *progress.Model, complete bool) string { s := "" if suffix != nil { s = *suffix @@ -155,5 +157,9 @@ func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, p if math.IsNaN(percent) { percent = 0 } + // TODO: #hack review - essentially if we're not complete, we shouldn't be 100% on any progress bar + if !complete && percent >= 1.0 { + percent = 0.99 + } return fmt.Sprintf(" %-*s%*s [%s] %3.0f%% %s\n", descLen, desc, maxCountLen, humanize.Comma(count), pb.ViewAs(percent), math.Floor(percent*100), s) } From 24ac2f46db200367a1912c44a6915930f52f78f4 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 11:17:32 +0000 Subject: [PATCH 07/13] . --- internal/collector/tui.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/collector/tui.go b/internal/collector/tui.go index df2a42b3..e7da216e 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -96,7 +96,7 @@ func (c collectionModel) View() string { } // header - b.WriteString(fmt.Sprintf("Collecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) + b.WriteString(fmt.Sprintf("\nCollecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) // artifacts if c.path != "" || c.discovered > 0 { From bd2b56e8d007e79a6273703a92291e23b70ffe97 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 12:18:49 +0000 Subject: [PATCH 08/13] refactor(collect): added compaction to collect --- cmd/collect.go | 61 ++++++++++++++++++--------------- internal/collector/collector.go | 23 ++++++++++--- internal/collector/tui.go | 40 +++++++++++++++------ 3 files changed, 82 insertions(+), 42 deletions(-) diff --git a/cmd/collect.go b/cmd/collect.go index 350f7c5d..ed14fb7b 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -4,21 +4,22 @@ import ( "context" "errors" "fmt" - "github.com/turbot/pipe-fittings/parse" "strings" "time" "github.com/danwakefield/fnmatch" "github.com/spf13/cobra" "github.com/spf13/viper" + "golang.org/x/exp/maps" + "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/cmdconfig" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/error_helpers" + "github.com/turbot/pipe-fittings/parse" "github.com/turbot/tailpipe/internal/collector" "github.com/turbot/tailpipe/internal/config" "github.com/turbot/tailpipe/internal/plugin_manager" - "golang.org/x/exp/maps" ) // NOTE: the hard coded config that was previously defined here has been moved to hcl in the file tailpipe/internal/parse/test_data/configs/resources.tpc @@ -72,36 +73,36 @@ func runCollectCmd(cmd *cobra.Command, args []string) { func collectAndCompact(ctx context.Context, args []string) error { // collect the data - statusStrings, timingStrings, err := doCollect(ctx, args) + _, _, err := doCollect(ctx, args) if err != nil { return err } // compact the data - var compactStatusString string - if viper.GetBool(pconstants.ArgCompact) { - compactStatus, err := doCompaction(ctx) - // if the context was cancelled, we don't want to return an error - if err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("compaction error: %w", err) - } - compactStatusString = compactStatus.BriefString() - if ctx.Err() != nil { - // instead show the status as cancelled - compactStatusString = "Compaction cancelled: " + compactStatusString - } - } + //var compactStatusString string + //if viper.GetBool(pconstants.ArgCompact) { + // compactStatus, err := doCompaction(ctx) + // // if the context was cancelled, we don't want to return an error + // if err != nil && !errors.Is(err, context.Canceled) { + // return fmt.Errorf("compaction error: %w", err) + // } + // compactStatusString = compactStatus.BriefString() + // if ctx.Err() != nil { + // // instead show the status as cancelled + // compactStatusString = "Compaction cancelled: " + compactStatusString + // } + //} // now show the result - for i, statusString := range statusStrings { - fmt.Println(statusString) //nolint:forbidigo // ui output - if len(timingStrings) > i { - fmt.Println(timingStrings[i]) //nolint:forbidigo // ui output - } - } - if compactStatusString != "" { - fmt.Println(compactStatusString) //nolint:forbidigo // ui output - } + //for i, statusString := range statusStrings { + // fmt.Println(statusString) //nolint:forbidigo // ui output + // if len(timingStrings) > i { + // fmt.Println(timingStrings[i]) //nolint:forbidigo // ui output + // } + //} + //if compactStatusString != "" { + // fmt.Println(compactStatusString) //nolint:forbidigo // ui output + //} return nil } @@ -161,12 +162,18 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime partition.AddFilter(fmt.Sprintf("tp_timestamp >= '%s'", fromTime.Format("2006-01-02T15:04:05"))) } - if err := c.Collect(ctx, partition, fromTime); err != nil { + if err = c.Collect(ctx, partition, fromTime); err != nil { return "", "", err } // now wait for all collection to complete and close the collector c.WaitForCompletion(ctx) - return c.StatusString(), c.TimingString(), nil + + err = c.Compact(ctx) + if err != nil { + return "", "", err + } + + return "", "", nil } func getPartitions(args []string) ([]*config.Partition, error) { diff --git a/internal/collector/collector.go b/internal/collector/collector.go index ea9b5169..7e7d2502 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -78,6 +78,13 @@ func New(pluginManager *plugin_manager.PluginManager) (*Collector, error) { func (c *Collector) Close() { close(c.Events) + c.parquetWriter.Close() + + c.app.Send(CollectionCompleteMsg{}) + + // if inbox path is empty, remove it (ignore errors) + _ = os.Remove(c.sourcePath) + // delete the collection temp dir _ = os.RemoveAll(c.collectionTempDir) } @@ -214,11 +221,6 @@ func (c *Collector) WaitForCompletion(ctx context.Context) { slog.Error("error waiting for execution to complete", "error", err) } - c.parquetWriter.Close() - c.app.Send(collectionCompleteMsg{}) - - // if inbox path is empty, remove it (ignore errors) - _ = os.Remove(c.sourcePath) } func (c *Collector) StatusString() string { @@ -384,3 +386,14 @@ func (c *Collector) cleanupCollectionDir() { } } } + +func (c *Collector) Compact(ctx context.Context) error { + updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) { + c.app.Send(CompactionStatusUpdateMsg{status: &compactionStatus}) + } + err := parquet.CompactDataFiles(ctx, updateAppCompactionFunc) + if err != nil { + return fmt.Errorf("failed to compact data files: %w", err) + } + return nil +} diff --git a/internal/collector/tui.go b/internal/collector/tui.go index e7da216e..6ffa5d03 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -12,6 +12,7 @@ import ( "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe/internal/parquet" ) type collectionModel struct { @@ -36,9 +37,16 @@ type collectionModel struct { terminalWidth int initiated time.Time progressBarConfig progress.Model + + // compaction + compactionStatus *parquet.CompactionStatus } -type collectionCompleteMsg struct{} +type CollectionCompleteMsg struct{} + +type CompactionStatusUpdateMsg struct { + status *parquet.CompactionStatus +} func newCollectionModel(partitionName string, fromTime row_source.ResolvedFromTime) collectionModel { return collectionModel{ @@ -61,9 +69,9 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // TODO: Handle graceful exit return c, tea.Quit } - case collectionCompleteMsg: + case CollectionCompleteMsg: c.complete = true - return c, nil + return c, tea.Quit case status: c.path = t.LatestArtifactPath c.discovered = t.ArtifactsDiscovered @@ -76,6 +84,9 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { c.rowsConverted = t.RowsConverted c.rowsErrors = t.Errors return c, nil + case CompactionStatusUpdateMsg: + c.compactionStatus = t.status + return c, nil } return c, nil } @@ -95,23 +106,25 @@ func (c collectionModel) View() string { descriptionLength = 11 } + collectionComplete := c.complete || c.compactionStatus != nil + // header b.WriteString(fmt.Sprintf("\nCollecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) // artifacts if c.path != "" || c.discovered > 0 { - if c.complete { + if collectionComplete { // TODO: #tactical we should clear path in event once complete c.path = "" } b.WriteString("Artifacts:\n") b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &c.path)) - if c.complete { + if collectionComplete { b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) } else { - b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig, c.complete)) - b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig, c.complete)) + b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig, collectionComplete)) + b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig, collectionComplete)) } if c.errors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.errors, countLength, nil)) @@ -122,18 +135,25 @@ func (c collectionModel) View() string { // rows b.WriteString("Rows:\n") b.WriteString(writeCountLine("Received:", descriptionLength, c.rowsReceived, countLength, nil)) - if c.complete { + if collectionComplete { b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) } else { - b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig, c.complete)) - b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig, c.complete)) + b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig, collectionComplete)) + b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig, collectionComplete)) } if c.rowsErrors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.rowsErrors, countLength, nil)) } b.WriteString("\n") + // compaction + if c.compactionStatus != nil { + b.WriteString("Compaction:\n") + b.WriteString(fmt.Sprintf(" %s\n", c.compactionStatus.VerboseString())) + b.WriteString("\n") + } + // run time duration := time.Since(c.initiated) b.WriteString(fmt.Sprintf("Time: %s\n", utils.HumanizeDuration(duration))) From b7c7ca5c60aefeb322203a475a614a436e118db0 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 12:38:20 +0000 Subject: [PATCH 09/13] feat: collection UI --- internal/collector/collector.go | 4 ++-- internal/collector/tui.go | 21 +++++++++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 7e7d2502..65c44bd0 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -14,6 +14,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/sethvargo/go-retry" + "github.com/turbot/pipe-fittings/utils" "github.com/turbot/tailpipe-plugin-sdk/constants" "github.com/turbot/tailpipe-plugin-sdk/events" @@ -102,9 +103,8 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr if err != nil { return fmt.Errorf("failed to collect: %w", err) } - //fmt.Printf("Collecting partition '%s' from %s (%s)\n", partition.Name(), collectResponse.FromTime.Time.Format(time.DateTime), collectResponse.FromTime.Source) //nolint:forbidigo//UI output - c.app = tea.NewProgram(newCollectionModel(partition.FullName, *collectResponse.FromTime)) + c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime)) go c.app.Run() // TODO: #error handling of errors executionId := collectResponse.ExecutionId diff --git a/internal/collector/tui.go b/internal/collector/tui.go index 6ffa5d03..4b186cb1 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -85,8 +85,13 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { c.rowsErrors = t.Errors return c, nil case CompactionStatusUpdateMsg: - c.compactionStatus = t.status - return c, nil + if c.compactionStatus == nil { + c.compactionStatus = t.status + return c, nil + } else { + c.compactionStatus.Update(*t.status) + return c, nil + } } return c, nil } @@ -112,13 +117,21 @@ func (c collectionModel) View() string { b.WriteString(fmt.Sprintf("\nCollecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) // artifacts + displayPath := c.path if c.path != "" || c.discovered > 0 { if collectionComplete { // TODO: #tactical we should clear path in event once complete - c.path = "" + displayPath = "" } + if displayPath != "" { + if strings.Contains(displayPath, "/") { + displayPath = displayPath[strings.LastIndex(displayPath, "/")+1:] + } + displayPath = fmt.Sprintf("(%s)", displayPath) + } + b.WriteString("Artifacts:\n") - b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &c.path)) + b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &displayPath)) if collectionComplete { b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) From bfca225b349ce8284ef9249c4b61b5d2bcb4f012 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 12:40:40 +0000 Subject: [PATCH 10/13] feat: collection UI --- cmd/collect.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/cmd/collect.go b/cmd/collect.go index ed14fb7b..4b7f2885 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -78,32 +78,6 @@ func collectAndCompact(ctx context.Context, args []string) error { return err } - // compact the data - //var compactStatusString string - //if viper.GetBool(pconstants.ArgCompact) { - // compactStatus, err := doCompaction(ctx) - // // if the context was cancelled, we don't want to return an error - // if err != nil && !errors.Is(err, context.Canceled) { - // return fmt.Errorf("compaction error: %w", err) - // } - // compactStatusString = compactStatus.BriefString() - // if ctx.Err() != nil { - // // instead show the status as cancelled - // compactStatusString = "Compaction cancelled: " + compactStatusString - // } - //} - - // now show the result - //for i, statusString := range statusStrings { - // fmt.Println(statusString) //nolint:forbidigo // ui output - // if len(timingStrings) > i { - // fmt.Println(timingStrings[i]) //nolint:forbidigo // ui output - // } - //} - //if compactStatusString != "" { - // fmt.Println(compactStatusString) //nolint:forbidigo // ui output - //} - return nil } From f72a28d84808a9137d2c803ee9abc4b91cb2f9d0 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 14:01:12 +0000 Subject: [PATCH 11/13] refactor: removed unused params --- cmd/collect.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/cmd/collect.go b/cmd/collect.go index 652fd576..50d36bf5 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -75,7 +75,7 @@ func runCollectCmd(cmd *cobra.Command, args []string) { func collectAndCompact(ctx context.Context, args []string) error { // collect the data - _, _, err := doCollect(ctx, args) + err := doCollect(ctx, args) if err != nil { return err } @@ -83,19 +83,19 @@ func collectAndCompact(ctx context.Context, args []string) error { return nil } -func doCollect(ctx context.Context, args []string) ([]string, []string, error) { +func doCollect(ctx context.Context, args []string) error { var fromTime time.Time if viper.GetString(pconstants.ArgFrom) != "" { var err error fromTime, err = parse.ParseTime(viper.GetString(pconstants.ArgFrom), time.Now()) if err != nil { - return nil, nil, fmt.Errorf("failed to parse 'from' argument: %w", err) + return fmt.Errorf("failed to parse 'from' argument: %w", err) } } partitions, err := getPartitions(args) if err != nil { - return nil, nil, fmt.Errorf("failed to get partition config: %w", err) + return fmt.Errorf("failed to get partition config: %w", err) } // now we have the partitions, we can start collecting @@ -105,31 +105,26 @@ func doCollect(ctx context.Context, args []string) ([]string, []string, error) { defer pluginManager.Close() // collect each partition serially - statusStrings := make([]string, 0, len(partitions)) - timingStrings := make([]string, 0, len(partitions)) var errList []error for _, partition := range partitions { - statusString, timingString, err := collectPartition(ctx, partition, fromTime, pluginManager) + err = collectPartition(ctx, partition, fromTime, pluginManager) if err != nil { errList = append(errList, err) - } else { - statusStrings = append(statusStrings, statusString) - timingStrings = append(timingStrings, timingString) } } if len(errList) > 0 { err = errors.Join(errList...) - return nil, nil, fmt.Errorf("collection error: %w", err) + return fmt.Errorf("collection error: %w", err) } - return statusStrings, timingStrings, nil + return nil } -func collectPartition(ctx context.Context, partition *config.Partition, fromTime time.Time, pluginManager *plugin_manager.PluginManager) (string, string, error) { +func collectPartition(ctx context.Context, partition *config.Partition, fromTime time.Time, pluginManager *plugin_manager.PluginManager) error { c, err := collector.New(pluginManager) if err != nil { - return "", "", fmt.Errorf("failed to create collector: %w", err) + return fmt.Errorf("failed to create collector: %w", err) } defer c.Close() @@ -139,17 +134,17 @@ func collectPartition(ctx context.Context, partition *config.Partition, fromTime } if err = c.Collect(ctx, partition, fromTime); err != nil { - return "", "", err + return err } // now wait for all collection to complete and close the collector c.WaitForCompletion(ctx) err = c.Compact(ctx) if err != nil { - return "", "", err + return err } - return "", "", nil + return nil } func getPartitions(args []string) ([]*config.Partition, error) { From a1d87f3ed3fb1dd14402545f8d9f5e02f202e4ca Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 15:38:54 +0000 Subject: [PATCH 12/13] refactor: removed progress display, updated path/size to remove brackets --- internal/collector/collector.go | 3 +- internal/collector/tui.go | 92 +++++++++++++-------------------- 2 files changed, 38 insertions(+), 57 deletions(-) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index 65c44bd0..c20efb97 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -81,7 +81,7 @@ func (c *Collector) Close() { c.parquetWriter.Close() - c.app.Send(CollectionCompleteMsg{}) + c.app.Send(CollectionFinishedMsg{}) // if inbox path is empty, remove it (ignore errors) _ = os.Remove(c.sourcePath) @@ -388,6 +388,7 @@ func (c *Collector) cleanupCollectionDir() { } func (c *Collector) Compact(ctx context.Context) error { + c.app.Send(AwaitingCompactionMsg{}) updateAppCompactionFunc := func(compactionStatus parquet.CompactionStatus) { c.app.Send(CompactionStatusUpdateMsg{status: &compactionStatus}) } diff --git a/internal/collector/tui.go b/internal/collector/tui.go index 4b186cb1..e861ebac 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -2,11 +2,9 @@ package collector import ( "fmt" - "math" "strings" "time" - "github.com/charmbracelet/bubbles/progress" "github.com/charmbracelet/bubbletea" "github.com/dustin/go-humanize" @@ -33,16 +31,17 @@ type collectionModel struct { rowsConverted int64 rowsErrors int64 - complete bool - terminalWidth int - initiated time.Time - progressBarConfig progress.Model + complete bool + terminalWidth int + initiated time.Time // compaction compactionStatus *parquet.CompactionStatus } -type CollectionCompleteMsg struct{} +type CollectionFinishedMsg struct{} + +type AwaitingCompactionMsg struct{} type CompactionStatusUpdateMsg struct { status *parquet.CompactionStatus @@ -50,10 +49,10 @@ type CompactionStatusUpdateMsg struct { func newCollectionModel(partitionName string, fromTime row_source.ResolvedFromTime) collectionModel { return collectionModel{ - partitionName: partitionName, - fromTime: fromTime, - initiated: time.Now(), - progressBarConfig: progress.New(progress.WithWidth(20), progress.WithFillCharacters('=', '-'), progress.WithColorProfile(3), progress.WithoutPercentage()), + partitionName: partitionName, + fromTime: fromTime, + initiated: time.Now(), + compactionStatus: nil, } } @@ -69,8 +68,7 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // TODO: Handle graceful exit return c, tea.Quit } - case CollectionCompleteMsg: - c.complete = true + case CollectionFinishedMsg: return c, tea.Quit case status: c.path = t.LatestArtifactPath @@ -84,6 +82,11 @@ func (c collectionModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { c.rowsConverted = t.RowsConverted c.rowsErrors = t.Errors return c, nil + case AwaitingCompactionMsg: + cs := parquet.CompactionStatus{} + c.complete = true + c.compactionStatus = &cs + return c, nil case CompactionStatusUpdateMsg: if c.compactionStatus == nil { c.compactionStatus = t.status @@ -100,9 +103,10 @@ func (c collectionModel) View() string { var b strings.Builder var countLength int = 5 var descriptionLength int = 12 + countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) - downloadedDisplay := fmt.Sprintf("(%s)", humanize.Bytes((uint64)(c.downloadedBytes))) + downloadedDisplay := humanize.Bytes((uint64)(c.downloadedBytes)) if countArtifactsDisplayLen > countLength { countLength = countArtifactsDisplayLen } @@ -112,33 +116,28 @@ func (c collectionModel) View() string { } collectionComplete := c.complete || c.compactionStatus != nil + displayPath := c.path + timeLabel := "Time:" + + if collectionComplete { + // TODO: #tactical we should clear path in event once complete + displayPath = "" + timeLabel = "Completed:" + } // header b.WriteString(fmt.Sprintf("\nCollecting logs for %s from %s (%s)\n\n", c.partitionName, c.fromTime.Time.Format("2006-01-02"), c.fromTime.Source)) // artifacts - displayPath := c.path if c.path != "" || c.discovered > 0 { - if collectionComplete { - // TODO: #tactical we should clear path in event once complete - displayPath = "" - } - if displayPath != "" { - if strings.Contains(displayPath, "/") { - displayPath = displayPath[strings.LastIndex(displayPath, "/")+1:] - } - displayPath = fmt.Sprintf("(%s)", displayPath) + if strings.Contains(displayPath, "/") { + displayPath = displayPath[strings.LastIndex(displayPath, "/")+1:] } b.WriteString("Artifacts:\n") b.WriteString(writeCountLine("Discovered:", descriptionLength, c.discovered, countLength, &displayPath)) - if collectionComplete { - b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) - b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) - } else { - b.WriteString(writeProgressLine("Downloaded:", descriptionLength, c.downloaded, countLength, float64(c.downloaded)/float64(c.discovered), &downloadedDisplay, &c.progressBarConfig, collectionComplete)) - b.WriteString(writeProgressLine("Extracted:", descriptionLength, c.extracted, countLength, float64(c.extracted)/float64(c.discovered), nil, &c.progressBarConfig, collectionComplete)) - } + b.WriteString(writeCountLine("Downloaded:", descriptionLength, c.downloaded, countLength, &downloadedDisplay)) + b.WriteString(writeCountLine("Extracted:", descriptionLength, c.extracted, countLength, nil)) if c.errors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.errors, countLength, nil)) } @@ -148,13 +147,8 @@ func (c collectionModel) View() string { // rows b.WriteString("Rows:\n") b.WriteString(writeCountLine("Received:", descriptionLength, c.rowsReceived, countLength, nil)) - if collectionComplete { - b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) - b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) - } else { - b.WriteString(writeProgressLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, float64(c.rowsEnriched)/float64(c.rowsReceived), nil, &c.progressBarConfig, collectionComplete)) - b.WriteString(writeProgressLine("Converted:", descriptionLength, c.rowsConverted, countLength, float64(c.rowsConverted)/float64(c.rowsReceived), nil, &c.progressBarConfig, collectionComplete)) - } + b.WriteString(writeCountLine("Enriched:", descriptionLength, c.rowsEnriched, countLength, nil)) + b.WriteString(writeCountLine("Converted:", descriptionLength, c.rowsConverted, countLength, nil)) if c.rowsErrors > 0 { b.WriteString(writeCountLine("Errors:", descriptionLength, c.rowsErrors, countLength, nil)) } @@ -162,14 +156,15 @@ func (c collectionModel) View() string { // compaction if c.compactionStatus != nil { - b.WriteString("Compaction:\n") - b.WriteString(fmt.Sprintf(" %s\n", c.compactionStatus.VerboseString())) + b.WriteString("File Compaction:\n") + b.WriteString(fmt.Sprintf(" Compacted: %d => %d\n", c.compactionStatus.Source, c.compactionStatus.Dest)) + b.WriteString(fmt.Sprintf(" Skipped: %d\n", c.compactionStatus.Uncompacted)) b.WriteString("\n") } // run time duration := time.Since(c.initiated) - b.WriteString(fmt.Sprintf("Time: %s\n", utils.HumanizeDuration(duration))) + b.WriteString(fmt.Sprintf("%s %s\n", timeLabel, utils.HumanizeDuration(duration))) return b.String() } @@ -181,18 +176,3 @@ func writeCountLine(desc string, descLen int, count int64, maxCountLen int, suff } return fmt.Sprintf(" %-*s%*s %s\n", descLen, desc, maxCountLen, humanize.Comma(count), s) } - -func writeProgressLine(desc string, descLen int, count int64, maxCountLen int, percent float64, suffix *string, pb *progress.Model, complete bool) string { - s := "" - if suffix != nil { - s = *suffix - } - if math.IsNaN(percent) { - percent = 0 - } - // TODO: #hack review - essentially if we're not complete, we shouldn't be 100% on any progress bar - if !complete && percent >= 1.0 { - percent = 0.99 - } - return fmt.Sprintf(" %-*s%*s [%s] %3.0f%% %s\n", descLen, desc, maxCountLen, humanize.Comma(count), pb.ViewAs(percent), math.Floor(percent*100), s) -} From fb76b3e454b4f10526107a0ea8321af307176844 Mon Sep 17 00:00:00 2001 From: Graza Date: Tue, 21 Jan 2025 17:03:10 +0000 Subject: [PATCH 13/13] refactor: updated timings on polling & lint fixes --- cmd/collect.go | 5 ----- internal/collector/collector.go | 29 +++++++++++++++++++---------- internal/collector/tui.go | 12 ++++++++---- internal/display/plugin.go | 2 +- 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/cmd/collect.go b/cmd/collect.go index 50d36bf5..660ac17e 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -14,7 +14,6 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/cmdconfig" - "github.com/turbot/pipe-fittings/constants" pconstants "github.com/turbot/pipe-fittings/constants" "github.com/turbot/pipe-fittings/error_helpers" "github.com/turbot/pipe-fittings/parse" @@ -242,7 +241,3 @@ func setExitCodeForCollectError(err error) { // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106 exitCode = 1 } - -func shouldShowCollectTiming() bool { - return viper.GetBool(constants.ArgTiming) -} diff --git a/internal/collector/collector.go b/internal/collector/collector.go index c20efb97..3a5f6afa 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -105,6 +105,7 @@ func (c *Collector) Collect(ctx context.Context, partition *config.Partition, fr } c.app = tea.NewProgram(newCollectionModel(partition.GetUnqualifiedName(), *collectResponse.FromTime)) + //nolint:errcheck // handle this later go c.app.Run() // TODO: #error handling of errors executionId := collectResponse.ExecutionId @@ -259,7 +260,7 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl // TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1 executionTimeout := executionMaxDuration - retryInterval := 5 * time.Second + retryInterval := 200 * time.Millisecond c.execution.totalRows = ce.RowCount c.execution.chunkCount = ce.ChunkCount @@ -296,15 +297,8 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl // not all chunks have been written return retry.RetryableError(fmt.Errorf("not all chunks have been written")) } - // so we are done writing chunks - now update the db to add a view to this data - // Open a DuckDB connection - db, err := sql.Open("duckdb", filepaths.TailpipeDbFilePath()) - if err != nil { - return err - } - defer db.Close() - return database.AddTableView(ctx, c.execution.table, db) + return nil }) slog.Debug("waitForConversions - all chunks written", "execution", c.execution.id) @@ -317,6 +311,18 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl return err } + // so we are done writing chunks - now update the db to add a view to this data + // Open a DuckDB connection + db, err := sql.Open("duckdb", filepaths.TailpipeDbFilePath()) + if err != nil { + return err + } + defer db.Close() + + err = database.AddTableView(ctx, c.execution.table, db) + if err != nil { + return err + } // notify the writer that the collection is complete return c.parquetWriter.JobGroupComplete(ce.ExecutionId) } @@ -325,7 +331,9 @@ func (c *Collector) waitForConversions(ctx context.Context, ce *proto.EventCompl func (c *Collector) waitForExecution(ctx context.Context) error { // TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1 executionTimeout := executionMaxDuration - retryInterval := 500 * time.Millisecond + retryInterval := 100 * time.Millisecond + + slog.Error("waiting for execution") err := retry.Do(ctx, retry.WithMaxDuration(executionTimeout, retry.NewConstant(retryInterval)), func(ctx context.Context) error { switch c.execution.state { @@ -343,6 +351,7 @@ func (c *Collector) waitForExecution(ctx context.Context) error { } return err } + slog.Error("done waiting for execution") return nil } diff --git a/internal/collector/tui.go b/internal/collector/tui.go index e861ebac..31c72db8 100644 --- a/internal/collector/tui.go +++ b/internal/collector/tui.go @@ -31,9 +31,8 @@ type collectionModel struct { rowsConverted int64 rowsErrors int64 - complete bool - terminalWidth int - initiated time.Time + complete bool + initiated time.Time // compaction compactionStatus *parquet.CompactionStatus @@ -103,10 +102,15 @@ func (c collectionModel) View() string { var b strings.Builder var countLength int = 5 var descriptionLength int = 12 + var downloadedDisplay string countArtifactsDisplayLen := len(humanize.Comma(c.discovered)) countRowsDisplayLen := len(humanize.Comma(c.rowsReceived)) - downloadedDisplay := humanize.Bytes((uint64)(c.downloadedBytes)) + if c.downloadedBytes < 0 { + downloadedDisplay = "0 B" // Handle negative values gracefully + } else { + downloadedDisplay = humanize.Bytes(uint64(c.downloadedBytes)) + } if countArtifactsDisplayLen > countLength { countLength = countArtifactsDisplayLen } diff --git a/internal/display/plugin.go b/internal/display/plugin.go index d3c11400..dbd5d77d 100644 --- a/internal/display/plugin.go +++ b/internal/display/plugin.go @@ -89,7 +89,7 @@ func GetPluginResource(ctx context.Context, name string) (*PluginResource, error slices.Sort(sources) var tables []string - for table, _ := range desc.TableSchemas { + for table := range desc.TableSchemas { tables = append(tables, table) } slices.Sort(tables)