diff --git a/cmd/collect.go b/cmd/collect.go index 5c6bd008..7317b560 100644 --- a/cmd/collect.go +++ b/cmd/collect.go @@ -116,7 +116,7 @@ func getPartitionConfig(partitionNames []string) ([]*config.Partition, error) { } if len(errorList) > 0 { - // TODO errors better formating/error message https://github.com/turbot/tailpipe/issues/35 + // TODO #errors better formatting/error message https://github.com/turbot/tailpipe/issues/106 return nil, errors.Join(errorList...) } @@ -217,6 +217,6 @@ func setExitCodeForCollectError(err error) { return } - // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/35 + // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106 exitCode = 1 } diff --git a/cmd/connect.go b/cmd/connect.go index d2d0e3a3..7ecf0f3f 100644 --- a/cmd/connect.go +++ b/cmd/connect.go @@ -48,7 +48,7 @@ func connectCmd() *cobra.Command { } func runConnectCmd(cmd *cobra.Command, _ []string) { - // TODO K cancellation? + // TODO #cancellation cancellation? https://github.com/turbot/tailpipe/issues/88 //ctx, cancel := context.WithCancel(cmd.Context()) //contexthelpers.StartCancelHandler(cancel) diff --git a/cmd/plugin.go b/cmd/plugin.go index e16ffc0e..4af340e0 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "errors" "fmt" "strings" "sync" @@ -12,7 +11,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/thediveo/enumflag/v2" - "github.com/turbot/go-kit/helpers" "github.com/turbot/pipe-fittings/cmdconfig" pconstants "github.com/turbot/pipe-fittings/constants" @@ -187,7 +185,7 @@ func pluginShowCmd() *cobra.Command { Use: "show ", Args: cobra.ExactArgs(1), Run: runPluginShowCmd, - // TODO improve descriptions + // TODO improve descriptions https://github.com/turbot/tailpipe/issues/111 Short: "Show details of a plugin", Long: `Show the tables and sources provided by plugin`, } @@ -259,20 +257,6 @@ func runPluginInstallCmd(cmd *cobra.Command, args []string) { showProgress := viper.GetBool(pconstants.ArgProgress) installReports := make(pplugin.PluginInstallReports, 0, len(plugins)) - if len(plugins) == 0 { - if len(config.GlobalConfig.Plugins) == 0 { - error_helpers.ShowError(ctx, errors.New("no plugins installed")) - exitCode = pconstants.ExitCodeInsufficientOrWrongInputs - return - } - - // get the list of plugins to install - for imageRef := range config.GlobalConfig.Plugins { - ref := pociinstaller.NewImageRef(imageRef) - plugins = append(plugins, ref.GetFriendlyName()) - } - } - state, err := installationstate.Load() if err != nil { error_helpers.ShowError(ctx, fmt.Errorf("could not load state")) diff --git a/cmd/query.go b/cmd/query.go index 79b20a4c..fc0a2f31 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -71,6 +71,6 @@ func setExitCodeForQueryError(err error) { return } - // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/35 + // TODO #errors - assign exit codes https://github.com/turbot/tailpipe/issues/106 exitCode = 1 } diff --git a/cmd/root.go b/cmd/root.go index ed059c65..3cab6543 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,7 +33,7 @@ func rootCommand() *cobra.Command { rootCmd.SetVersionTemplate("Tailpipe v{{.Version}}\n") - // TODO #config this will not reflect changes to install-dir - do we need to default in a different way + // TODO #config this will not reflect changes to install-dir - do we need to default in a different way https://github.com/turbot/tailpipe/issues/112 defaultConfigPath := filepaths.EnsureConfigDir()
cmdconfig. diff --git a/go.mod b/go.mod index 97a3f524..7129f05a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/turbot/go-kit v0.10.0-rc.0 - github.com/turbot/pipe-fittings v1.5.2 + github.com/turbot/pipe-fittings v1.7.0 // main github.com/turbot/tailpipe-plugin-sdk v0.0.0-20240418033256-15206bee92ce github.com/zclconf/go-cty v1.14.4 @@ -90,6 +90,7 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/elastic/go-grok v0.3.1 // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -112,7 +113,6 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect - github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/gosuri/uilive v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -139,12 +139,12 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jedib0t/go-pretty/v6 v6.5.9 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/karrick/gows v0.3.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/magefile/mage v1.15.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-localereader v0.0.1 // indirect @@ -169,11 +169,9 @@ require ( github.com/rs/xid v1.5.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect - github.com/satyrius/gonx v1.4.0 // indirect github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/smarty/assertions v1.16.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect @@ -186,6 +184,7 @@ require ( github.com/tkrajina/go-reflector v0.5.6 // indirect github.com/turbot/pipes-sdk-go v0.9.1 // indirect github.com/turbot/steampipe-plugin-code v0.7.0 // indirect + github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0 // indirect github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 // indirect github.com/ulikunitz/xz v0.5.10 // indirect github.com/xlab/treeprint v1.2.0 // indirect diff --git a/go.sum b/go.sum index 4e0bc9e3..e495366c 100644 --- a/go.sum +++ b/go.sum @@ -312,6 +312,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/elastic/go-grok v0.3.1 h1:WEhUxe2KrwycMnlvMimJXvzRa7DoByJB4PVUIE1ZD/U= 
+github.com/elastic/go-grok v0.3.1/go.mod h1:n38ls8ZgOboZRgKcjMY8eFeZFMmcL9n2lP0iHhIDk64= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU= github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -486,8 +488,6 @@ github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMd github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDPT0hH1s= github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= -github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= -github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/gosuri/uiprogress v0.0.1 h1:0kpv/XY/qTmFWl/SkaJykZXrBBzwwadmW8fRb7RJSxw= @@ -564,8 +564,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= -github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/karrick/gows v0.3.0 h1:/FGSuBiJMUqNOJPsAdLvHFg7RnkFoWBS8USpdco5ONQ= github.com/karrick/gows v0.3.0/go.mod h1:kdZ/jfdo8yqKYn+BMjBkhP+/oRKUABR1abaomzRi/n8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -589,6 +587,8 @@ github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczG github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/marcboeker/go-duckdb v1.8.3 h1:ZkYwiIZhbYsT6MmJsZ3UPTHrTZccDdM4ztoqSlEMXiQ= @@ -676,8 +676,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg= -github.com/satyrius/gonx v1.4.0/go.mod h1:+r8KNe5d2tjkZU+DfhERo0G6KxkGih+1qYF6tqLHwvk= github.com/sethvargo/go-retry v0.2.4 
h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs= @@ -686,10 +684,6 @@ github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKl github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smarty/assertions v1.16.0 h1:EvHNkdRA4QHMrn75NZSoUQ/mAUXAYWfatfB01yTCzfY= -github.com/smarty/assertions v1.16.0/go.mod h1:duaaFdCS0K9dnoM50iyek/eYINOZ64gbh1Xlf6LG7AI= -github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY= -github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -739,6 +733,8 @@ github.com/turbot/pipes-sdk-go v0.9.1 h1:2yRojY2wymvJn6NQyE6A0EDFV267MNe+yDLxPVv github.com/turbot/pipes-sdk-go v0.9.1/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc= github.com/turbot/steampipe-plugin-code v0.7.0 h1:SROYIo/TI/Q/YNfXK+sAIS71umypUFm1Uz851TmoJkM= github.com/turbot/steampipe-plugin-code v0.7.0/go.mod h1:GvdjncWum4sZNmR0iM03SKkIzl7aZKAFtIsyAR+z4YI= +github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0 h1:e/5EYO7B7UZW6joxO/wqtJGYFu+7NMCqMk/tPVbquFY= +github.com/turbot/steampipe-plugin-sdk/v5 v5.8.0/go.mod h1:tYRC7FDKPTZ3MSty/tGLtH6UnVpU3zs1osF5DuktB5Q= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7/go.mod h1:5hzpfalEjfcJWp9yq75/EZoEu2Mzm34eJAPm3HOW2tw= github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= diff --git a/internal/cmdconfig/cmd_hooks.go b/internal/cmdconfig/cmd_hooks.go index f16075c7..c55f0dd4 100644 --- a/internal/cmdconfig/cmd_hooks.go +++ b/internal/cmdconfig/cmd_hooks.go @@ -44,7 +44,7 @@ func preRunHook(cmd *cobra.Command, args []string) error { ew := initGlobalConfig(ctx) // display any warnings ew.ShowWarnings() - // TODO #errors sort exit code https://github.com/turbot/tailpipe/issues/35 + // TODO #errors sort exit code https://github.com/turbot/tailpipe/issues/106 // check for error error_helpers.FailOnError(ew.Error) diff --git a/internal/collector/collector.go b/internal/collector/collector.go index ec940dfc..028e07b2 100644 --- a/internal/collector/collector.go +++ b/internal/collector/collector.go @@ -71,10 +71,6 @@ func New(ctx context.Context) (*Collector, error) { } c.parquetWriter = parquetWriter - - if err != nil { - return nil, fmt.Errorf("failed to create colleciton state path: %w", err) - } c.collectionStateRepository, err = collection_state.NewRepository() if err != nil { return nil, fmt.Errorf("failed to create collection state repository: %w", err) @@ -186,7 +182,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { c.executionsLock.RUnlock() if !ok { slog.Error("Event_ChunkWrittenEvent - execution not found", "execution", executionId) - // TODO #errors x 
what to do with this error? https://github.com/turbot/tailpipe/issues/35 + // TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106 return } @@ -201,14 +197,14 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { err := c.parquetWriter.AddJob(executionId, chunkNumber) if err != nil { slog.Error("failed to add chunk to parquet writer", "error", err) - // TODO #errors x what to do with this error? https://github.com/turbot/tailpipe/issues/35 + // TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106 } // store collection state data if len(ev.CollectionState) > 0 { err = c.collectionStateRepository.Save(execution.partition, string(ev.CollectionState)) if err != nil { slog.Error("failed to save collection state data", "error", err) - // TODO #errors x what to do with this error? https://github.com/turbot/tailpipe/issues/35 + // TODO #errors what to do with this error? https://github.com/turbot/tailpipe/issues/106 } } case *proto.Event_CompleteEvent: @@ -237,7 +233,7 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { // start thread waiting for execution to complete // - this will wait for all parquet files to be written, and will then combine these into a single parquet file - // TODO #errors x what to do with an error here? https://github.com/turbot/tailpipe/issues/35 + // TODO #errors x what to do with an error here? https://github.com/turbot/tailpipe/issues/106 go func() { err := c.waitForExecution(ctx, completedEvent) if err != nil { @@ -246,9 +242,17 @@ func (c *Collector) handlePluginEvent(ctx context.Context, e *proto.Event) { }() case *proto.Event_ErrorEvent: - slog.Error("Event_ErrorEvent", "error", e.GetErrorEvent().Error) - // TODO #errors x what to do with an error here? 
- + ev := e.GetErrorEvent() + // set the error on the execution + c.executionsLock.RLock() + execution, ok := c.executions[ev.ExecutionId] + c.executionsLock.RUnlock() + if !ok { + slog.Error("Error Event - execution not found", "execution", ev.ExecutionId) + return + } + execution.state = ExecutionState_ERROR + execution.error = fmt.Errorf("plugin error: %s", ev.Error) } } @@ -259,7 +263,7 @@ func (c *Collector) WaitForCompletion(ctx context.Context) { // wait for any ongoing partitions to complete err := c.waitForExecutions(ctx) if err != nil { - // TODO #errors x https://github.com/turbot/tailpipe/issues/35 + // TODO #errors x https://github.com/turbot/tailpipe/issues/106 slog.Error("error waiting for executions to complete", "error", err) } @@ -272,7 +276,7 @@ func (c *Collector) WaitForCompletion(ctx context.Context) { } func (c *Collector) StatusString() string { - // TODO K we need to test multiple executions https://github.com/turbot/tailpipe/issues/71 + // TODO #testing we need to test multiple executions https://github.com/turbot/tailpipe/issues/71 var str strings.Builder str.WriteString("Collection complete.\n\n") str.WriteString(c.status.String()) @@ -340,7 +344,7 @@ func (c *Collector) waitForExecution(ctx context.Context, ce *proto.EventComplet // check chunk count - ask the parquet writer how many chunks have been written chunksWritten, err := c.parquetWriter.GetChunksWritten(ce.ExecutionId) if err != nil { - return fmt.Errorf("failed to get chunksWritten written: %w", err) + return err } // if no chunks have been written, we are done @@ -385,7 +389,7 @@ func (c *Collector) waitForExecution(ctx context.Context, ce *proto.EventComplet func (c *Collector) waitForExecutions(ctx context.Context) error { // TODO #config configure timeout https://github.com/turbot/tailpipe/issues/1 executionTimeout := executionMaxDuration - retryInterval := 5 * time.Second + retryInterval := 500 * time.Millisecond err := retry.Do(ctx, retry.WithMaxDuration(executionTimeout, retry.NewConstant(retryInterval)), func(ctx context.Context) error { c.executionsLock.RLock() diff --git a/internal/collector/execution.go b/internal/collector/execution.go index c757be88..85e37647 100644 --- a/internal/collector/execution.go +++ b/internal/collector/execution.go @@ -41,7 +41,7 @@ func newExecution(executionId string, part *config.Partition) *execution { e := &execution{ id: executionId, partition: part.UnqualifiedName, - table: part.Table, + table: part.TableName, plugin: part.Plugin.Alias, state: ExecutionState_PENDING, } @@ -66,7 +66,8 @@ func (e *execution) done(err error) { if err != nil { e.state = ExecutionState_ERROR e.error = err - } else { + } else if e.state != ExecutionState_ERROR { + // if state has not already been set to error, set to complete e.state = ExecutionState_COMPLETE } e.executionTiming.End = time.Now() diff --git a/internal/config/connection.go b/internal/config/connection.go index fe52c626..e37d6feb 100644 --- a/internal/config/connection.go +++ b/internal/config/connection.go @@ -5,12 +5,17 @@ import ( "github.com/hashicorp/hcl/v2" "github.com/turbot/pipe-fittings/hclhelpers" "github.com/turbot/pipe-fittings/modconfig" + "github.com/turbot/pipe-fittings/schema" "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" ) +func init() { + // we have a subtype - register it and ALSO implement GetSubType + registerResourceWithSubType(schema.BlockTypeConnection) +} + type TailpipeConnection struct { modconfig.HclResourceImpl - // TODO K rather than plugin - just use a name which in practice will 
be the plugin name Plugin string `cty:"plugin"` Hcl []byte `cty:"hcl"` // the hcl range for the connection - use our version so we can sty serialise it diff --git a/internal/config/format.go b/internal/config/format.go new file mode 100644 index 00000000..f46fdba5 --- /dev/null +++ b/internal/config/format.go @@ -0,0 +1,63 @@ +package config + +import ( + "fmt" + "github.com/hashicorp/hcl/v2" + "github.com/turbot/pipe-fittings/hclhelpers" + "github.com/turbot/pipe-fittings/modconfig" + "github.com/turbot/pipe-fittings/schema" + "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" +) + +func init() { + // we have a subtype - register it and ALSO implement GetSubType + registerResourceWithSubType(schema.BlockTypeFormat) +} + +type Format struct { + modconfig.HclResourceImpl + + Type string `cty:"type"` + Config *HclBytes `cty:"config"` +} + +// GetSubType returns the subtype for the format block (the type). +// The presence of this function indicates this resource supports 3 part names, +// which affects how it is stored in the eval context +func (f *Format) GetSubType() string { + return f.Type +} + +func NewFormat(block *hcl.Block, fullName string) (modconfig.HclResource, hcl.Diagnostics) { + if len(block.Labels) != 2 { + return nil, hcl.Diagnostics{&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "'format' block requires 2 labels: 'type' and 'name'", + Subject: hclhelpers.BlockRangePointer(block), + }} + } + c := &Format{ + HclResourceImpl: modconfig.NewHclResourceImpl(block, fullName), + Type: block.Labels[0], + } + + // NOTE: as tailpipe does not have the concept of mods, the full name is format.<type>.<name> and + // the unqualified name is <type>.<name> + c.UnqualifiedName = fmt.Sprintf("%s.%s", c.Type, c.ShortName) + return c, nil +} + +func (f *Format) ToProto() *proto.ConfigData { + res := &proto.ConfigData{ + Target: f.Type, + } + if f.Config != nil { + res.Hcl = f.Config.Hcl + res.Range = proto.RangeToProto(f.Config.Range.HclRange()) + } + return res +} + +func (f *Format) SetConfigHcl(u *HclBytes) { + f.Config = u +} diff --git a/internal/config/hcl_bytes.go b/internal/config/hcl_bytes.go index 542c7ee2..38f0b6ce 100644 --- a/internal/config/hcl_bytes.go +++ b/internal/config/hcl_bytes.go @@ -2,11 +2,12 @@ package config import ( "github.com/hashicorp/hcl/v2" + "github.com/turbot/pipe-fittings/hclhelpers" ) type HclBytes struct { - Hcl []byte - Range hcl.Range + Hcl []byte `cty:"hcl"` + Range hclhelpers.Range `cty:"range"` } func HclBytesForRange(sourceHcl []byte, r hcl.Range) *HclBytes { @@ -14,7 +15,7 @@ func HclBytesForRange(sourceHcl []byte, r hcl.Range) *HclBytes { hclForRange := append([]byte{}, sourceHcl[r.Start.Byte:r.End.Byte]...) return &HclBytes{ Hcl: hclForRange, - Range: r, + Range: hclhelpers.NewRange(r), } } func (h *HclBytes) Merge(other *HclBytes) { diff --git a/internal/config/parsed_name.go b/internal/config/parsed_name.go deleted file mode 100644 index 425aaa59..00000000 --- a/internal/config/parsed_name.go +++ /dev/null @@ -1,34 +0,0 @@ -package config - -// -//// TODO move to pipe-fittings and think about combining with existing ParsedResourceName -// -//// ParsedResourceName represents a parsed resource name for a resource with a subtype -//type ParsedResourceName struct { -// ItemType string -// ItemSubType string -// Name string -//} -// -//// TODO do all resources have a subtype???
-//func ParseResourceName(fullName string) (res *ParsedResourceName, err error) { -// res = &ParsedResourceName{} -// if fullName == "" { -// return res, nil -// } -// -// // valid resource name: -// // .. -// -// parts := strings.Split(fullName, ".") -// if len(parts) != 3 { -// return nil, perr.BadRequestWithMessage("invalid resource name: " + fullName) -// } -// -// // no property path specified -// res.ItemType = parts[0] -// res.ItemSubType = parts[1] -// res.Name = parts[2] -// -// return res, nil -//} diff --git a/internal/config/parsed_property_path.go b/internal/config/parsed_property_path.go deleted file mode 100644 index e32c4393..00000000 --- a/internal/config/parsed_property_path.go +++ /dev/null @@ -1,59 +0,0 @@ -package config - -import ( - "fmt" - "strings" - - "github.com/turbot/pipe-fittings/perr" -) - -// TODO move to pipe-fittings and think about combining with existing ParsedResourceName https://github.com/turbot/tailpipe/issues/32 - -// ParsedPropertyPath represents a parsed property path for a resource with a subtype -type ParsedPropertyPath struct { - ItemType string - ItemSubType string - Name string - PropertyPath []string - Original string -} - -func (p *ParsedPropertyPath) PropertyPathString() string { - return strings.Join(p.PropertyPath, ".") -} - -func (p *ParsedPropertyPath) ToResourceName() string { - return BuildResourceName(p.ItemType, p.ItemSubType, p.Name) -} - -func (p *ParsedPropertyPath) String() string { - return p.Original -} - -func ParseResourcePropertyPath(propertyPath string) (*ParsedPropertyPath, error) { - res := &ParsedPropertyPath{Original: propertyPath} - - // valid property paths: - // ... - - parts := strings.Split(propertyPath, ".") - if len(parts) < 3 { - return nil, perr.BadRequestWithMessage("invalid property path: " + propertyPath) - } - - // no property path specified - res.ItemType = parts[0] - res.ItemSubType = parts[1] - res.Name = parts[2] - // if a property path is set, add it - if len(parts) > 3 { - res.PropertyPath = parts[3:] - } - - return res, nil -} - -// TODO do we need this split out? 
-func BuildResourceName(resourceType, resourceSubType, name string) string { - return fmt.Sprintf("%s.%s.%s", resourceType, resourceSubType, name) -} diff --git a/internal/config/partition.go b/internal/config/partition.go index e5cbf9e8..776b6f27 100644 --- a/internal/config/partition.go +++ b/internal/config/partition.go @@ -2,31 +2,38 @@ package config import ( "fmt" - "strings" - "github.com/hashicorp/hcl/v2" "github.com/turbot/pipe-fittings/hclhelpers" "github.com/turbot/pipe-fittings/modconfig" "github.com/turbot/pipe-fittings/plugin" - "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" + "github.com/turbot/pipe-fittings/schema" + "github.com/turbot/tailpipe/internal/constants" + "strings" ) +func init() { + registerResourceWithSubType(schema.BlockTypePartition) +} + type Partition struct { modconfig.HclResourceImpl - // Type of the partition - Table string `hcl:"type,label"` + // the name of the table this partition is for - this is the first label in the partition block + TableName string + + // if the partition is for a custom table, this will be set to the custom table config + CustomTable *Table `cty:"table"` // Plugin used for this partition - Plugin *plugin.Plugin + Plugin *plugin.Plugin `cty:"-"` // Source of the data for this partition - Source Source `hcl:"source,block"` + Source Source `cty:"source"` // any partition-type specific config data for the partition - Config []byte + Config []byte `cty:"config"` // the config location - ConfigRange hcl.Range + ConfigRange hclhelpers.Range `cty:"config_range"` // an option filter in the format of a SQL where clause Filter string } @@ -41,42 +48,15 @@ func NewPartition(block *hcl.Block, fullName string) (modconfig.HclResource, hcl } c := &Partition{ HclResourceImpl: modconfig.NewHclResourceImpl(block, fullName), - Table: block.Labels[0], + TableName: block.Labels[0], } // NOTE: as tailpipe does not have the concept of mods, the full name is partition.<table_name>.<partition_name> and // the unqualified name is the <table_name>.<partition_name>
- c.UnqualifiedName = fmt.Sprintf("%s.%s", c.Table, c.ShortName) + c.UnqualifiedName = fmt.Sprintf("%s.%s", c.TableName, c.ShortName) return c, nil } -func (c *Partition) OnDecoded(block *hcl.Block, _ modconfig.ModResourcesProvider) hcl.Diagnostics { - // if plugin is not set, deduce it from the type - if c.Plugin == nil { - c.Plugin = plugin.NewPlugin(c.inferPluginName()) - } - // TODO default connections https://github.com/turbot/tailpipe/issues/31 - // if the connection is not set, set it to the default connection - //if c.Connection == nil { - // c.Connection = getDefaultConnection(c.Plugin.Alias) - //} - return nil -} - -func getDefaultConnection(alias string) *TailpipeConnection { //nolint: unused // TODO: think about default connections https://github.com/turbot/tailpipe/issues/31 - return &TailpipeConnection{ - Plugin: alias, - } -} - -func (c *Partition) ToProto() *proto.ConfigData { - return &proto.ConfigData{ - Target: c.Name(), - Hcl: c.Config, - Range: proto.RangeToProto(c.DeclRange), - } -} - func (c *Partition) SetConfigHcl(u *HclBytes) { if u == nil { return @@ -85,6 +65,10 @@ func (c *Partition) SetConfigHcl(u *HclBytes) { c.ConfigRange = u.Range } -func (c *Partition) inferPluginName() string { - return strings.Split(c.Table, "_")[0] +func (c *Partition) InferPluginName() string { + if c.CustomTable != nil { + return constants.CorePluginName + } + // otherwise just use the first segment of the table name + return strings.Split(c.TableName, "_")[0] } diff --git a/internal/config/source.go b/internal/config/source.go index 27c6bc2e..3efcf3c0 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -1,31 +1,26 @@ package config import ( - "github.com/hashicorp/hcl/v2" "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" ) type Source struct { - Type string `hcl:" type,label"` - Config []byte - Connection *TailpipeConnection `hcl:"connection"` - - // the config location - ConfigRange hcl.Range + Type string `hcl:" type,label" cty:"type"` + Connection *TailpipeConnection `hcl:"connection" cty:"connection"` + // optional: the format (for custom tables) + Format *Format `hcl:"format" cty:"format"` + // the config hcl + Config *HclBytes `cty:"config"` } func (s *Source) ToProto() *proto.ConfigData { return &proto.ConfigData{ Target: "source." 
+ s.Type, - Hcl: s.Config, - Range: proto.RangeToProto(s.ConfigRange), + Hcl: s.Config.Hcl, + Range: proto.RangeToProto(s.Config.Range.HclRange()), } } func (s *Source) SetConfigHcl(u *HclBytes) { - if u == nil { - return - } - s.Config = u.Hcl - s.ConfigRange = u.Range + s.Config = u } diff --git a/internal/config/sub_types.go b/internal/config/sub_types.go new file mode 100644 index 00000000..c3c55c8a --- /dev/null +++ b/internal/config/sub_types.go @@ -0,0 +1,15 @@ +package config + +// TODO rather not do this AND implement GetSubtype https://github.com/turbot/tailpipe/issues/110 + +// global map of resources with subtypes - populated at init +var resourcesWithSubtypes = map[string]struct{}{} + +func registerResourceWithSubType(blockType string) { + resourcesWithSubtypes[blockType] = struct{}{} +} + +func ResourceHasSubtype(blockType string) bool { + _, ok := resourcesWithSubtypes[blockType] + return ok +} diff --git a/internal/config/table.go b/internal/config/table.go new file mode 100644 index 00000000..f47b57b6 --- /dev/null +++ b/internal/config/table.go @@ -0,0 +1,77 @@ +package config + +import ( + "github.com/hashicorp/hcl/v2" + typehelpers "github.com/turbot/go-kit/types" + "github.com/turbot/pipe-fittings/hclhelpers" + "github.com/turbot/pipe-fittings/modconfig" + "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" +) + +type ColumnSchema struct { + Name string `hcl:"name,label" cty:"name"` + Type *string `hcl:"type" cty:"type"` + Source *string `hcl:"source" cty:"source"` +} + +type Table struct { + modconfig.HclResourceImpl + + // the default format for this table (todo make a map keyed by source name?) + DefaultSourceFormat *Format `hcl:"format" cty:"format"` + + Columns []ColumnSchema `hcl:"column,block" cty:"columns"` + + // should we include ALL source fields in addition to any defined columns, or ONLY include the columns defined + AutoMapSourceFields bool `hcl:"automap_source_fields,optional" cty:"automap_source_fields"` + // should we exclude any source fields from the output (only applicable if automap_source_fields is true) + ExcludeSourceFields []string `hcl:"exclude_source_fields,optional" cty:"exclude_source_fields"` +} + +func NewTable(block *hcl.Block, fullName string) (modconfig.HclResource, hcl.Diagnostics) { + if len(block.Labels) != 1 { + return nil, hcl.Diagnostics{&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "'table' block requires 1 label, 'name'", + Subject: hclhelpers.BlockRangePointer(block), + }} + } + c := &Table{ + HclResourceImpl: modconfig.NewHclResourceImpl(block, fullName), + // default to automap source fields + AutoMapSourceFields: true, + } + + // NOTE: as tailpipe does not have the concept of mods, the full name is table.<name>
and + // the unqualified name AND the short name is + c.UnqualifiedName = c.ShortName + + return c, nil +} + +func (t *Table) ToProtoSchema() *proto.Schema { + var res = &proto.Schema{ + AutomapSourceFields: t.AutoMapSourceFields, + ExcludeSourceFields: t.ExcludeSourceFields, + } + for _, col := range t.Columns { + s := &proto.ColumnSchema{ + // default source to column name + SourceName: col.Name, + ColumnName: col.Name, + Type: typehelpers.SafeString(col.Type), + } + if col.Source != nil { + s.SourceName = *col.Source + } + res.Columns = append(res.Columns, s) + } + return res +} + +func (t *Table) ToProto() *proto.Table { + return &proto.Table{ + Name: t.ShortName, + Schema: t.ToProtoSchema(), + } +} diff --git a/internal/config/tailpipe_config.go b/internal/config/tailpipe_config.go index 5a4a6d3f..d1e8fb58 100644 --- a/internal/config/tailpipe_config.go +++ b/internal/config/tailpipe_config.go @@ -2,7 +2,6 @@ package config import ( "fmt" - "github.com/turbot/pipe-fittings/modconfig" "github.com/turbot/pipe-fittings/plugin" "github.com/turbot/pipe-fittings/versionfile" @@ -16,17 +15,24 @@ type TailpipeConfig struct { // map of plugin configs, keyed by plugin image ref // (for each image ref we store an array of configs) - Plugins map[string][]*plugin.Plugin - // map of plugin configs, keyed by plugin instance - PluginsInstances map[string]plugin.Plugin + //Plugins map[string][]*plugin.Plugin + //// map of plugin configs, keyed by plugin instance + //PluginsInstances map[string]plugin.Plugin // map of installed plugin versions, keyed by plugin image ref PluginVersions map[string]*versionfile.InstalledVersion + CustomTables map[string]*Table + Formats map[string]*Format } func NewTailpipeConfig() *TailpipeConfig { return &TailpipeConfig{ Partitions: make(map[string]*Partition), Connections: make(map[string]*TailpipeConnection), + //Plugins: make(map[string][]*plugin.Plugin), + //PluginsInstances: make(map[string]plugin.Plugin), + PluginVersions: make(map[string]*versionfile.InstalledVersion), + CustomTables: make(map[string]*Table), + Formats: make(map[string]*Format), } } func (c *TailpipeConfig) Add(resource modconfig.HclResource) error { @@ -37,12 +43,33 @@ func (c *TailpipeConfig) Add(resource modconfig.HclResource) error { case *TailpipeConnection: c.Connections[t.GetUnqualifiedName()] = t return nil + case *Table: + c.CustomTables[t.GetUnqualifiedName()] = t + return nil + case *Format: + c.Formats[t.GetUnqualifiedName()] = t + return nil default: return fmt.Errorf("unsupported resource type %T", t) } } -func (c *TailpipeConfig) Validate() (warnins, errors []string) { - // TODO: implement - return nil, nil +func (c *TailpipeConfig) Validate() error { + // TODO K + return nil +} + +func (c *TailpipeConfig) InitPartitions() { + // populate the plugin property for each partition + for _, partition := range c.Partitions { + // set the table on the plugin (in case it is a custom table) + if _, isCustom := c.CustomTables[partition.TableName]; isCustom { + partition.CustomTable = c.CustomTables[partition.TableName] + } + // if the plugin is not set, infer it from the table + if partition.Plugin == nil { + partition.Plugin = plugin.NewPlugin(partition.InferPluginName()) + } + } + } diff --git a/internal/constants/plugin.go b/internal/constants/plugin.go index ade9f3ab..2816cbb6 100644 --- a/internal/constants/plugin.go +++ b/internal/constants/plugin.go @@ -1,5 +1,6 @@ package constants const ( + CorePluginName = "core" TailpipeHubOCIBase = "hub.tailpipe.io/" ) diff --git 
a/internal/display/partition.go b/internal/display/partition.go index c69689ea..ad40aff0 100644 --- a/internal/display/partition.go +++ b/internal/display/partition.go @@ -46,7 +46,7 @@ func ListPartitionResources(ctx context.Context) ([]*PartitionResource, error) { partitions := config.GlobalConfig.Partitions for _, p := range partitions { - name := fmt.Sprintf("%s.%s", p.Table, p.ShortName) + name := fmt.Sprintf("%s.%s", p.TableName, p.ShortName) partition := &PartitionResource{ Name: name, Description: p.Description, @@ -67,7 +67,7 @@ func ListPartitionResources(ctx context.Context) ([]*PartitionResource, error) { func GetPartitionResource(ctx context.Context, partitionName string) (*PartitionResource, error) { partitions := config.GlobalConfig.Partitions for _, p := range partitions { - name := fmt.Sprintf("%s.%s", p.Table, p.ShortName) + name := fmt.Sprintf("%s.%s", p.TableName, p.ShortName) if name == partitionName { partition := &PartitionResource{ Name: name, diff --git a/internal/display/table.go b/internal/display/table.go index b59ac591..9ea0ec76 100644 --- a/internal/display/table.go +++ b/internal/display/table.go @@ -158,7 +158,7 @@ func GetTableResource(ctx context.Context, tableName string) (*TableResource, er func (r *TableResource) setPartitions() { for _, partition := range config.GlobalConfig.Partitions { - if partition.Table == r.Name { + if partition.TableName == r.Name { r.Partitions = append(r.Partitions, partition.ShortName) } } diff --git a/internal/parquet/file_job_pool.go b/internal/parquet/file_job_pool.go index cd9e6c38..fb1fd97d 100644 --- a/internal/parquet/file_job_pool.go +++ b/internal/parquet/file_job_pool.go @@ -1,6 +1,7 @@ package parquet import ( + "errors" "fmt" "log/slog" "sync" @@ -132,6 +133,14 @@ func (w *fileJobPool[T]) GetChunksWritten(id string) (int32, error) { if !ok { return 0, fmt.Errorf("group id %s not found", id) } + + // if the job has errors, terminate + job.errorsLock.RLock() + defer job.errorsLock.RUnlock() + if len(job.errors) > 0 { + err := errors.Join(job.errors...) + return -1, fmt.Errorf("job group %s has errors: %w", id, err) + } return job.completionCount, nil } diff --git a/internal/parquet/parquet_worker.go b/internal/parquet/parquet_worker.go index 2189acbc..def085d1 100644 --- a/internal/parquet/parquet_worker.go +++ b/internal/parquet/parquet_worker.go @@ -127,7 +127,7 @@ func (w *parquetConversionWorker) convertFile(jsonlFilePath string, partition *c } // now read row count - return getRowCount(w.db.DB, w.destDir, fileRoot, partition.Table) + return getRowCount(w.db.DB, w.destDir, fileRoot, partition.TableName) } func getRowCount(db *sql.DB, destDir, fileRoot, table string) (int64, error) { diff --git a/internal/parquet/writer.go b/internal/parquet/writer.go index 945e0937..688f07cb 100644 --- a/internal/parquet/writer.go +++ b/internal/parquet/writer.go @@ -75,23 +75,26 @@ func (w *Writer) inferSchemaIfNeeded(executionID string, chunks []int) error { // determine if we have a full schema yet and if not infer from the chunk // NOTE: schema mode will be MUTATED once we infer it + // TODO #testing test this https://github.com/turbot/tailpipe/issues/108 + // first get read lock w.schemaMut.RLock() - m := w.schema.Mode + // is the schema complete (i.e. we are NOT automapping source columns and we have all types defined) + complete := w.schema.Complete() w.schemaMut.RUnlock() // do we have the full schema? 
- if m != schema.ModeFull { + if !complete { // get write lock w.schemaMut.Lock() - // check again if schema is still not full (to avoid race condition) - if w.schema.Mode != schema.ModeFull { + // check again if schema is still not full (to avoid race condition as another worker may have filled it) + if !w.schema.Complete() { // do the inference - s, err := w.inferSchema(executionID, chunks[0]) + s, err := w.inferChunkSchema(executionID, chunks[0]) if err != nil { return fmt.Errorf("failed to infer schema from first JSON file: %w", err) } - w.SetSchema(s) + w.schema.InitialiseFromInferredSchema(s) } w.schemaMut.Unlock() } @@ -124,7 +127,7 @@ func (w *Writer) SetSchema(rowSchema *schema.RowSchema) { w.schema = rowSchema } -func (w *Writer) inferSchema(executionId string, chunkNumber int) (*schema.RowSchema, error) { +func (w *Writer) inferChunkSchema(executionId string, chunkNumber int) (*schema.RowSchema, error) { jsonFileName := table.ExecutionIdToFileName(executionId, chunkNumber) filePath := filepath.Join(w.sourceDir, jsonFileName) @@ -147,8 +150,8 @@ func (w *Writer) inferSchema(executionId string, chunkNumber int) (*schema.RowSc defer rows.Close() var res = &schema.RowSchema{ - // NOTE: set the mode to full to indicate that we have inferred the schema - Mode: schema.ModeFull, + // NOTE: set autoMap to false as we have inferred the schema + AutoMapSourceFields: false, } // Read the results @@ -171,21 +174,6 @@ func (w *Writer) inferSchema(executionId string, chunkNumber int) (*schema.RowSc return nil, fmt.Errorf("failed during rows iteration: %w", err) } - // now if a partial schema was provided by the plugin override the inferred schema - if w.schema.Mode == schema.ModePartial { - // build a map of the partial schema columns - var partialSchemaMap = make(map[string]*schema.ColumnSchema) - for _, c := range w.schema.Columns { - partialSchemaMap[c.ColumnName] = c - } - for _, c := range res.Columns { - if _, ok := partialSchemaMap[c.ColumnName]; ok { - slog.Info("Overriding inferred schema with partial schema", "columnName", c.ColumnName, "type", partialSchemaMap[c.ColumnName].Type) - c.Type = partialSchemaMap[c.ColumnName].Type - } - } - } - return res, nil } diff --git a/internal/parse/config_parse_context.go b/internal/parse/config_parse_context.go index 6ac29034..f225fa95 100644 --- a/internal/parse/config_parse_context.go +++ b/internal/parse/config_parse_context.go @@ -22,6 +22,9 @@ type ConfigParseContext struct { // map of all resources, keyed by full name resourceMap map[string]modconfig.HclResource + + // the config which is being generated + tailpipeConfig *config.TailpipeConfig } func (c *ConfigParseContext) GetResource(parsedName *modconfig.ParsedResourceName) (resource modconfig.HclResource, found bool) { @@ -35,6 +38,7 @@ func NewConfigParseContext(rootEvalPath string) *ConfigParseContext { ParseContext: parseContext, resourceValues: make(map[string]map[string]cty.Value), resourceMap: make(map[string]modconfig.HclResource), + tailpipeConfig: config.NewTailpipeConfig(), } // we load workspaces separately @@ -151,7 +155,7 @@ func (c *ConfigParseContext) AddDependencies(block *hcl.Block, name string, depe for _, dep := range dependencies { // each dependency object may have multiple traversals for _, t := range dep.Traversals { - parsedPropertyPath, err := config.ParseResourcePropertyPath(hclhelpers.TraversalAsString(t)) + parsedPropertyPath, err := ParseResourcePropertyPath(hclhelpers.TraversalAsString(t)) if err != nil { diags = append(diags, &hcl.Diagnostic{ @@ -186,7 
+190,7 @@ func (c *ConfigParseContext) AddDependencies(block *hcl.Block, name string, depe // overriden resourceNameFromDependency func func resourceNameFromDependency(propertyPath string) (string, error) { - parsedPropertyPath, err := config.ParseResourcePropertyPath(propertyPath) + parsedPropertyPath, err := ParseResourcePropertyPath(propertyPath) if err != nil { return "", err diff --git a/internal/parse/decode.go b/internal/parse/decode.go index e616dc66..b3a273b2 100644 --- a/internal/parse/decode.go +++ b/internal/parse/decode.go @@ -2,6 +2,7 @@ package parse import ( "fmt" + "strings" "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/hclsyntax" @@ -11,10 +12,9 @@ import ( "github.com/turbot/pipe-fittings/schema" "github.com/turbot/tailpipe/internal/config" "github.com/zclconf/go-cty/cty/gocty" - "golang.org/x/exp/maps" ) -func decodeTailpipeConfig(parseCtx *ConfigParseContext) (*config.TailpipeConfig, hcl.Diagnostics) { +func decodeTailpipeConfig(parseCtx *ConfigParseContext) hcl.Diagnostics { var diags hcl.Diagnostics blocksToDecode, err := parseCtx.BlocksToDecode() // build list of blocks to decode @@ -23,20 +23,19 @@ func decodeTailpipeConfig(parseCtx *ConfigParseContext) (*config.TailpipeConfig, Severity: hcl.DiagError, Summary: "failed to determine required dependency order", Detail: err.Error()}) - return nil, diags + return diags } // now clear dependencies from run context - they will be rebuilt parseCtx.ClearDependencies() - var tailpipeConfig = config.NewTailpipeConfig() for _, block := range blocksToDecode { resource, res := decodeBlock(block, parseCtx) diags = append(diags, res.Diags...) if !res.Success() || resource == nil { continue } - err := tailpipeConfig.Add(resource) + err := parseCtx.tailpipeConfig.Add(resource) if err != nil { diags = append(diags, &hcl.Diagnostic{ Severity: hcl.DiagError, @@ -47,7 +46,7 @@ func decodeTailpipeConfig(parseCtx *ConfigParseContext) (*config.TailpipeConfig, } } - return tailpipeConfig, diags + return diags } func decodeBlock(block *hcl.Block, parseCtx *ConfigParseContext) (modconfig.HclResource, *parse.DecodeResult) { @@ -83,18 +82,24 @@ func decodeResource(block *hcl.Block, parseCtx *ConfigParseContext) (modconfig.H switch block.Type { case schema.BlockTypePartition: - return decodePartition(block, parseCtx, resource) + res = decodePartition(block, parseCtx, resource) case schema.BlockTypeConnection: - return decodeConnection(block, parseCtx, resource) + res = decodeConnection(block, parseCtx, resource) + case schema.BlockTypeFormat: + res = decodeFormat(block, parseCtx, resource) + // TODO #parsing to support inline Format we need to manually parse the table block https://github.com/turbot/tailpipe/issues/109 + //case schema.BlockTypeTable: + default: // TODO what resources does this include? 
diags = parse.DecodeHclBody(block.Body, parseCtx.EvalCtx, parseCtx, resource) res.HandleDecodeDiags(diags) - return resource, res } + + return resource, res } -func decodePartition(block *hcl.Block, parseCtx *ConfigParseContext, resource modconfig.HclResource) (modconfig.HclResource, *parse.DecodeResult) { +func decodePartition(block *hcl.Block, parseCtx *ConfigParseContext, resource modconfig.HclResource) *parse.DecodeResult { res := parse.NewDecodeResult() target := resource.(*config.Partition) @@ -112,7 +117,7 @@ func decodePartition(block *hcl.Block, parseCtx *ConfigParseContext, resource mo res.HandleDecodeDiags(diags) // we failed, possibly as result of dependency error - give up for now if !res.Success() { - return nil, res + return res } target.Filter = val.AsString() default: @@ -121,14 +126,12 @@ func decodePartition(block *hcl.Block, parseCtx *ConfigParseContext, resource mo } var unknownBlocks []*hcl.Block for _, block := range blocks { - // TODO K decode plugin block switch block.Type { - case "source": + case schema.BlockTypeSource: // decode source block source, sourceRes := decodeSource(block, parseCtx) res.Merge(sourceRes) - - if !res.Diags.HasErrors() { + if res.Success() { target.Source = *source } default: @@ -136,18 +139,22 @@ func decodePartition(block *hcl.Block, parseCtx *ConfigParseContext, resource mo } } + if !res.Success() { + return res + } + // convert the unknown blocks and attributes to the raw hcl bytes unknown, diags := handleUnknownHcl(block, parseCtx, unknownAttrs, unknownBlocks) res.HandleDecodeDiags(diags) - if !res.Diags.HasErrors() { + if res.Success() { // now set unknown hcl target.SetConfigHcl(unknown) } - return target, res + return res } -func decodeConnection(block *hcl.Block, parseCtx *ConfigParseContext, resource modconfig.HclResource) (modconfig.HclResource, *parse.DecodeResult) { +func decodeConnection(block *hcl.Block, parseCtx *ConfigParseContext, resource modconfig.HclResource) *parse.DecodeResult { res := parse.NewDecodeResult() target := resource.(*config.TailpipeConnection) @@ -161,15 +168,15 @@ func decodeConnection(block *hcl.Block, parseCtx *ConfigParseContext, resource m Detail: fmt.Sprintf("unexpected block body type %T - expected *hclsyntax.Body", block.Body), Subject: hclhelpers.BlockRangePointer(block), }}) - return nil, res + return res } hclBytes := &config.HclBytes{} for _, attr := range syntaxBody.Attributes { hclBytes.Merge(config.HclBytesForRange(parseCtx.FileData[block.DefRange.Filename], attr.Range())) } target.Hcl = hclBytes.Hcl - target.HclRange = hclhelpers.NewRange(hclBytes.Range) - return target, res + target.HclRange = hclBytes.Range + return res } func handleUnknownHcl(block *hcl.Block, parseCtx *ConfigParseContext, unknownAttrs []*hcl.Attribute, unknownBlocks []*hcl.Block) (*config.HclBytes, hcl.Diagnostics) { @@ -199,45 +206,54 @@ func decodeSource(block *hclsyntax.Block, parseCtx *ConfigParseContext) (*config source := &config.Source{} source.Type = block.Labels[0] - attrs, diags := block.Body.JustAttributes() + var unknownBlocks []*hcl.Block + for _, block := range block.Body.Blocks { + unknownBlocks = append(unknownBlocks, block.AsHCLBlock()) + } + attrMap, diags := block.Body.JustAttributes() res.HandleDecodeDiags(diags) - if len(attrs) == 0 { - return source, res + if !res.Success() { + return nil, res } - var blocks []*hcl.Block - for _, block := range block.Body.Blocks { - blocks = append(blocks, block.AsHCLBlock()) + + if len(attrMap)+len(unknownBlocks) == 0 { + return source, res } - // special case 
for connection - if connectionAttr, ok := attrs["connection"]; ok { - //try to evaluate expression - val, diags := connectionAttr.Expr.Value(parseCtx.EvalCtx) - res.HandleDecodeDiags(diags) - // we failed, possibly as result of dependency error - give up for now - if !res.Success() { - return source, res - } - var conn = &config.TailpipeConnection{} - err := gocty.FromCtyValue(val, conn) - if err != nil { - // failed to decode connection - res.AddDiags(hcl.Diagnostics{&hcl.Diagnostic{ - Severity: hcl.DiagError, - Summary: "failed to decode connection", - Detail: fmt.Sprintf("failed to decode connection: %s", err.Error()), - Subject: hclhelpers.BlockRangePointer(block.AsHCLBlock()), - }}) - return source, res + var unknownAttrs []*hcl.Attribute + for attrName, attr := range attrMap { + switch attrName { + + case schema.AttributeConnection: + target := &config.TailpipeConnection{} + connRes := resourceFromExpression(parseCtx, block.AsHCLBlock(), attr.Expr, target) + res.Merge(connRes) + if res.Success() { + source.Connection = target + } + case schema.AttributeFormat: + target := &config.Format{} + formatRes := resourceFromExpression(parseCtx, block.AsHCLBlock(), attr.Expr, target) + res.Merge(formatRes) + if res.Success() { + source.Format = target + } + + default: + unknownAttrs = append(unknownAttrs, attr) } - source.Connection = conn + + } + + // if we failed for any reason (including dependency errors) give up for now + if !res.Success() { + return source, res } - delete(attrs, "connection") // get the unknown hcl - unknown, diags := handleUnknownHcl(block.AsHCLBlock(), parseCtx, maps.Values(attrs), blocks) + unknown, diags := handleUnknownHcl(block.AsHCLBlock(), parseCtx, unknownAttrs, unknownBlocks) res.HandleDecodeDiags(diags) - if res.Diags.HasErrors() { + if !res.Success() { return source, res } @@ -248,16 +264,94 @@ } +func resourceFromExpression(parseCtx *ConfigParseContext, block *hcl.Block, expr hcl.Expression, target any) *parse.DecodeResult { + var res = parse.NewDecodeResult() + //try to evaluate expression + val, diags := expr.Value(parseCtx.EvalCtx) + + res.HandleDecodeDiags(diags) + // we failed, possibly as result of dependency error - give up for now + if !res.Success() { + return res + } + + err := gocty.FromCtyValue(val, target) + if err != nil { + // failed to decode the expression value into the target + res.AddDiags(hcl.Diagnostics{&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "failed to decode expression", + Detail: fmt.Sprintf("failed to decode expression: %s", err.Error()), + Subject: hclhelpers.BlockRangePointer(block), + }}) + } + return res +} + +func decodeFormat(block *hcl.Block, parseCtx *ConfigParseContext, resource modconfig.HclResource) *parse.DecodeResult { + res := parse.NewDecodeResult() + format := resource.(*config.Format) + + attrMap, diags := block.Body.JustAttributes() + res.HandleDecodeDiags(diags) + if !res.Success() { + return res + } + var unknownAttrs []*hcl.Attribute + for attrName, attr := range attrMap { + switch attrName { + case schema.AttributeType: + var ty string + connRes := resourceFromExpression(parseCtx, block, attr.Expr, &ty) + res.Merge(connRes) + if res.Success() { + format.Type = ty + } + default: + unknownAttrs = append(unknownAttrs, attr) + } + } + + syntaxBody, ok := block.Body.(*hclsyntax.Body) + if !ok { + // unexpected + res.AddDiags(hcl.Diagnostics{&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "failed to decode format block", + Detail:
fmt.Sprintf("unexpected block body type %T - expected *hclsyntax.Body", block.Body), + Subject: hclhelpers.BlockRangePointer(block), + }}) + return res + } + + var unknownBlocks []*hcl.Block + for _, b := range syntaxBody.Blocks { + unknownBlocks = append(unknownBlocks, b.AsHCLBlock()) + } + if len(unknownAttrs)+len(unknownBlocks) == 0 { + return res + } + + // get the unknown hcl + unknown, diags := handleUnknownHcl(block, parseCtx, unknownAttrs, unknownBlocks) + res.HandleDecodeDiags(diags) + if !res.Success() { + return res + } + + // set the unknown hcl on the source + format.SetConfigHcl(unknown) + + return res +} + // return a shell resource for the given block func resourceForBlock(block *hcl.Block) (modconfig.HclResource, hcl.Diagnostics) { - // all blocks must have 2 labels - subtype and name - this is enforced by schema - subType := block.Labels[0] - blockName := block.Labels[1] - fullName := config.BuildResourceName(block.Type, subType, blockName) factoryFuncs := map[string]func(*hcl.Block, string) (modconfig.HclResource, hcl.Diagnostics){ schema.BlockTypePartition: config.NewPartition, schema.BlockTypeConnection: config.NewTailpipeConnection, - //schema.BlockTypePlugin: config.NewPlugin, + schema.BlockTypeTable: config.NewTable, + schema.BlockTypeFormat: config.NewFormat, } factoryFunc, ok := factoryFuncs[block.Type] @@ -269,7 +363,19 @@ func resourceForBlock(block *hcl.Block) (modconfig.HclResource, hcl.Diagnostics) }, } } - return factoryFunc(block, fullName) + + name := fmt.Sprintf("%s.%s", block.Type, strings.Join(block.Labels, ".")) + + parsedName, err := ParseResourceName(name) + if err != nil { + return nil, hcl.Diagnostics{&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: fmt.Sprintf("failed to parse resource name %s", name), + Detail: err.Error(), + Subject: hclhelpers.BlockRangePointer(block), + }} + } + return factoryFunc(block, parsedName.ToResourceName()) } func handleDecodeResult(resource modconfig.HclResource, res *parse.DecodeResult, block *hcl.Block, parseCtx *ConfigParseContext) { @@ -285,7 +391,7 @@ func handleDecodeResult(resource modconfig.HclResource, res *parse.DecodeResult, } // failure :( - if len(res.Depends) > 0 { + if len(res.Depends) > 0 && resource != nil { moreDiags := parseCtx.AddDependencies(block, resource.Name(), res.Depends) res.AddDiags(moreDiags) } diff --git a/internal/parse/load_config.go b/internal/parse/load_config.go index 9abae6a9..1470eed4 100644 --- a/internal/parse/load_config.go +++ b/internal/parse/load_config.go @@ -3,11 +3,6 @@ package parse import ( "context" "fmt" - "log" - "log/slog" - "strings" - - "github.com/gertd/go-pluralize" "github.com/spf13/viper" filehelpers "github.com/turbot/go-kit/files" "github.com/turbot/go-kit/helpers" @@ -18,6 +13,7 @@ import ( "github.com/turbot/pipe-fittings/utils" "github.com/turbot/pipe-fittings/versionfile" "github.com/turbot/tailpipe/internal/config" + "log/slog" ) // LoadTailpipeConfig loads the HCL connection config, resources and workspace profiles @@ -50,48 +46,13 @@ func LoadTailpipeConfig(ctx context.Context) (tailpipeConfig *config.TailpipeCon } tailpipeConfig.PluginVersions = v.Plugins - //// load connections and config from the installation folder - load all spc files from config directory - //include := filehelpers.InclusionsFromExtensions(constants.ConnectionConfigExtensions) - //loadOptions := &loadConfigOptions{include: include} - //ew = loadConfig(ctx, filepaths.EnsureConfigDir(), tailpipeConfig, loadOptions) - //if ew.GetError() != nil { - // return nil, ew - //} - 
//// merge the warning from this call - //errorsAndWarnings.AddWarning(ew.Warnings...) - // - //// now validate the config - warnings, errors := tailpipeConfig.Validate() - logValidationResult(warnings, errors) - - return tailpipeConfig, errorsAndWarnings -} + // initialise all partitions - this populates the Plugin and CustomTable (where set) properties + tailpipeConfig.InitPartitions() -func logValidationResult(warnings []string, errors []string) { - if len(warnings) > 0 { - error_helpers.ShowWarning(buildValidationLogString(warnings, "warning")) - log.Printf("[TRACE] %s", buildValidationLogString(warnings, "warning")) - } - if len(errors) > 0 { - error_helpers.ShowWarning(buildValidationLogString(errors, "error")) - log.Printf("[TRACE] %s", buildValidationLogString(errors, "error")) - } -} + // now validate the config + ew.Error = tailpipeConfig.Validate() -func buildValidationLogString(items []string, validationType string) string { - count := len(items) - if count == 0 { - return "" - } - var str strings.Builder - str.WriteString(fmt.Sprintf("connection config has has %d validation %s:\n", - count, - pluralize.NewClient().Pluralize(validationType, count, false), - )) - for _, w := range items { - str.WriteString(fmt.Sprintf("\t %s\n", w)) - } - return str.String() + return tailpipeConfig, errorsAndWarnings } // load config from the given folder and update TailpipeConfig @@ -142,10 +103,9 @@ func parseTailpipeConfig(configPath string) (_ *config.TailpipeConfig, err error // we may need to decode more than once as we gather dependencies as we go // continue decoding as long as the number of unresolved blocks decreases prevUnresolvedBlocks := 0 - var tailpipeConfig *config.TailpipeConfig for attempts := 0; ; attempts++ { - tailpipeConfig, diags = decodeTailpipeConfig(parseCtx) + diags = decodeTailpipeConfig(parseCtx) if diags != nil && diags.HasErrors() { return nil, error_helpers.HclDiagsToError("Failed to decode all config files", diags) } @@ -165,6 +125,6 @@ func parseTailpipeConfig(configPath string) (_ *config.TailpipeConfig, err error prevUnresolvedBlocks = unresolvedBlocks } - return tailpipeConfig, nil + return parseCtx.tailpipeConfig, nil } diff --git a/internal/parse/load_config_test.go b/internal/parse/load_config_test.go index 75ddd2ef..f442f9f9 100644 --- a/internal/parse/load_config_test.go +++ b/internal/parse/load_config_test.go @@ -1,78 +1,155 @@ package parse -// result is a struct to hold the expected result of the test - designed to be easily compared with the actual result -// type result struct { -// plugin string -// partitionType string -// partitionConfig string -// sourceType string -// sourceConfig string -// } - // TODO enable and fix this test -// func TestGetPartitionConfig(t *testing.T) { -// var ctx context.Context -// type args struct { -// configPath string -// partition string -// } -// tests := []struct { -// name string -// args args -// want result -// wantErr bool -// }{ -// // TODO #testing add more test cases -// { -// name: "1", -// args: args{ -// configPath: "test_data/configs", -// partition: "partition.aws_cloudtrail_log.cloudtrail_logs", -// }, -// want: result{ -// plugin: "aws", -// partitionType: "aws_cloudtrail_log", -// sourceType: "file_system", -// sourceConfig: `paths = ["/Users/kai/tailpipe_data/flaws_cloudtrail_logs"] -// extensions = [".gz"]`, -// }, - -// wantErr: false, -// }, -// } - -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// tailpipeDir, er := filepath.Abs(tt.args.configPath) -// if er != nil { 
-// 				t.Errorf("failed to build absolute config filepath from %s", tt.args.configPath)
-// 			}
-// 			// set app_specific.InstallDir
-// 			app_specific.InstallDir = tailpipeDir
-
-// 			config, err := LoadTailpipeConfig(ctx)
-// 			if (err.Error != nil) != tt.wantErr {
-// 				t.Errorf("LoadTailpipeConfig() error = %v, wantErr %v", err, tt.wantErr)
-// 				return
-// 			}
-// 			col, ok := config.Partitions[tt.args.partition]
-// 			if !ok {
-// 				t.Errorf("LoadTailpipeConfig() partition not found")
-// 				return
-// 			}
-
-// 			// build the result
-// 			var got = result{
-// 				plugin: col.Plugin.Alias,
-// 				partitionType: col.Table,
-// 				partitionConfig: string(col.Config),
-// 				sourceType: col.Source.Type,
-// 				sourceConfig: string(col.Source.Config),
-// 			}
-
-// 			if !reflect.DeepEqual(got, tt.want) {
-// 				t.Errorf("LoadTailpipeConfig() got = %v, want %v", got, tt.want)
-// 			}
-// 		})
-// 	}
-// }
+//func TestLoadTailpipeConfig(t *testing.T) {
+//	type args struct {
+//		configPath string
+//		partition string
+//	}
+//	tests := []struct {
+//		name string
+//		args args
+//		want *config.TailpipeConfig
+//		wantErr bool
+//	}{
+//		// TODO #testing add more test cases
+//		{
+//			name: "static tables",
+//			args: args{
+//				configPath: "test_data/static_table_config",
+//				partition: "partition.aws_cloudtrail_log.cloudtrail_logs",
+//			},
+//			want: &config.TailpipeConfig{
+//				PluginVersions: nil,
+//				Partitions: map[string]*config.Partition{
+//					"partition.aws_cloudtrail_log.cloudtrail_logs": {},
+//					"partition.aws_vpc_flow_log.flow_logs": {},
+//				},
+//			},
+//
+//			wantErr: false,
+//		},
+//		{
+//			name: "dynamic tables",
+//			args: args{
+//				configPath: "test_data/custom_table_config",
+//				partition: "partition.aws_cloudtrail_log.cloudtrail_logs",
+//			},
+//			want: &config.TailpipeConfig{
+//				Partitions: map[string]*config.Partition{
+//					"my_csv_log.test": {
+//						HclResourceImpl: modconfig.HclResourceImpl{
+//							FullName: "partition.my_csv_log.test",
+//							ShortName: "test",
+//							UnqualifiedName: "my_csv_log.test",
+//							DeclRange: hcl.Range{
+//								Filename: "test_data/custom_table_config/resources.tpc",
+//								Start: hcl.Pos{
+//									Line: 2,
+//									Column: 30,
+//									Byte: 30,
+//								},
+//								End: hcl.Pos{
+//									Line: 10,
+//									Column: 2,
+//									Byte: 230,
+//								},
+//							},
+//							BlockType: "partition",
+//						},
+//						TableName: "my_csv_log",
+//						Plugin: &plugin.Plugin{
+//							Instance: "custom",
+//							Alias: "custom",
+//							Plugin: "/plugins/turbot/custom@latest",
+//						},
+//						Source: config.Source{
+//							Type: "file_system",
+//							Config: &config.HclBytes{
+//								Hcl: []byte("extensions = [\".csv\"]\npaths = [\"/Users/kai/tailpipe_data/logs\"]"),
+//								Range: hclhelpers.NewRange(hcl.Range{
+//									Filename: "test_data/custom_table_config/resources.tpc",
+//									Start: hcl.Pos{
+//										Line: 4,
+//										Column: 9,
+//										Byte: 68,
+//									},
+//									End: hcl.Pos{
+//										Line: 5,
+//										Column: 30,
+//										Byte: 139,
+//									},
+//								}),
+//							},
+//						},
+//					},
+//				},
+//				CustomTables: map[string]*config.Table{
+//					"my_csv_log": {
+//						HclResourceImpl: modconfig.HclResourceImpl{
+//							FullName: "partition.my_csv_log.test",
+//							ShortName: "test",
+//							UnqualifiedName: "my_csv_log.test",
+//							DeclRange: hcl.Range{
+//								Filename: "test_data/custom_table_config/resources.tpc",
+//								Start: hcl.Pos{
+//									Line: 2,
+//									Column: 30,
+//									Byte: 30,
+//								},
+//								End: hcl.Pos{
+//									Line: 10,
+//									Column: 2,
+//									Byte: 230,
+//								},
+//							},
+//							BlockType: "partition",
+//						},
+//						//Mode: schema.ModePartial,
+//						Columns: []config.ColumnSchema{
+//							{
+//								Name: "tp_timestamp",
+//								Source: utils.ToPointer("time_local"),
+//							},
+//							{
+//								Name: "tp_index",
+//								Source: utils.ToPointer("account_id"),
+//							},
+//							{
+//								Name: "org_id",
+//								Source: utils.ToPointer("org"),
+//							},
+//							{
+//								Name: "user_id",
+//								Type: utils.ToPointer("varchar"),
+//							},
+//						},
+//					},
+//				},
+//			},
+//
+//			wantErr: false,
+//		},
+//	}
+//
+//	for _, tt := range tests {
+//		t.Run(tt.name, func(t *testing.T) {
+//			tailpipeDir, er := filepath.Abs(tt.args.configPath)
+//			if er != nil {
+//				t.Errorf("failed to build absolute config filepath from %s", tt.args.configPath)
+//			}
+//			// set app_specific.InstallDir
+//			app_specific.InstallDir = tailpipeDir
+//
+//			tailpipeConfig, err := parseTailpipeConfig(tt.args.configPath)
+//			if (err != nil) != tt.wantErr {
+//				t.Errorf("LoadTailpipeConfig() error = %v, wantErr %v", err, tt.wantErr)
+//				return
+//			}
+//
+//			if !reflect.DeepEqual(tailpipeConfig, tt.want) {
+//				t.Errorf("LoadTailpipeConfig() = %v, want %v", tailpipeConfig, tt.want)
+//			}
+//		})
+//	}
+//}
diff --git a/internal/parse/parsed_property_path.go b/internal/parse/parsed_property_path.go
new file mode 100644
index 00000000..e2ffe4ac
--- /dev/null
+++ b/internal/parse/parsed_property_path.go
@@ -0,0 +1,70 @@
+package parse
+
+import (
+	"fmt"
+	"github.com/turbot/pipe-fittings/perr"
+	"github.com/turbot/tailpipe/internal/config"
+	"strings"
+)
+
+// ParsedPropertyPath represents a parsed property path for a resource with a subtype
+type ParsedPropertyPath struct {
+	Type string
+	SubType string
+	Name string
+	PropertyPath []string
+	Original string
+}
+
+func ParseResourcePropertyPath(propertyPath string) (*ParsedPropertyPath, error) {
+	res := &ParsedPropertyPath{Original: propertyPath}
+
+	// valid property paths (depending on whether this resource has a subtype):
+	// <resource type>.<subtype>.<name>.<property path>
+	// <resource type>.<name>.<property path>
+
+	parts := strings.Split(propertyPath, ".")
+
+	// does this resource type support subtypes
+	hasSubtype := config.ResourceHasSubtype(parts[0])
+
+	minParts := 2
+	if hasSubtype {
+		minParts = 3
+	}
+
+	if len(parts) < minParts {
+		return nil, perr.BadRequestWithMessage(fmt.Sprintf("invalid resource name: %s - at least %d parts required", propertyPath, minParts))
+	}
+
+	// no property path specified
+	res.Type = parts[0]
+
+	if hasSubtype {
+		res.SubType = parts[1]
+		res.Name = parts[2]
+	} else {
+		res.Name = parts[1]
+	}
+	// if a property path is set, add it
+	if len(parts) > minParts {
+		res.PropertyPath = parts[minParts:]
+	}
+
+	return res, nil
+}
+
+func (p *ParsedPropertyPath) PropertyPathString() string {
+	return strings.Join(p.PropertyPath, ".")
+}
+
+func (p *ParsedPropertyPath) ToResourceName() string {
+	if p.SubType == "" {
+		return fmt.Sprintf("%s.%s", p.Type, p.Name)
+	}
+	return fmt.Sprintf("%s.%s.%s", p.Type, p.SubType, p.Name)
+}
+
+func (p *ParsedPropertyPath) String() string {
+	return p.Original
+}
diff --git a/internal/parse/parsed_resource_name.go b/internal/parse/parsed_resource_name.go
new file mode 100644
index 00000000..fe097abb
--- /dev/null
+++ b/internal/parse/parsed_resource_name.go
@@ -0,0 +1,62 @@
+package parse
+
+import (
+	"fmt"
+	"github.com/turbot/pipe-fittings/perr"
+	"github.com/turbot/tailpipe/internal/config"
+	"strings"
+)
+
+// ParsedResourceName represents a parsed property path for a resource with a subtype
+type ParsedResourceName struct {
+	Type string
+	SubType string
+	Name string
+
+	Original string
+}
+
+func ParseResourceName(propertyPath string) (*ParsedResourceName, error) {
+	res := &ParsedResourceName{Original: propertyPath}
+
+	// valid resource names (depending on whether this resource has a subtype):
+	// <resource type>.<subtype>.<name>
+	// <resource type>.<name>
+
+	parts := strings.Split(propertyPath, ".")
+
+	// does this resource type support subtypes
+	hasSubtype := config.ResourceHasSubtype(parts[0])
+
+	expectedParts := 2
+	if hasSubtype {
+		expectedParts = 3
+	}
+
+	if len(parts) != expectedParts {
+		return nil, perr.BadRequestWithMessage(fmt.Sprintf("invalid resource name: %s - expected %d parts", propertyPath, expectedParts))
+	}
+
+	// no property path specified
+	res.Type = parts[0]
+
+	if hasSubtype {
+		res.SubType = parts[1]
+		res.Name = parts[2]
+	} else {
+		res.Name = parts[1]
+	}
+
+	return res, nil
+}
+
+func (p *ParsedResourceName) ToResourceName() string {
+	if p.SubType == "" {
+		return fmt.Sprintf("%s.%s", p.Type, p.Name)
+	}
+	return fmt.Sprintf("%s.%s.%s", p.Type, p.SubType, p.Name)
+}
+
+func (p *ParsedResourceName) String() string {
+	return p.Original
+}
diff --git a/internal/parse/test_data/configs/resources.tpc b/internal/parse/test_data/configs/resources.tpc
deleted file mode 100644
index fa7b353c..00000000
--- a/internal/parse/test_data/configs/resources.tpc
+++ /dev/null
@@ -1,88 +0,0 @@
-
-
-partition "aws_cloudtrail_log" "cloudtrail_logs" {
-    plugin = "aws"
-    source "file_system" {
-        paths = ["/Users/kai/tailpipe_data/flaws_cloudtrail_logs"]
-        extensions = [".gz"]
-    }
-}
-
-partition "aws_vpc_flow_log" "flow_logs" {
-    plugin = "aws"
-    source "aws_cloudwatch" {
-        log_group_name = "/victor/vpc/flowlog"
-        start_time = "2024-08-12T07:56:26Z"
-        end_time = "2024-08-13T07:56:26Z"
-        access_key = "REPLACE"
-        secret_key = "REPLACE"
-        session_token = "REPLACE"
-    }
-}
-
-partition "aws_elb_access_log" "elb_logs" {
-    plugin = "aws"
-    source "aws_s3_bucket" {
-        bucket = "spongebob-097350876455-us-east-1-cffd7fe0"
-        prefix = "spongebob_5_42_21/alb/AWSLogs/097350876455/elasticloadbalancing/"
-        extensions = [".gz"]
-        access_key = "REPLACE"
-        secret_key = "REPLACE"
-        session_token = "REPLACE"
-    }
-}
-
-partition "aws_s3_server_access_log" "s3_server_access_logs" {
-    plugin = "aws"
-    source "aws_s3_bucket" {
-        bucket = "turbot-688720832404-us-east-1"
-        prefix = "AWSLogs/688720832404/S3/elasticbeanstalk-us-east-1-688720832404/"
-        extensions = [] # allow all since these logs have no extension
-        access_key = "REPLACE"
-        secret_key = "REPLACE"
-        session_token = "REPLACE"
-    }
-}
-
-partition "aws_lambda_log" "lambda_logs" {
-    plugin = "aws"
-    source "aws_cloudwatch" {
-        log_group_name = "/aws/lambda/sentry_event_proxy"
-        start_time = "2024-08-12T07:56:26Z"
-        end_time = "2024-08-13T07:56:26Z"
-        access_key = "REPLACE"
-        secret_key = "REPLACE"
-        session_token = "REPLACE"
-    }
-}
-
-partition "pipes_audit_log" "pipes_logs" {
-    plugin = "pipes"
-    source "pipes_audit_log_api" {
-    }
-}
-
-partition "gcp_audit_log" "gcp_logs" {
-    plugin = "gcp"
-    source "gcp_audit_log_api" {
-        credentials = "/Users/graza/gcp/tailpipe-creds.json"
-        project = "parker-aaa"
-        log_types = ["activity", "data_access", "system_event"]
-    }
-}
-
-partition "github_audit_log" "github_logs" {
-    plugin = "github"
-    source "file_system" {
-        paths = ["/Users/cbruno/logs_tailpipe/github"]
-        extensions = [".json"]
-    }
-}
-
-partition "nginx_access_log" "njinx_logs" {
-    plugin = "nginx"
-    source "file_system" {
-        paths = ["/Users/graza/tailpipe_data/nginx_access_logs"]
-        extensions = [".log"]
-    }
-}
\ No newline at end of file
diff --git a/internal/parse/test_data/custom_table_config/resources.tpc b/internal/parse/test_data/custom_table_config/resources.tpc
new file mode 100644
index 00000000..48f2abcd
--- /dev/null
+++ b/internal/parse/test_data/custom_table_config/resources.tpc
@@ -0,0 +1,40 @@
+
+partition "my_csv_log" "test" {
+    source "file_system" {
+        paths = ["/Users/kai/tailpipe_data/logs"]
+        extensions = [".csv"]
+
+        # format MUST be set for a custom table
+        format = format.csv_logs
+    }
+}
+
+
+# define a custom table 'my_csv_log'
+table "my_csv_log" {
+    format = format.csv_default_logs
+    # the partition to use
+    column "tp_timestamp" {
+        source = "time_local"
+    }
+    column "tp_index" {
+        source = "account_id"
+    }
+    column "org_id" {
+        source = "org"
+    }
+    column "user_id" {
+        type = "varchar"
+    }
+}
+
+
+
+format "delimited" "csv_default_logs" {
+
+}
+
+format "delimited" "csv_logs" {
+    delimiter = "\t"
+    header = false
+}
\ No newline at end of file
diff --git a/internal/parse/test_data/static_table_config/resources.tpc b/internal/parse/test_data/static_table_config/resources.tpc
new file mode 100644
index 00000000..f90d5349
--- /dev/null
+++ b/internal/parse/test_data/static_table_config/resources.tpc
@@ -0,0 +1,23 @@
+
+# partition for static table with file_system source
+partition "aws_cloudtrail_log" "cloudtrail_logs" {
+    plugin = "aws"
+    source "file_system" {
+        paths = ["/Users/kai/tailpipe_data/flaws_cloudtrail_logs"]
+        extensions = [".gz"]
+    }
+}
+
+# partition for static table with aws_cloudwatch source
+partition "aws_vpc_flow_log" "flow_logs" {
+    plugin = "aws"
+    source "aws_cloudwatch" {
+        log_group_name = "/victor/vpc/flowlog"
+        start_time = "2024-08-12T07:56:26Z"
+        end_time = "2024-08-13T07:56:26Z"
+        access_key = "REPLACE"
+        secret_key = "REPLACE"
+        session_token = "REPLACE"
+    }
+}
+
diff --git a/internal/plugin_manager/plugin_manager.go b/internal/plugin_manager/plugin_manager.go
index 76ed2c61..883d82ff 100644
--- a/internal/plugin_manager/plugin_manager.go
+++ b/internal/plugin_manager/plugin_manager.go
@@ -71,16 +71,32 @@ func (p *PluginManager) Collect(ctx context.Context, partition *config.Partition

 	// tell the plugin to start the collection
 	req := &proto.CollectRequest{
+		TableName: partition.TableName,
+		PartitionName: partition.ShortName,
 		ExecutionId: executionID,
 		OutputPath: inboxPath,
-		PartitionData: partition.ToProto(),
 		SourceData: partition.Source.ToProto(),
 		CollectionState: []byte(collectionState),
 	}
+
 	if partition.Source.Connection != nil {
 		req.ConnectionData = partition.Source.Connection.ToProto()
 	}
+	if partition.Source.Format != nil {
+		req.SourceFormat = partition.Source.Format.ToProto()
+	}
+	if partition.CustomTable != nil {
+		req.CustomTable = partition.CustomTable.ToProto()
+		// set the default source format if the source does not provide one
+		if req.SourceFormat == nil && partition.CustomTable.DefaultSourceFormat != nil {
+			req.SourceFormat = partition.CustomTable.DefaultSourceFormat.ToProto()
+		}
+		if req.SourceFormat == nil {
+			return nil, fmt.Errorf("no source format defined for custom table %s", partition.CustomTable.ShortName)
+		}
+	}
+
 	collectResponse, err := pluginClient.Collect(req)
 	if err != nil {
 		return nil, fmt.Errorf("error starting collection for plugin %s: %w", pluginClient.Name, error_helpers.TransformErrorToSteampipe(err))
@@ -119,8 +135,8 @@ func (p *PluginManager) Describe(ctx context.Context, pluginName string) (*Plugi

 func (p *PluginManager) Close() {
 	p.pluginMutex.Lock()
 	defer p.pluginMutex.Unlock()
-	for _, plugin := range p.Plugins {
-		plugin.client.Kill()
+	for _, plg := range p.Plugins {
+		plg.client.Kill()
 	}
 }
@@ -195,7 +211,7 @@ func (p *PluginManager) startPlugin(tp *pplugin.Plugin) (*PluginClient, error) {
 	return client, nil
 }

-// TODO #config #debug this is currently provided for debug purposes only
+// for debug purposes, the plugin start timeout can be set via the environment variable TAILPIPE_PLUGIN_START_TIMEOUT
 func (p *PluginManager) getPluginStartTimeout() time.Duration {
 	pluginStartTimeout := 1 * time.Minute
 	pluginStartTimeoutStr := os.Getenv(constants.EnvPluginStartTimeout)
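
Aside (illustrative only, not part of the diff): the behaviour of the two parsing helpers added in internal/parse above is easiest to see with a concrete input. The sketch below assumes config.ResourceHasSubtype treats "partition" as a subtype-aware resource type; the expected outputs in the comments follow directly from the logic in parsed_resource_name.go and parsed_property_path.go.

package main

import (
	"fmt"
	"log"

	"github.com/turbot/tailpipe/internal/parse"
)

func main() {
	// with a subtype-aware resource type, exactly 3 parts are expected
	name, err := parse.ParseResourceName("partition.aws_cloudtrail_log.cloudtrail_logs")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(name.Type, name.SubType, name.Name) // partition aws_cloudtrail_log cloudtrail_logs

	// anything after the name segment is captured as the property path
	path, err := parse.ParseResourcePropertyPath("partition.aws_cloudtrail_log.cloudtrail_logs.source.type")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(path.ToResourceName())     // partition.aws_cloudtrail_log.cloudtrail_logs
	fmt.Println(path.PropertyPathString()) // source.type
}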