这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Merged
2 changes: 1 addition & 1 deletion cmd/bosun/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (a *Results) Equal(b *Results) (bool, error) {
if a.IgnoreOtherUnjoined != b.IgnoreOtherUnjoined {
return false, fmt.Errorf("ignoreUnjoined flag does not match a: %v, b: %v", a.IgnoreOtherUnjoined, b.IgnoreOtherUnjoined)
}
if a.NaNValue != a.NaNValue {
if a.NaNValue != b.NaNValue {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find!

return false, fmt.Errorf("NaNValue does not match a: %v, b: %v", a.NaNValue, b.NaNValue)
}
sortedA := ResultSliceByGroup(a.Results)
Expand Down
206 changes: 205 additions & 1 deletion cmd/bosun/expr/funcs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package expr

import (
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -44,8 +46,23 @@ func tagRemove(args []parse.Node) (parse.Tags, error) {
}

func seriesFuncTags(args []parse.Node) (parse.Tags, error) {
s := args[0].(*parse.StringNode).Text
return tagsFromString(s)
}

func aggrFuncTags(args []parse.Node) (parse.Tags, error) {
if len(args) < 3 {
return nil, errors.New("aggr: expect 3 arguments")
}
if _, ok := args[1].(*parse.StringNode); !ok {
return nil, errors.New("aggr: expect group to be string")
}
s := args[1].(*parse.StringNode).Text
return tagsFromString(s)
}

func tagsFromString(text string) (parse.Tags, error) {
t := make(parse.Tags)
text := args[0].(*parse.StringNode).Text
if text == "" {
return t, nil
}
Expand Down Expand Up @@ -199,6 +216,15 @@ var builtins = map[string]parse.Func{
F: Streak,
},

// Aggregation functions
"aggr": {
Args: []models.FuncType{models.TypeSeriesSet, models.TypeString, models.TypeString},
Return: models.TypeSeriesSet,
Tags: aggrFuncTags,
F: Aggr,
Check: aggrCheck,
},

// Group functions
"addtags": {
Args: []models.FuncType{models.TypeVariantSet, models.TypeString},
Expand Down Expand Up @@ -385,6 +411,184 @@ var builtins = map[string]parse.Func{
},
}

// Aggr combines multiple series matching the specified groups using an aggregator function. If group
// is empty, all given series are combined, regardless of existing groups.
// Available aggregator functions include: avg, min, max, sum, and pN, where N is a float between
// 0 and 1 inclusive, e.g. p.50 represents the 50th percentile. p0 and p1 are equal to min and max,
// respectively, but min and max are preferred for readability.
func Aggr(e *State, series *Results, groups string, aggregator string) (*Results, error) {
results := Results{}

grps := splitGroups(groups)
if len(grps) == 0 {
// no groups specified, so we merge all group values
res, err := aggr(e, series, aggregator)
if err != nil {
return &results, err
}
res.Group = opentsdb.TagSet{}
results.Results = append(results.Results, res)
return &results, nil
}

// at least one group specified, so we work out what
// the new group values will be
newGroups := map[string]*Results{}
for _, result := range series.Results {
var vals []string
for _, grp := range grps {
if val, ok := result.Group[grp]; ok {
vals = append(vals, val)
continue
}
return nil, fmt.Errorf("unmatched group in at least one series: %v", grp)
}
groupName := strings.Join(vals, ",")
if _, ok := newGroups[groupName]; !ok {
newGroups[groupName] = &Results{}
}
newGroups[groupName].Results = append(newGroups[groupName].Results, result)
}

for groupName, series := range newGroups {
res, err := aggr(e, series, aggregator)
if err != nil {
return &results, err
}
vs := strings.Split(groupName, ",")
res.Group = opentsdb.TagSet{}
for i := 0; i < len(grps); i++ {
res.Group.Merge(opentsdb.TagSet{grps[i]: vs[i]})
}
results.Results = append(results.Results, res)
}

return &results, nil
}

// Splits a string of groups by comma, but also trims any added whitespace
// and returns an empty slice if the string is empty.
func splitGroups(groups string) []string {
if len(groups) == 0 {
return []string{}
}
grps := strings.Split(groups, ",")
for i, grp := range grps {
grps[i] = strings.Trim(grp, " ")
}
return grps
}

func aggr(e *State, series *Results, aggfunc string) (*Result, error) {
res := Result{}
newSeries := make(Series)
var isPerc bool
var percValue float64
if len(aggfunc) > 0 && aggfunc[0] == 'p' {
var err error
percValue, err = strconv.ParseFloat(aggfunc[1:], 10)
isPerc = err == nil
}
if isPerc {
if percValue < 0 || percValue > 1 {
return nil, fmt.Errorf("expr: aggr: percentile number must be greater than or equal to zero 0 and less than or equal 1")
}
aggfunc = "percentile"
}

switch aggfunc {
case "percentile":
newSeries = aggrPercentile(series.Results, percValue)
case "min":
newSeries = aggrPercentile(series.Results, 0.0)
case "max":
newSeries = aggrPercentile(series.Results, 1.0)
case "avg":
newSeries = aggrAverage(series.Results)
case "sum":
newSeries = aggrSum(series.Results)
default:
return &res, fmt.Errorf("unknown aggfunc: %v. Options are avg, p50, min, max", aggfunc)
}

res.Value = newSeries
return &res, nil
}

func aggrPercentile(series ResultSlice, percValue float64) Series {
newSeries := make(Series)
merged := map[time.Time][]float64{}
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
merged[t] = append(merged[t], v)
}
}
for t := range merged {
// transform points from merged series into a made-up
// single timeseries, so that we can use the existing
// percentile reduction function here
dps := Series{}
for i := range merged[t] {
dps[time.Unix(int64(i), 0)] = merged[t][i]
}
newSeries[t] = percentile(dps, percValue)
}
return newSeries
}

func aggrAverage(series ResultSlice) Series {
newSeries := make(Series)
counts := map[time.Time]int64{}
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
newSeries[t] += v
counts[t]++
}
}
for t := range newSeries {
newSeries[t] /= float64(counts[t])
}
return newSeries
}

func aggrSum(series ResultSlice) Series {
newSeries := make(Series)
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
newSeries[t] += v
}
}
return newSeries
}

func aggrCheck(t *parse.Tree, f *parse.FuncNode) error {
if len(f.Args) < 3 {
return errors.New("aggr: expect 3 arguments")
}
if _, ok := f.Args[2].(*parse.StringNode); !ok {
return errors.New("aggr: expect string as aggregator function name")
}
name := f.Args[2].(*parse.StringNode).Text
var isPerc bool
var percValue float64
if len(name) > 0 && name[0] == 'p' {
var err error
percValue, err = strconv.ParseFloat(name[1:], 10)
isPerc = err == nil
}
if isPerc {
if percValue < 0 || percValue > 1 {
return errors.New("aggr: percentile number must be greater than or equal to zero 0 and less than or equal 1")
}
return nil
}
switch name {
case "avg", "min", "max", "sum":
return nil
}
return fmt.Errorf("aggr: unrecognized aggregation function %s", name)
}

func V(e *State) (*Results, error) {
return fromScalar(e.vValue), nil
}
Expand Down
Loading