这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func getPartitionConfig(partitionNames []string) ([]*config.Partition, error) {
}

if len(errorList) > 0 {
// TODO errors better formating/error message https://github.com/turbot/tailpipe/issues/35
// TODO #errors better formatting/error message https://github.com/turbot/tailpipe/issues/106
return nil, errors.Join(errorList...)
}

Expand Down Expand Up @@ -217,6 +217,6 @@ func setExitCodeForCollectError(err error) {
return
}

// TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/35
// TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106
exitCode = 1
}
2 changes: 1 addition & 1 deletion cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func connectCmd() *cobra.Command {
}

func runConnectCmd(cmd *cobra.Command, _ []string) {
// TODO K cancellation?
// TODO #cancellation cancellation? https://github.com/turbot/tailpipe/issues/88
//ctx, cancel := context.WithCancel(cmd.Context())
//contexthelpers.StartCancelHandler(cancel)

Expand Down
18 changes: 1 addition & 17 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand All @@ -12,7 +11,6 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/thediveo/enumflag/v2"

"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/cmdconfig"
pconstants "github.com/turbot/pipe-fittings/constants"
Expand Down Expand Up @@ -187,7 +185,7 @@ func pluginShowCmd() *cobra.Command {
Use: "show <plugin>",
Args: cobra.ExactArgs(1),
Run: runPluginShowCmd,
// TODO improve descriptions
// TODO improve descriptions https://github.com/turbot/tailpipe/issues/111
Short: "Show details of a plugin",
Long: `Show the tables and sources provided by plugin`,
}
Expand Down Expand Up @@ -259,20 +257,6 @@ func runPluginInstallCmd(cmd *cobra.Command, args []string) {
showProgress := viper.GetBool(pconstants.ArgProgress)
installReports := make(pplugin.PluginInstallReports, 0, len(plugins))

if len(plugins) == 0 {
if len(config.GlobalConfig.Plugins) == 0 {
error_helpers.ShowError(ctx, errors.New("no plugins installed"))
exitCode = pconstants.ExitCodeInsufficientOrWrongInputs
return
}

// get the list of plugins to install
for imageRef := range config.GlobalConfig.Plugins {
ref := pociinstaller.NewImageRef(imageRef)
plugins = append(plugins, ref.GetFriendlyName())
}
}

state, err := installationstate.Load()
if err != nil {
error_helpers.ShowError(ctx, fmt.Errorf("could not load state"))
Expand Down
2 changes: 1 addition & 1 deletion cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func setExitCodeForQueryError(err error) {
return
}

// TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/35
// TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106
exitCode = 1
}
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func rootCommand() *cobra.Command {

rootCmd.SetVersionTemplate("Tailpipe v{{.Version}}\n")

// TODO #config this will not reflect changes to install-dir - do we need to default in a different way
// TODO #config this will not reflect changes to install-dir - do we need to default in a different way https://github.com/turbot/tailpipe/issues/112
defaultConfigPath := filepaths.EnsureConfigDir()

cmdconfig.
Expand Down
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/turbot/go-kit v0.10.0-rc.0
github.com/turbot/pipe-fittings v1.5.2
github.com/turbot/pipe-fittings v1.7.0
// main
github.com/turbot/tailpipe-plugin-sdk v0.0.0-20240418033256-15206bee92ce
github.com/zclconf/go-cty v1.14.4
Expand Down Expand Up @@ -90,6 +90,7 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/elastic/go-grok v0.3.1 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -112,7 +113,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gosuri/uilive v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand All @@ -139,12 +139,12 @@ require (
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jedib0t/go-pretty/v6 v6.5.9 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/karrick/gows v0.3.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
Expand All @@ -169,11 +169,9 @@ require (
github.com/rs/xid v1.5.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/satyrius/gonx v1.4.0 // indirect
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smarty/assertions v1.16.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand All @@ -186,6 +184,7 @@ require (
github.com/tkrajina/go-reflector v0.5.6 // indirect
github.com/turbot/pipes-sdk-go v0.9.1 // indirect
github.com/turbot/steampipe-plugin-code v0.7.0 // indirect
github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0 // indirect
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
Expand Down
16 changes: 6 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elastic/go-grok v0.3.1 h1:WEhUxe2KrwycMnlvMimJXvzRa7DoByJB4PVUIE1ZD/U=
github.com/elastic/go-grok v0.3.1/go.mod h1:n38ls8ZgOboZRgKcjMY8eFeZFMmcL9n2lP0iHhIDk64=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -486,8 +488,6 @@ github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMd
github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDPT0hH1s=
github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY=
github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI=
github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw=
Expand Down Expand Up @@ -564,8 +564,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/karrick/gows v0.3.0 h1:/FGSuBiJMUqNOJPsAdLvHFg7RnkFoWBS8USpdco5ONQ=
github.com/karrick/gows v0.3.0/go.mod h1:kdZ/jfdo8yqKYn+BMjBkhP+/oRKUABR1abaomzRi/n8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand All @@ -589,6 +587,8 @@ github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczG
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/marcboeker/go-duckdb v1.8.3 h1:ZkYwiIZhbYsT6MmJsZ3UPTHrTZccDdM4ztoqSlEMXiQ=
Expand Down Expand Up @@ -676,8 +676,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg=
github.com/satyrius/gonx v1.4.0/go.mod h1:+r8KNe5d2tjkZU+DfhERo0G6KxkGih+1qYF6tqLHwvk=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs=
Expand All @@ -686,10 +684,6 @@ github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKl
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smarty/assertions v1.16.0 h1:EvHNkdRA4QHMrn75NZSoUQ/mAUXAYWfatfB01yTCzfY=
github.com/smarty/assertions v1.16.0/go.mod h1:duaaFdCS0K9dnoM50iyek/eYINOZ64gbh1Xlf6LG7AI=
github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY=
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down Expand Up @@ -739,6 +733,8 @@ github.com/turbot/pipes-sdk-go v0.9.1 h1:2yRojY2wymvJn6NQyE6A0EDFV267MNe+yDLxPVv
github.com/turbot/pipes-sdk-go v0.9.1/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
github.com/turbot/steampipe-plugin-code v0.7.0 h1:SROYIo/TI/Q/YNfXK+sAIS71umypUFm1Uz851TmoJkM=
github.com/turbot/steampipe-plugin-code v0.7.0/go.mod h1:GvdjncWum4sZNmR0iM03SKkIzl7aZKAFtIsyAR+z4YI=
github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0 h1:e/5EYO7B7UZW6joxO/wqtJGYFu+7NMCqMk/tPVbquFY=
github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0/go.mod h1:tYRC7FDKPTZ3MSty/tGLtH6UnVpU3zs1osF5DuktB5Q=
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg=
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7/go.mod h1:5hzpfalEjfcJWp9yq75/EZoEu2Mzm34eJAPm3HOW2tw=
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
Expand Down
2 changes: 1 addition & 1 deletion internal/cmdconfig/cmd_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func preRunHook(cmd *cobra.Command, args []string) error {
ew := initGlobalConfig(ctx)
// display any warnings
ew.ShowWarnings()
// TODO #errors sort exit code https://github.com/turbot/tailpipe/issues/35
// TODO #errors sort exit code https://github.com/turbot/tailpipe/issues/106
// check for error
error_helpers.FailOnError(ew.Error)

Expand Down
34 changes: 19 additions & 15 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ func New(ctx context.Context) (*Collector, error) {
}

c.parquetWriter = parquetWriter

if err != nil {
return nil, fmt.Errorf("failed to create colleciton state path: %w", err)
}
c.collectionStateRepository, err = collection_state.NewRepository()
if err != nil {
return nil, fmt.Errorf("failed to create collection state repository: %w", err)
Expand Down Expand Up @@ -186,7 +182,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) {
c.executionsLock.RUnlock()
if !ok {
slog.Error("Event_ChunkWrittenEvent - execution not found", "execution", executionId)
// TODO #errors x what to do with this error? https://github.com/turbot/tailpipe/issues/35
// TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106
return
}

Expand All @@ -201,14 +197,14 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) {
err := c.parquetWriter.AddJob(executionId, chunkNumber)
if err != nil {
slog.Error("failed to add chunk to parquet writer", "error", err)
// TODO #errors x what to do with this error? https://github.com/turbot/tailpipe/issues/35
// TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106
}
// store collection state data
if len(ev.CollectionState) > 0 {
err = c.collectionStateRepository.Save(execution.partition, string(ev.CollectionState))
if err != nil {
slog.Error("failed to save collection state data", "error", err)
// TODO #errors x what to do with this error? https://github.com/turbot/tailpipe/issues/35
// TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106
}
}
case *proto.Event_CompleteEvent:
Expand Down Expand Up @@ -237,7 +233,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) {

// start thread waiting for execution to complete
// - this will wait for all parquet files to be written, and will then combine these into a single parquet file
// TODO #errors x what to do with an error here? https://github.com/turbot/tailpipe/issues/35
// TODO #errors x what to do with an error here? https://github.com/turbot/tailpipe/issues/106
go func() {
err := c.waitForExecution(ctx, completedEvent)
if err != nil {
Expand All @@ -246,9 +242,17 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) {
}()

case *proto.Event_ErrorEvent:
slog.Error("Event_ErrorEvent", "error", e.GetErrorEvent().Error)
// TODO #errors x what to do with an error here?

ev := e.GetErrorEvent()
// set the error on the execution
c.executionsLock.RLock()
execution, ok := c.executions[ev.ExecutionId]
c.executionsLock.RUnlock()
if !ok {
slog.Error("Error Event - execution not found", "execution", ev.ExecutionId)
return
}
execution.state = ExecutionState_ERROR
execution.error = fmt.Errorf("plugin error: %s", ev.Error)
}
}

Expand All @@ -259,7 +263,7 @@ func (c *Collector) WaitForCompletion(ctx context.Context) {
// wait for any ongoing partitions to complete
err := c.waitForExecutions(ctx)
if err != nil {
// TODO #errors x https://github.com/turbot/tailpipe/issues/35
// TODO #errors x https://github.com/turbot/tailpipe/issues/106
slog.Error("error waiting for executions to complete", "error", err)
}

Expand All @@ -272,7 +276,7 @@ func (c *Collector) WaitForCompletion(ctx context.Context) {
}

func (c *Collector) StatusString() string {
// TODO K we need to test multiple executions https://github.com/turbot/tailpipe/issues/71
// TODO #testing we need to test multiple executions https://github.com/turbot/tailpipe/issues/71
var str strings.Builder
str.WriteString("Collection complete.\n\n")
str.WriteString(c.status.String())
Expand Down Expand Up @@ -340,7 +344,7 @@ func (c *Collector) waitForExecution(ctx context.Context, ce *proto.EventComplet
// check chunk count - ask the parquet writer how many chunks have been written
chunksWritten, err := c.parquetWriter.GetChunksWritten(ce.ExecutionId)
if err != nil {
return fmt.Errorf("failed to get chunksWritten written: %w", err)
return err
}

// if no chunks have been written, we are done
Expand Down Expand Up @@ -385,7 +389,7 @@ func (c *Collector) waitForExecution(ctx context.Context, ce *proto.EventComplet
func (c *Collector) waitForExecutions(ctx context.Context) error {
// TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1
executionTimeout := executionMaxDuration
retryInterval := 5 * time.Second
retryInterval := 500 * time.Millisecond

err := retry.Do(ctx, retry.WithMaxDuration(executionTimeout, retry.NewConstant(retryInterval)), func(ctx context.Context) error {
c.executionsLock.RLock()
Expand Down
5 changes: 3 additions & 2 deletions internal/collector/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newExecution(executionId string, part *config.Partition) *execution {
e := &execution{
id: executionId,
partition: part.UnqualifiedName,
table: part.Table,
table: part.TableName,
plugin: part.Plugin.Alias,
state: ExecutionState_PENDING,
}
Expand All @@ -66,7 +66,8 @@ func (e *execution) done(err error) {
if err != nil {
e.state = ExecutionState_ERROR
e.error = err
} else {
} else if e.state != ExecutionState_ERROR {
// if state has not already been set to error, set to complete
e.state = ExecutionState_COMPLETE
}
e.executionTiming.End = time.Now()
Expand Down
7 changes: 6 additions & 1 deletion internal/config/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import (
"github.com/hashicorp/hcl/v2"
"github.com/turbot/pipe-fittings/hclhelpers"
"github.com/turbot/pipe-fittings/modconfig"
"github.com/turbot/pipe-fittings/schema"
"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
)

// init runs at package initialization and passes schema.BlockTypeConnection
// to registerResourceWithSubType.
// NOTE(review): registerResourceWithSubType is defined outside this view;
// presumably it records that connection blocks carry a subtype — confirm.
func init() {
// we have a subtype - register it and ALSO implement GetSubType
// (i.e. registration alone is not enough: the resource type is also expected
// to provide a GetSubType method — TODO confirm against the registry code)
registerResourceWithSubType(schema.BlockTypeConnection)
}

type TailpipeConnection struct {
modconfig.HclResourceImpl
// TODO K rather than plugin - just use a name which in practice will be the plugin name
Plugin string `cty:"plugin"`
Hcl []byte `cty:"hcl"`
// the hcl range for the connection - use our version so we can cty serialise it
Expand Down
Loading
Loading