这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/bosun/expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (e *State) walkFunc(node *parse.FuncNode, T miniprofiler.Timer) *Results {
var res *Results
T.Step("func: "+node.Name, func(T miniprofiler.Timer) {
var in []reflect.Value
for _, a := range node.Args {
for i, a := range node.Args {
var v interface{}
switch t := a.(type) {
case *parse.StringNode:
Expand All @@ -596,6 +596,9 @@ func (e *State) walkFunc(node *parse.FuncNode, T miniprofiler.Timer) *Results {
default:
panic(fmt.Errorf("expr: unknown func arg type"))
}
if f, ok := v.(float64); ok && node.F.Args[i] == parse.TypeNumberSet {
v = fromScalar(f)
}
in = append(in, reflect.ValueOf(v))
}
f := reflect.ValueOf(node.F.F)
Expand Down
50 changes: 35 additions & 15 deletions cmd/bosun/expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,31 +143,32 @@ func TestQueryExpr(t *testing.T) {
},
}
d := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC)
tests := map[string]map[string]map[time.Time]float64{
tests := map[string]map[string]Value{
`window("avg:m{a=*}", "5m", "1h", 2, "max")`: {
"a=b": {
"a=b": Series{
d: 2,
d.Add(time.Second * 2): 6,
},
"a=c": {
"a=c": Series{
d.Add(time.Second * 1): 8,
},
"a=d": {
"a=d": Series{
d.Add(time.Second * 8): 9,
},
},
`window("avg:m{a=*}", "5m", "1h", 2, "avg")`: {
"a=b": {
"a=b": Series{
d: 1.5,
d.Add(time.Second * 2): 5,
},
"a=c": {
"a=c": Series{
d.Add(time.Second * 1): 7.5,
},
"a=d": {
"a=d": Series{
d.Add(time.Second * 8): 8.5,
},
},
`abs(-1)`: {"": Number(1)},
}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -214,15 +215,34 @@ func TestQueryExpr(t *testing.T) {
t.Errorf("missing tag %v", tag)
continue
}
val := r.Value.(Series)
if len(val) != len(ex) {
t.Errorf("unmatched values in %v", tag)
}
for k, v := range ex {
got := val[k]
if got != v {
t.Errorf("%v, %v: got %v, expected %v", tag, k, got, v)
switch val := r.Value.(type) {
case Series:
ex, ok := ex.(Series)
if !ok {
t.Errorf("%v: bad type %T", exprText, ex)
continue
}
if len(val) != len(ex) {
t.Errorf("unmatched values in %v", tag)
continue
}
for k, v := range ex {
got := val[k]
if got != v {
t.Errorf("%v, %v: got %v, expected %v", tag, k, got, v)
}
}
case Number:
ex, ok := ex.(Number)
if !ok {
t.Errorf("%v: bad type %T", exprText, ex)
continue
}
if ex != val {
t.Errorf("%v: got %v, expected %v", exprText, r.Value, ex)
}
default:
t.Errorf("%v: unknown type %T", exprText, r.Value)
}
}
}
Expand Down
99 changes: 66 additions & 33 deletions cmd/bosun/expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var builtins = map[string]parse.Func{
F: First,
},
"forecastlr": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeNumberSet,
Tags: tagFirst,
F: Forecast_lr,
Expand Down Expand Up @@ -196,7 +196,7 @@ var builtins = map[string]parse.Func{
F: Min,
},
"percentile": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeNumberSet,
Tags: tagFirst,
F: Percentile,
Expand Down Expand Up @@ -260,25 +260,25 @@ var builtins = map[string]parse.Func{
F: Des,
},
"dropge": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropGe,
},
"dropg": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropG,
},
"drople": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropLe,
},
"dropl": {
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeScalar},
Args: []parse.FuncType{parse.TypeSeriesSet, parse.TypeNumberSet},
Return: parse.TypeSeriesSet,
Tags: tagFirst,
F: DropL,
Expand Down Expand Up @@ -380,39 +380,41 @@ func Duration(e *State, T miniprofiler.Timer, d string) (*Results, error) {
}, nil
}

func DropValues(e *State, T miniprofiler.Timer, series *Results, threshold float64, dropFunction func(float64, float64) bool) (*Results, error) {
for _, res := range series.Results {
func DropValues(e *State, T miniprofiler.Timer, series *Results, threshold *Results, dropFunction func(float64, float64) bool) (*Results, error) {
f := func(res *Results, s *Result, floats []float64) error {
nv := make(Series)
for k, v := range res.Value.Value().(Series) {
if !dropFunction(float64(v), threshold) {
for k, v := range s.Value.Value().(Series) {
if !dropFunction(float64(v), floats[0]) {
//preserve values which should not be discarded
nv[k] = v
}
}
if len(nv) == 0 {
return nil, fmt.Errorf("series %s is empty", res.Group)
return fmt.Errorf("series %s is empty", s.Group)
}
res.Value = nv
s.Value = nv
res.Results = append(res.Results, s)
return nil
}
return series, nil
return match(f, series, threshold)
}

func DropGe(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropGe(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value >= threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropG(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropG(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value > threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropLe(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropLe(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value <= threshold }
return DropValues(e, T, series, threshold, dropFunction)
}

func DropL(e *State, T miniprofiler.Timer, series *Results, threshold float64) (*Results, error) {
func DropL(e *State, T miniprofiler.Timer, series *Results, threshold *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool { return value < threshold }
return DropValues(e, T, series, threshold, dropFunction)
}
Expand All @@ -421,7 +423,7 @@ func DropNA(e *State, T miniprofiler.Timer, series *Results) (*Results, error) {
dropFunction := func(value float64, threshold float64) bool {
return math.IsNaN(float64(value)) || math.IsInf(float64(value), 0)
}
return DropValues(e, T, series, 0, dropFunction)
return DropValues(e, T, series, fromScalar(0), dropFunction)
}

func parseGraphiteResponse(req *graphite.Request, s *graphite.Response, formatTags []string) ([]*Result, error) {
Expand Down Expand Up @@ -874,32 +876,63 @@ func Change(e *State, T miniprofiler.Timer, query, sduration, eduration string)
if err != nil {
return
}
r, err = reduce(e, T, r, change, (sd - ed).Seconds())
r, err = reduce(e, T, r, change, fromScalar((sd - ed).Seconds()))
return
}

func change(dps Series, args ...float64) float64 {
return avg(dps) * args[0]
}

func reduce(e *State, T miniprofiler.Timer, series *Results, F func(Series, ...float64) float64, args ...float64) (*Results, error) {
func fromScalar(f float64) *Results {
return &Results{
Results: ResultSlice{
&Result{
Value: Number(f),
},
},
}
}

func match(f func(res *Results, series *Result, floats []float64) error, series *Results, numberSets ...*Results) (*Results, error) {
res := *series
res.Results = nil
for _, s := range series.Results {
switch t := s.Value.(type) {
case Series:
if len(t) == 0 {
continue
var floats []float64
for _, num := range numberSets {
for _, n := range num.Results {
if len(n.Group) == 0 || s.Group.Overlaps(n.Group) {
floats = append(floats, float64(n.Value.(Number)))
break
}
}
s.Value = Number(F(t, args...))
res.Results = append(res.Results, s)
default:
panic(fmt.Errorf("expr: expected a series"))
}
if len(floats) != len(numberSets) {
if !series.IgnoreUnjoined {
return nil, fmt.Errorf("unjoined groups for %s", s.Group)
}
continue
}
if err := f(&res, s, floats); err != nil {
return nil, err
}
}
return &res, nil
}

func reduce(e *State, T miniprofiler.Timer, series *Results, F func(Series, ...float64) float64, args ...*Results) (*Results, error) {
f := func(res *Results, s *Result, floats []float64) error {
t := s.Value.(Series)
if len(t) == 0 {
return nil
}
s.Value = Number(F(t, floats...))
res.Results = append(res.Results, s)
return nil
}
return match(f, series, args...)
}

func Abs(e *State, T miniprofiler.Timer, series *Results) *Results {
for _, s := range series.Results {
s.Value = Number(math.Abs(float64(s.Value.Value().(Number))))
Expand Down Expand Up @@ -1070,7 +1103,7 @@ func (e *State) since(dps Series, args ...float64) (a float64) {
return s.Seconds()
}

func Forecast_lr(e *State, T miniprofiler.Timer, series *Results, y float64) (r *Results, err error) {
func Forecast_lr(e *State, T miniprofiler.Timer, series *Results, y *Results) (r *Results, err error) {
return reduce(e, T, series, e.forecast_lr, y)
}

Expand Down Expand Up @@ -1107,20 +1140,20 @@ func (e *State) forecast_lr(dps Series, args ...float64) float64 {
return s.Seconds()
}

func Percentile(e *State, T miniprofiler.Timer, series *Results, p float64) (r *Results, err error) {
func Percentile(e *State, T miniprofiler.Timer, series *Results, p *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, p)
}

func Min(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, 0)
return reduce(e, T, series, percentile, fromScalar(0))
}

func Median(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, .5)
return reduce(e, T, series, percentile, fromScalar(.5))
}

func Max(e *State, T miniprofiler.Timer, series *Results) (r *Results, err error) {
return reduce(e, T, series, percentile, 1)
return reduce(e, T, series, percentile, fromScalar(1))
}

// percentile returns the value at the corresponding percentile between 0 and 1.
Expand Down
4 changes: 3 additions & 1 deletion cmd/bosun/expr/parse/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func (f *FuncNode) Check(t *Tree) error {
for i, a := range f.Args {
ft := f.F.Args[i]
at := a.Return()
if ft != at {
if ft == TypeNumberSet && at == TypeScalar {
// Scalars are promoted to NumberSets during execution.
} else if ft != at {
return fmt.Errorf("parse: expected %v, got %v", ft, at)
}
if err := a.Check(t); err != nil {
Expand Down
15 changes: 7 additions & 8 deletions docs/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This section documents Bosun's expression language, which is used to define the
There are three data types in Bosun's expression language:

1. **Scalar**: This is the simplest type, it is a single numeric value with no group associated with it. Keep in mind that an empty group, `{}` is still a group.
2. **NumberSet**: A number set is a group of tagged numeric values with one value per unique grouping.
2. **NumberSet**: A number set is a group of tagged numeric values with one value per unique grouping. As a special case, a **scalar** may be used in place of a **numberSet** with a single member with an empty group.
3. **SeriesSet**: A series is an array of timestamp-value pairs and an associated group.

In the vast majority of your alerts you will getting ***seriesSets*** back from your time series database and ***reducing*** them into ***numberSets***.
Expand Down Expand Up @@ -205,7 +205,7 @@ Diff returns the last point of each series minus the first point.

Returns the first (least recent) data point in each series.

## forecastlr(seriesSet, y_val scalar) numberSet
## forecastlr(seriesSet, y_val numberSet|scalar) numberSet

Returns the number of seconds until a linear regression of each series will reach y_val.

Expand All @@ -229,7 +229,7 @@ Returns the median value of each series, same as calling percentile(series, .5).

Returns the minimum value of each series, same as calling percentile(series, 0).

## percentile(seriesSet, p scalar) numberSet
## percentile(seriesSet, p numberSet|scalar) numberSet

Returns the value from each series at the percentile p. Min and Max can be simulated using `p <= 0` and `p >= 1`, respectively.

Expand Down Expand Up @@ -344,23 +344,22 @@ Returns series smoothed using Holt-Winters double exponential smoothing. Alpha
(scalar) is the data smoothing factor. Beta (scalar) is the trend smoothing
factor.

## dropg(seriesSet, scalar) seriesSet
## dropg(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values greater than number from a series. Will error if this operation results in an empty series.

## dropge(seriesSet, scalar) seriesSet
## dropge(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values greater than or equal to number from a series. Will error if this operation results in an empty series.

## dropl(seriesSet, scalar) seriesSet
## dropl(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values lower than number from a series. Will error if this operation results in an empty series.

## drople(seriesSet, scalar) seriesSet
## drople(seriesSet, threshold numberSet|scalar) seriesSet

Remove any values lower than or equal to number from a series. Will error if this operation results in an empty series.


## dropna(seriesSet) seriesSet

Remove any NaN or Inf values from a series. Will error if this operation results in an empty series.
Expand Down