diff --git a/cmd/bosun/expr/expr.go b/cmd/bosun/expr/expr.go
index 48cfbf882b..130d779685 100644
--- a/cmd/bosun/expr/expr.go
+++ b/cmd/bosun/expr/expr.go
@@ -290,7 +290,7 @@ func (a *Results) Equal(b *Results) (bool, error) {
 	if a.IgnoreOtherUnjoined != b.IgnoreOtherUnjoined {
 		return false, fmt.Errorf("ignoreUnjoined flag does not match a: %v, b: %v", a.IgnoreOtherUnjoined, b.IgnoreOtherUnjoined)
 	}
-	if a.NaNValue != a.NaNValue {
+	if a.NaNValue != b.NaNValue {
 		return false, fmt.Errorf("NaNValue does not match a: %v, b: %v", a.NaNValue, b.NaNValue)
 	}
 	sortedA := ResultSliceByGroup(a.Results)
diff --git a/cmd/bosun/expr/funcs.go b/cmd/bosun/expr/funcs.go
index 565e0e7f3d..bc473a38ec 100644
--- a/cmd/bosun/expr/funcs.go
+++ b/cmd/bosun/expr/funcs.go
@@ -1,9 +1,11 @@
 package expr
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -44,8 +46,23 @@ func tagRemove(args []parse.Node) (parse.Tags, error) {
 }
 
 func seriesFuncTags(args []parse.Node) (parse.Tags, error) {
+	s := args[0].(*parse.StringNode).Text
+	return tagsFromString(s)
+}
+
+func aggrFuncTags(args []parse.Node) (parse.Tags, error) {
+	if len(args) < 3 {
+		return nil, errors.New("aggr: expect 3 arguments")
+	}
+	if _, ok := args[1].(*parse.StringNode); !ok {
+		return nil, errors.New("aggr: expect group to be string")
+	}
+	s := args[1].(*parse.StringNode).Text
+	return tagsFromString(s)
+}
+
+func tagsFromString(text string) (parse.Tags, error) {
 	t := make(parse.Tags)
-	text := args[0].(*parse.StringNode).Text
 	if text == "" {
 		return t, nil
 	}
@@ -199,6 +216,15 @@ var builtins = map[string]parse.Func{
 		F:      Streak,
 	},
 
+	// Aggregation functions
+	"aggr": {
+		Args:   []models.FuncType{models.TypeSeriesSet, models.TypeString, models.TypeString},
+		Return: models.TypeSeriesSet,
+		Tags:   aggrFuncTags,
+		F:      Aggr,
+		Check:  aggrCheck,
+	},
+
 	// Group functions
 	"addtags": {
 		Args:   []models.FuncType{models.TypeVariantSet, models.TypeString},
@@ -385,6 +411,184 @@ var builtins = map[string]parse.Func{
 	},
 }
 
+// Aggr combines multiple series matching the specified groups using an aggregator function. If group
+// is empty, all given series are combined, regardless of existing groups.
+// Available aggregator functions include: avg, min, max, sum, and pN, where N is a float between
+// 0 and 1 inclusive, e.g. p.50 represents the 50th percentile. p0 and p1 are equal to min and max,
+// respectively, but min and max are preferred for readability.
+func Aggr(e *State, series *Results, groups string, aggregator string) (*Results, error) {
+	results := Results{}
+
+	grps := splitGroups(groups)
+	if len(grps) == 0 {
+		// no groups specified, so we merge all group values
+		res, err := aggr(e, series, aggregator)
+		if err != nil {
+			return &results, err
+		}
+		res.Group = opentsdb.TagSet{}
+		results.Results = append(results.Results, res)
+		return &results, nil
+	}
+
+	// at least one group specified, so we work out what
+	// the new group values will be
+	newGroups := map[string]*Results{}
+	for _, result := range series.Results {
+		var vals []string
+		for _, grp := range grps {
+			if val, ok := result.Group[grp]; ok {
+				vals = append(vals, val)
+				continue
+			}
+			return nil, fmt.Errorf("unmatched group in at least one series: %v", grp)
+		}
+		groupName := strings.Join(vals, ",")
+		if _, ok := newGroups[groupName]; !ok {
+			newGroups[groupName] = &Results{}
+		}
+		newGroups[groupName].Results = append(newGroups[groupName].Results, result)
+	}
+
+	for groupName, series := range newGroups {
+		res, err := aggr(e, series, aggregator)
+		if err != nil {
+			return &results, err
+		}
+		vs := strings.Split(groupName, ",")
+		res.Group = opentsdb.TagSet{}
+		for i := 0; i < len(grps); i++ {
+			res.Group.Merge(opentsdb.TagSet{grps[i]: vs[i]})
+		}
+		results.Results = append(results.Results, res)
+	}
+
+	return &results, nil
+}
+
+// Splits a string of groups by comma, trims any surrounding spaces,
+// and returns an empty slice if the string is empty.
+func splitGroups(groups string) []string {
+	if len(groups) == 0 {
+		return []string{}
+	}
+	grps := strings.Split(groups, ",")
+	for i, grp := range grps {
+		grps[i] = strings.Trim(grp, " ")
+	}
+	return grps
+}
+
+func aggr(e *State, series *Results, aggfunc string) (*Result, error) {
+	res := Result{}
+	newSeries := make(Series)
+	var isPerc bool
+	var percValue float64
+	if len(aggfunc) > 0 && aggfunc[0] == 'p' {
+		var err error
+		percValue, err = strconv.ParseFloat(aggfunc[1:], 64)
+		isPerc = err == nil
+	}
+	if isPerc {
+		if percValue < 0 || percValue > 1 {
+			return nil, fmt.Errorf("expr: aggr: percentile value must be greater than or equal to 0 and less than or equal to 1")
+		}
+		aggfunc = "percentile"
+	}
+
+	switch aggfunc {
+	case "percentile":
+		newSeries = aggrPercentile(series.Results, percValue)
+	case "min":
+		newSeries = aggrPercentile(series.Results, 0.0)
+	case "max":
+		newSeries = aggrPercentile(series.Results, 1.0)
+	case "avg":
+		newSeries = aggrAverage(series.Results)
+	case "sum":
+		newSeries = aggrSum(series.Results)
+	default:
+		return &res, fmt.Errorf("unknown aggfunc: %v. Options are avg, min, max, sum, and pN (e.g. p.50)", aggfunc)
+	}
+
+	res.Value = newSeries
+	return &res, nil
+}
+
+func aggrPercentile(series ResultSlice, percValue float64) Series {
+	newSeries := make(Series)
+	merged := map[time.Time][]float64{}
+	for _, result := range series {
+		for t, v := range result.Value.Value().(Series) {
+			merged[t] = append(merged[t], v)
+		}
+	}
+	for t := range merged {
+		// transform points from merged series into a made-up
+		// single timeseries, so that we can use the existing
+		// percentile reduction function here
+		dps := Series{}
+		for i := range merged[t] {
+			dps[time.Unix(int64(i), 0)] = merged[t][i]
+		}
+		newSeries[t] = percentile(dps, percValue)
+	}
+	return newSeries
+}
+
+func aggrAverage(series ResultSlice) Series {
+	newSeries := make(Series)
+	counts := map[time.Time]int64{}
+	for _, result := range series {
+		for t, v := range result.Value.Value().(Series) {
+			newSeries[t] += v
+			counts[t]++
+		}
+	}
+	for t := range newSeries {
+		newSeries[t] /= float64(counts[t])
+	}
+	return newSeries
+}
+
+func aggrSum(series ResultSlice) Series {
+	newSeries := make(Series)
+	for _, result := range series {
+		for t, v := range result.Value.Value().(Series) {
+			newSeries[t] += v
+		}
+	}
+	return newSeries
+}
+
+func aggrCheck(t *parse.Tree, f *parse.FuncNode) error {
+	if len(f.Args) < 3 {
+		return errors.New("aggr: expect 3 arguments")
+	}
+	if _, ok := f.Args[2].(*parse.StringNode); !ok {
+		return errors.New("aggr: expect string as aggregator function name")
+	}
+	name := f.Args[2].(*parse.StringNode).Text
+	var isPerc bool
+	var percValue float64
+	if len(name) > 0 && name[0] == 'p' {
+		var err error
+		percValue, err = strconv.ParseFloat(name[1:], 64)
+		isPerc = err == nil
+	}
+	if isPerc {
+		if percValue < 0 || percValue > 1 {
+			return errors.New("aggr: percentile value must be greater than or equal to 0 and less than or equal to 1")
+		}
+		return nil
+	}
+	switch name {
+	case "avg", "min", "max", "sum":
+		return nil
+	}
+	return fmt.Errorf("aggr: unrecognized aggregation function %s", name)
+}
+
 func V(e *State) (*Results, error) {
 	return fromScalar(e.vValue), nil
 }
diff --git a/cmd/bosun/expr/funcs_test.go b/cmd/bosun/expr/funcs_test.go
index 24e6abe361..0419c153af 100644
--- a/cmd/bosun/expr/funcs_test.go
+++ b/cmd/bosun/expr/funcs_test.go
@@ -2,6 +2,7 @@ package expr
 
 import (
 	"fmt"
+	"math"
 	"testing"
 	"time"
 
@@ -257,3 +258,245 @@ func TestTail(t *testing.T) {
 		}
 	}
 }
+
+func TestAggr(t *testing.T) {
+	seriesA := `series("foo=bar", 0, 1, 100, 2)`
+	seriesB := `series("foo=baz", 0, 3, 100, 4)`
+	seriesC := `series("foo=bat", 0, 5, 100, 6)`
+
+	// test median aggregator
+	err := testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"p.50\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   3,
+						time.Unix(100, 0): 4,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// test average aggregator
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"avg\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   3,
+						time.Unix(100, 0): 4,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// test min aggregator
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"min\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   1,
+						time.Unix(100, 0): 2,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// test max aggregator
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"max\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   5,
+						time.Unix(100, 0): 6,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// check that min == p0
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"p0\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   1,
+						time.Unix(100, 0): 2,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// check that sum aggregator sums up the aligned points in the series
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"sum\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   9,
+						time.Unix(100, 0): 12,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// check that unknown aggregator errors out
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"\", \"unknown\")", seriesA, seriesB, seriesC),
+		Results{},
+		false,
+	})
+	if err == nil {
+		t.Errorf("expected unknown aggregator to return error")
+	}
+}
+
+func TestAggrWithGroups(t *testing.T) {
+	seriesA := `series("color=blue,type=apple,name=bob", 0, 1)`
+	seriesB := `series("color=blue,type=apple", 1, 3)`
+	seriesC := `series("color=green,type=apple", 0, 5)`
+
+	// test aggregator with single group
+	err := testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"color\", \"p.50\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0): 1,
+						time.Unix(1, 0): 3,
+					},
+					Group: opentsdb.TagSet{"color": "blue"},
+				},
+				&Result{
+					Value: Series{
+						time.Unix(0, 0): 5,
+					},
+					Group: opentsdb.TagSet{"color": "green"},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+
+	// test aggregator with multiple groups
+	err = testExpression(exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v, %v), \"color,type\", \"p.50\")", seriesA, seriesB, seriesC),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0): 1,
+						time.Unix(1, 0): 3,
+					},
+					Group: opentsdb.TagSet{"color": "blue", "type": "apple"},
+				},
+				&Result{
+					Value: Series{
+						time.Unix(0, 0): 5,
+					},
+					Group: opentsdb.TagSet{"color": "green", "type": "apple"},
+				},
+			},
+		},
+		false,
+	})
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func TestAggrNaNHandling(t *testing.T) {
+	// test behavior when NaN is encountered
+	seriesD := `series("foo=bar", 0, 0 / 0, 100, 1)`
+	seriesE := `series("foo=baz", 0, 1, 100, 3)`
+
+	// expect the NaN point to propagate into the aggregated result
+	eio := exprInOut{
+		fmt.Sprintf("aggr(merge(%v, %v), \"\", \"p.90\")", seriesD, seriesE),
+		Results{
+			Results: ResultSlice{
+				&Result{
+					Value: Series{
+						time.Unix(0, 0):   math.NaN(),
+						time.Unix(100, 0): 2,
+					},
+					Group: opentsdb.TagSet{},
+				},
+			},
+		},
+		false,
+	}
+	e, err := New(eio.expr, builtins)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	backends := &Backends{
+		InfluxConfig: client.HTTPConfig{},
+	}
+	providers := &BosunProviders{}
+	res, _, err := e.Execute(backends, providers, nil, queryTime, 0, false)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+
+	results := res.Results
+	if len(results) != 1 {
+		t.Fatalf("got len(results) == %d, want 1", len(results))
+	}
+	val0 := results[0].Value.(Series)[time.Unix(0, 0)]
+	if !math.IsNaN(val0) {
+		t.Errorf("got first point = %f, want NaN", val0)
+	}
+	val1 := results[0].Value.(Series)[time.Unix(100, 0)]
+	if val1 != 2.0 {
+		t.Errorf("got second point = %f, want %f", val1, 2.0)
+	}
+}
diff --git a/docs/expressions.md b/docs/expressions.md
index 87605cf268..565ddcd0ea 100644
--- a/docs/expressions.md
+++ b/docs/expressions.md
@@ -593,6 +593,28 @@ Returns the length of the longest streak of values that evaluate to true (i.e. m
 
 Sum.
 
+# Aggregation Functions
+
+Aggregation functions take a seriesSet and return a new seriesSet.
+
+## aggr(series seriesSet, groups string, aggregator string) seriesSet
+{: .exprFunc}
+
+Takes a seriesSet and combines it into a new seriesSet with the groups specified, using an aggregator to merge any series that share the matching group values. If the groups argument is an empty string, all series are combined into a single series, regardless of existing groups.
+
+The available aggregator functions are: `"avg"` (average), `"min"` (minimum), `"max"` (maximum), `"sum"`, and `"pN"` (percentile), where N is a floating point number between 0 and 1 inclusive. For example, `"p.25"` is the 25th percentile and `"p.999"` is the 99.9th percentile. `"p0"` and `"p1"` are equivalent to min and max respectively (however, in these cases it is recommended to use `"min"` and `"max"` for the sake of clarity).
+
+The aggr function can be particularly useful for removing anomalies when comparing time series across periods with the over function.
+
+Example:
+
+```
+$weeks = over("sum:1m-avg:os.cpu{region=*,color=*}", "24h", "1w", 3)
+$agg = aggr($weeks, "region,color", "p.50")
+```
+
+The above example uses `over` to load the same 24 hour period from each of the past 3 weeks. We then use the aggr function to combine the three weeks into one, selecting the median (`p.50`) value of the 3 weeks at each timestamp. This results in a new seriesSet, grouped by region and color, that represents a "normal" 24 hour period with anomalies removed.
+
 # Group Functions
 
 Group functions modify the OpenTSDB groups.
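As a quick illustration of how the new `aggr` function might be used end to end, the following hypothetical Bosun rule fragment builds on the documentation example above by comparing the current 24 hour window against the aggregated weekly baseline. The alert name, metric, and the 1.5x threshold are illustrative assumptions only and are not part of this change.

```
# Sketch only: alert name, metric, and threshold are made up for illustration.
alert os.cpu.weekly.anomaly {
    # current 24 hour window
    $current = q("sum:1m-avg:os.cpu{region=*,color=*}", "24h", "")
    # the same 24 hour window from each of the last 3 weeks
    $weeks = over("sum:1m-avg:os.cpu{region=*,color=*}", "24h", "1w", 3)
    # median of the 3 weeks at each timestamp, grouped by region and color
    $baseline = aggr($weeks, "region,color", "p.50")
    # warn when the current average runs well above the baseline average
    warn = avg($current) > avg($baseline) * 1.5
}
```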