这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/11-test-acceptance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
with:
repository: turbot/pipe-fittings
path: pipe-fittings
ref: tp-pragma
ref: tp

- name: Checkout Tailpipe plugin SDK repository
uses: actions/checkout@v4
Expand Down
41 changes: 25 additions & 16 deletions cmd/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,38 +189,47 @@ func getPartitions(args []string) ([]*config.Partition, error) {
}

// getPartitionsForArg returns the entries of partitions that match arg.
//
// arg is first converted into a table pattern and a partition pattern by
// getPartitionMatchPatternsForArg; each candidate is then compared against
// "<tablePattern>.<partitionPattern>" using fnmatch with the FNM_CASEFOLD flag.
// The returned slice is nil when nothing matches. Any error from resolving the
// patterns is returned unchanged.
func getPartitionsForArg(partitions []string, arg string) ([]string, error) {
	tablePattern, partitionPattern, err := getPartitionMatchPatternsForArg(partitions, arg)
	if err != nil {
		return nil, err
	}

	// the glob does not depend on the candidate, so assemble it a single time
	glob := tablePattern + "." + partitionPattern

	var matched []string
	for i := range partitions {
		if !fnmatch.Match(glob, partitions[i], fnmatch.FNM_CASEFOLD) {
			continue
		}
		matched = append(matched, partitions[i])
	}
	return matched, nil
}

func getPartitionMatchPatternsForArg(partitions []string, arg string) (string, string, error) {
var tablePattern, partitionPattern string
parts := strings.Split(arg, ".")
switch len(parts) {
case 1:
var err error
tablePattern, partitionPattern, err = getPartitionMatchPatterns(partitions, arg, parts)
tablePattern, partitionPattern, err = getPartitionMatchPatternsForSinglePartName(partitions, arg)
if err != nil {
return nil, err
return "", "", err
}
case 2:
// use the args as provided
tablePattern = parts[0]
partitionPattern = parts[1]
default:
return nil, fmt.Errorf("invalid partition name: %s", arg)
}

// now match the partition
var res []string
for _, partition := range partitions {
pattern := tablePattern + "." + partitionPattern
if fnmatch.Match(pattern, partition, fnmatch.FNM_CASEFOLD) {
res = append(res, partition)
}
return "", "", fmt.Errorf("invalid partition name: %s", arg)
}
return res, nil
return tablePattern, partitionPattern, nil
}

func getPartitionMatchPatterns(partitions []string, arg string, parts []string) (string, string, error) {
// getPartitionMatchPatternsForSinglePartName returns the table and partition patterns for a single part name
// e.g. if the arg is "aws*"
func getPartitionMatchPatternsForSinglePartName(partitions []string, arg string) (string, string, error) {
var tablePattern, partitionPattern string
// '*' is not valid for a single part arg
if parts[0] == "*" {
if arg == "*" {
return "", "", fmt.Errorf("invalid partition name: %s", arg)
}
// check whether there is table with this name
Expand All @@ -237,7 +246,7 @@ func getPartitionMatchPatterns(partitions []string, arg string, parts []string)
}
// so there IS NOT a table with this name - set table pattern to * and user provided partition name
tablePattern = "*"
partitionPattern = parts[0]
partitionPattern = arg
return tablePattern, partitionPattern, nil
}

Expand Down
119 changes: 119 additions & 0 deletions cmd/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,122 @@ func Test_getPartition(t *testing.T) {
})
}
}

// Test_getPartitionMatchPatternsForArg checks the table and partition patterns
// (or the error) that getPartitionMatchPatternsForArg produces for a range of
// command line arguments: fully qualified names, wildcards on either side of
// the dot, single-part names and malformed input.
func Test_getPartitionMatchPatternsForArg(t *testing.T) {
	cases := []struct {
		name       string
		partitions []string
		arg        string
		wantTable  string
		wantPart   string
		wantErr    bool
	}{
		{
			name:       "Valid table and partition pattern",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_s3_cloudtrail_log.p2"},
			arg:        "aws_s3_cloudtrail_log.p1",
			wantTable:  "aws_s3_cloudtrail_log",
			wantPart:   "p1",
		},
		{
			name:       "Wildcard partition pattern",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_s3_cloudtrail_log.p2", "aws_elb_access_log.p1"},
			arg:        "aws_s3_cloudtrail_log.*",
			wantTable:  "aws_s3_cloudtrail_log",
			wantPart:   "*",
		},
		{
			name:       "Wildcard in table and partition both",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_s3_cloudtrail_log.p2", "aws_elb_access_log.p1"},
			arg:        "aws*.*",
			wantTable:  "aws*",
			wantPart:   "*",
		},
		{
			name:       "Wildcard table pattern",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_elb_access_log.p1"},
			arg:        "*.p1",
			wantTable:  "*",
			wantPart:   "p1",
		},
		{
			name:       "Invalid partition name",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_s3_cloudtrail_log.p2"},
			arg:        "*",
			wantErr:    true,
		},
		{
			name:       "Table exists without partition",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_s3_cloudtrail_log.p2"},
			arg:        "aws_s3_cloudtrail_log",
			wantTable:  "aws_s3_cloudtrail_log",
			wantPart:   "*",
		},
		{
			name:       "Partition only, multiple tables",
			partitions: []string{"aws_s3_cloudtrail_log.p1", "aws_elb_access_log.p1"},
			arg:        "p1",
			wantTable:  "*",
			wantPart:   "p1",
		},
		{
			name:       "Invalid argument with multiple dots",
			partitions: []string{"aws_s3_cloudtrail_log.p1"},
			arg:        "aws.s3.cloudtrail",
			wantErr:    true,
		},
		{
			name:       "Non-existing table name",
			partitions: []string{"aws_s3_cloudtrail_log.p1"},
			arg:        "non_existing_table.p1",
			wantTable:  "non_existing_table",
			wantPart:   "p1",
		},
		{
			name:       "Partition name does not exist",
			partitions: []string{"aws_s3_cloudtrail_log.p1"},
			arg:        "p2",
			wantTable:  "*",
			wantPart:   "p2",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			table, part, err := getPartitionMatchPatternsForArg(tc.partitions, tc.arg)

			// an unexpected error state makes the pattern comparison meaningless
			gotErr := err != nil
			if gotErr != tc.wantErr {
				t.Errorf("getPartitionMatchPatternsForArg() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if table != tc.wantTable {
				t.Errorf("getPartitionMatchPatternsForArg() gotTablePattern = %v, want %v", table, tc.wantTable)
			}
			if part != tc.wantPart {
				t.Errorf("getPartitionMatchPatternsForArg() gotPartPattern = %v, want %v", part, tc.wantPart)
			}
		})
	}
}
142 changes: 142 additions & 0 deletions cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/turbot/tailpipe/internal/constants"
"github.com/turbot/tailpipe/internal/database"
"github.com/turbot/tailpipe/internal/filepaths"
"golang.org/x/exp/maps"
)

// variable used to assign the output mode flag
Expand All @@ -50,6 +51,8 @@ func connectCmd() *cobra.Command {
cmdconfig.OnCmd(cmd).
AddStringFlag(pconstants.ArgFrom, "", "Specify the start time").
AddStringFlag(pconstants.ArgTo, "", "Specify the end time").
AddStringSliceFlag(pconstants.ArgIndex, nil, "Specify the index to use").
AddStringSliceFlag(pconstants.ArgPartition, nil, "Specify the partition to use").
AddVarFlag(enumflag.New(&connectOutputMode, pconstants.ArgOutput, constants.ConnectOutputModeIds, enumflag.EnumCaseInsensitive),
pconstants.ArgOutput,
fmt.Sprintf("Output format; one of: %s", strings.Join(constants.FlagValues(constants.PluginOutputModeIds), ", ")))
Expand Down Expand Up @@ -167,6 +170,26 @@ func getFilters() ([]string, error) {
toTimestamp := t.Format(time.DateTime)
result = append(result, fmt.Sprintf("tp_date <= DATE '%s' AND tp_timestamp <= TIMESTAMP '%s'", toDate, toTimestamp))
}
if viper.IsSet(pconstants.ArgPartition) {
// we have loaded tailpipe config by this time
availablePartitions := config.GlobalConfig.Partitions
partitionArgs := viper.GetStringSlice(pconstants.ArgPartition)
// get the SQL filters from the provided partition
sqlFilters, err := getPartitionSqlFilters(partitionArgs, maps.Keys(availablePartitions))
if err != nil {
return nil, err
}
result = append(result, sqlFilters)
}
if viper.IsSet(pconstants.ArgIndex) {
indexArgs := viper.GetStringSlice(pconstants.ArgIndex)
// get the SQL filters from the provided index
sqlFilters, err := getIndexSqlFilters(indexArgs)
if err != nil {
return nil, err
}
result = append(result, sqlFilters)
}
return result, nil
}

Expand Down Expand Up @@ -275,3 +298,122 @@ func cleanupOldDbFiles() error {
}
return nil
}

// getPartitionSqlFilters converts the partition args into a single SQL
// predicate over the tp_table and tp_partition columns.
//
// Each arg is resolved to a (table pattern, partition pattern) pair by
// getPartitionPatterns, '*' wildcards are rewritten to '%' by replaceWildcards,
// and every pair contributes one condition; the conditions are OR-ed together.
//
// The result is "" (meaning "no restriction") when there are no patterns or
// when any single arg matches every table and every partition: OR-ing a
// match-all term with anything else still matches everything, so dropping just
// that term would wrongly narrow the result. When more than one condition is
// produced the disjunction is wrapped in parentheses so it stays one unit if
// the caller combines it with other filters.
//
// An error is returned (wrapped) when an arg cannot be resolved to patterns.
func getPartitionSqlFilters(partitionArgs []string, availablePartitions []string) (string, error) {
	// Get table and partition patterns using getPartitionPatterns
	tablePatterns, partitionPatterns, err := getPartitionPatterns(partitionArgs, availablePartitions)
	if err != nil {
		return "", fmt.Errorf("error processing partition args: %w", err)
	}

	// Handle the case when patterns are empty
	if len(tablePatterns) == 0 || len(partitionPatterns) == 0 {
		return "", nil
	}

	// Replace wildcards from '*' to '%' for SQL compatibility
	updatedTables, updatedPartitions := replaceWildcards(tablePatterns, partitionPatterns)

	conditions := make([]string, 0, len(updatedTables))
	for i, table := range updatedTables {
		partition := updatedPartitions[i]

		// this arg selects everything, so the OR of all args does too
		if table == "%" && partition == "%" {
			return "", nil
		}

		tableCondition := partitionColumnCondition("tp_table", table)
		partitionCondition := partitionColumnCondition("tp_partition", partition)

		// combine whichever conditions are present
		switch {
		case tableCondition != "" && partitionCondition != "":
			conditions = append(conditions, fmt.Sprintf("(%s AND %s)", tableCondition, partitionCondition))
		case tableCondition != "":
			conditions = append(conditions, tableCondition)
		case partitionCondition != "":
			conditions = append(conditions, partitionCondition)
		}
	}

	// Combine all conditions with OR
	sqlFilters := strings.Join(conditions, " OR ")
	if len(conditions) > 1 {
		// keep the disjunction together when it is embedded in a larger expression
		sqlFilters = "(" + sqlFilters + ")"
	}

	return sqlFilters, nil
}

// partitionColumnCondition renders the predicate restricting column to pattern,
// where pattern already uses '%' as its wildcard. It returns "" for the full
// wildcard "%", a LIKE comparison when the pattern contains a wildcard, and an
// equality comparison otherwise. Single quotes in pattern are doubled so the
// value cannot terminate the SQL string literal.
func partitionColumnCondition(column, pattern string) string {
	if pattern == "%" {
		return ""
	}
	literal := strings.ReplaceAll(pattern, "'", "''")
	if strings.Contains(literal, "%") {
		return fmt.Sprintf("%s LIKE '%s'", column, literal)
	}
	return fmt.Sprintf("%s = '%s'", column, literal)
}

// getIndexSqlFilters converts the index args into a single SQL predicate over
// the tp_index column.
//
// Each arg becomes one condition and the conditions are OR-ed together:
//   - an arg containing '*' becomes a LIKE comparison against
//     CAST(tp_index AS VARCHAR), with '*' rewritten to '%';
//   - any other arg becomes an equality comparison.
//
// The result is "" (meaning "no restriction") when indexArgs is empty or when
// any arg is exactly "*": a match-all term OR-ed with anything matches
// everything, and emitting an empty term would produce invalid SQL such as
// "tp_index = 'a' OR ". When more than one condition is produced the
// disjunction is wrapped in parentheses so it stays one unit if the caller
// combines it with other filters. Single quotes in an arg are doubled so the
// value cannot terminate the SQL string literal.
//
// The returned error is always nil.
func getIndexSqlFilters(indexArgs []string) (string, error) {
	// Return empty if no indexes provided
	if len(indexArgs) == 0 {
		return "", nil
	}

	conditions := make([]string, 0, len(indexArgs))
	for _, index := range indexArgs {
		if index == "*" {
			// full wildcard: every index is selected, so no filter is needed at all
			return "", nil
		}

		literal := strings.ReplaceAll(index, "'", "''")
		if strings.Contains(literal, "*") {
			// Replace '*' wildcard with '%' for SQL LIKE compatibility
			literal = strings.ReplaceAll(literal, "*", "%")
			conditions = append(conditions, fmt.Sprintf("CAST(tp_index AS VARCHAR) LIKE '%s'", literal))
			continue
		}
		// Exact match using '='
		conditions = append(conditions, fmt.Sprintf("tp_index = '%s'", literal))
	}

	// Combine all conditions with OR
	sqlFilter := strings.Join(conditions, " OR ")
	if len(conditions) > 1 {
		// keep the disjunction together when it is embedded in a larger expression
		sqlFilter = "(" + sqlFilter + ")"
	}

	return sqlFilter, nil
}

// getPartitionPatterns resolves every entry of partitionArgs into a table
// pattern and a partition pattern via getPartitionMatchPatternsForArg.
//
// The two returned slices are parallel: element i of each belongs to
// partitionArgs[i]. Both are nil when partitionArgs is empty. Processing stops
// at the first arg that fails to resolve, and that error is returned wrapped
// with the offending arg.
func getPartitionPatterns(partitionArgs []string, partitions []string) ([]string, []string, error) {
	var (
		tables []string
		parts  []string
	)

	for i := range partitionArgs {
		current := partitionArgs[i]

		table, part, err := getPartitionMatchPatternsForArg(partitions, current)
		if err != nil {
			return nil, nil, fmt.Errorf("error processing partition arg '%s': %w", current, err)
		}

		tables, parts = append(tables, table), append(parts, part)
	}

	return tables, parts, nil
}

// replaceWildcards returns copies of tablePatterns and partitionPatterns in
// which every '*' has been replaced by '%', the SQL LIKE wildcard. The inputs
// are not modified; each result has the same length as its input and is
// non-nil even for a nil input.
func replaceWildcards(tablePatterns []string, partitionPatterns []string) ([]string, []string) {
	// toSQL converts one list of glob patterns, preserving order and length
	toSQL := func(patterns []string) []string {
		converted := make([]string, 0, len(patterns))
		for _, pattern := range patterns {
			converted = append(converted, strings.ReplaceAll(pattern, "*", "%"))
		}
		return converted
	}

	return toSQL(tablePatterns), toSQL(partitionPatterns)
}
Loading
Loading