这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/bosun/dev.sample.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
tsdbHost = host:[port]
graphiteHost =
influxHost = host:[port]
smtpHost = host:[port]
emailFrom = bosun@example.org
httpListen = :8070
Expand Down Expand Up @@ -47,6 +48,13 @@ alert example.opentsdb.os.high.cpu {
crit = $q >= 1
}

alert example.influx.os.high.cpu {
$db = "opentsdb"
$q = avg(influx($db, '''select derivative(mean(value), 1s) from "os.cpu" GROUP BY host''', "2m", "", "15s"))
warn = $q > 20
crit = $q >= 1
}

lookup cpu {
entry host=a,remote=b {
high = 1
Expand All @@ -69,4 +77,3 @@ alert example.opentsdb.cpu.lookup {
alert example.graphite {
crit = avg(graphite("*.cpu.*.cpu.user", "5m", "", "host..cpu")) > 1
}

52 changes: 42 additions & 10 deletions cmd/bosun/expr/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,35 @@ import (
// Influx is a map of functions to query InfluxDB.
var Influx = map[string]parse.Func{
"influx": {
Args: []parse.FuncType{parse.TypeString, parse.TypeString, parse.TypeString, parse.TypeString},
Args: []parse.FuncType{parse.TypeString, parse.TypeString, parse.TypeString, parse.TypeString, parse.TypeString},
Return: parse.TypeSeriesSet,
Tags: influxTag,
F: InfluxQuery,
},
}

func influxTag(args []parse.Node) (parse.Tags, error) {
n := args[4].(*parse.StringNode)
t := make(parse.Tags)
for _, k := range strings.Split(n.Text, ",") {
t[k] = struct{}{}
st, err := influxql.ParseStatement(args[1].(*parse.StringNode).Text)
if err != nil {
return nil, err
}
s, ok := st.(*influxql.SelectStatement)
if !ok {
return nil, fmt.Errorf("influx: expected select statement")
}

t := make(parse.Tags, len(s.Dimensions))
for _, d := range s.Dimensions {
if _, ok := d.Expr.(*influxql.Call); ok {
continue
}
t[d.String()] = struct{}{}
}
return t, nil
}

func InfluxQuery(e *State, T miniprofiler.Timer, db, query, startDuration, endDuration string) (*Results, error) {
qres, err := timeInfluxRequest(e, T, db, query, startDuration, endDuration)
func InfluxQuery(e *State, T miniprofiler.Timer, db, query, startDuration, endDuration, groupByInterval string) (*Results, error) {
qres, err := timeInfluxRequest(e, T, db, query, startDuration, endDuration, groupByInterval)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,7 +91,7 @@ func InfluxQuery(e *State, T miniprofiler.Timer, db, query, startDuration, endDu
}

// influxQueryDuration adds time WHERE clauses to query for the given start and end durations.
func influxQueryDuration(now time.Time, query, start, end string) (string, error) {
func influxQueryDuration(now time.Time, query, start, end, groupByInterval string) (string, error) {
sd, err := opentsdb.ParseDuration(start)
if err != nil {
return "", err
Expand Down Expand Up @@ -151,11 +162,32 @@ func influxQueryDuration(now time.Time, query, start, end string) (string, error
}
}

// parse last argument
if len(groupByInterval) > 0 {
gbi, err := time.ParseDuration(groupByInterval)
if err != nil {
return "", err
}
s.Dimensions = append(s.Dimensions,
&influxql.Dimension{Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{&influxql.DurationLiteral{Val: gbi}},
},
})
}

// empty aggregate windows should be purged from the result
// this default resembles the opentsdb results.
if s.Fill == influxql.NullFill {
s.Fill = influxql.NoFill
s.FillValue = nil
}

return s.String(), nil
}

func timeInfluxRequest(e *State, T miniprofiler.Timer, db, query, startDuration, endDuration string) (s []influxql.Row, err error) {
q, err := influxQueryDuration(e.now, query, startDuration, endDuration)
func timeInfluxRequest(e *State, T miniprofiler.Timer, db, query, startDuration, endDuration, groupByInterval string) (s []influxql.Row, err error) {
q, err := influxQueryDuration(e.now, query, startDuration, endDuration, groupByInterval)
if err != nil {
return nil, err
}
Expand Down
19 changes: 12 additions & 7 deletions cmd/bosun/expr/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const influxTimeFmt = time.RFC3339Nano
func TestInfluxQueryDuration(t *testing.T) {
type influxTest struct {
query string
gbi string // group by interval
expect string // empty for error
}
date := time.Date(2000, time.January, 1, 2, 0, 0, 0, time.UTC)
Expand All @@ -22,20 +23,24 @@ func TestInfluxQueryDuration(t *testing.T) {
start := date.Add(-dur).Format(influxTimeFmt)
tests := []influxTest{
{
"select * from a",
fmt.Sprintf("SELECT * FROM a WHERE time >= '%s' AND time <= '%s'", start, end),
"select * from a", "",
fmt.Sprintf("SELECT * FROM a WHERE time >= '%s' AND time <= '%s' fill(none)", start, end),
},
{
"select * from a WHERE value > 0",
fmt.Sprintf("SELECT * FROM a WHERE value > 0.000 AND time >= '%s' AND time <= '%s'", start, end),
"select * from a WHERE value > 0", "",
fmt.Sprintf("SELECT * FROM a WHERE value > 0.000 AND time >= '%s' AND time <= '%s' fill(none)", start, end),
},
{
"select * from a WHERE time > 0",
"select * from a WHERE value > 0", "15m",
fmt.Sprintf("SELECT * FROM a WHERE value > 0.000 AND time >= '%s' AND time <= '%s' GROUP BY time(15m) fill(none)", start, end),
},
{
"select * from a WHERE time > 0 fill(none)", "",
"",
},
}
for _, test := range tests {
q, err := influxQueryDuration(date, test.query, dur.String(), "")
q, err := influxQueryDuration(date, test.query, dur.String(), "", test.gbi)
if err != nil && test.expect != "" {
t.Errorf("%v: unexpected error: %v", test.query, err)
} else if q != test.expect {
Expand All @@ -52,7 +57,7 @@ func TestInfluxQuery(t *testing.T) {
return false
},
}
_, err := InfluxQuery(&e, new(miniprofiler.Profile), "db", "select * from alh limit 10", "1n", "")
_, err := InfluxQuery(&e, new(miniprofiler.Profile), "db", "select * from alh limit 10", "1n", "", "")
if err == nil {
t.Fatal("Should have received an error from InfluxQuery")
}
Expand Down
30 changes: 25 additions & 5 deletions docs/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,32 @@ Like band() but for graphite queries.

## InfluxDB Query Functions

### influx(db string, query string, startDuration string, endDuration string) seriesSet
### influx(db string, query string, startDuration string, endDuration, groupByInterval string) seriesSet

Queries with influxql query on database db from startDuration ago to
endDuration ago. WHERE clauses for `time` are inserted automatically, and
it is thus an error to specify `time` conditions in query. All tags returned
by InfluxDB will be included in the results.
Queries InfluxDB.

All tags returned by InfluxDB will be included in the results.

* `db` is the database name in InfluxDB
* `query` is an InfluxDB select statement
NB: WHERE clauses for `time` are inserted automatically, and it is thus an error to specify `time` conditions in query.
* `startDuration` and `endDuration` set the time window from now - see the OpenTSDB q() function for more details
They will be merged into the existing WHERE clause in the `query`.
* `groupByInterval` is the `time.Duration` window which will be passed as an argument to a GROUP BY time() clause if given. This groups (or, in OpenTSDB lingo, "downsamples") the values into buckets of the given time window. [Full documentation on Group by](https://influxdb.com/docs/v0.9/query_language/data_exploration.html#group-by).

### Notes:

* By default, queries will be given a suffix of `fill(none)` to filter out any nil rows.

## examples:

These influx and opentsdb queries should give roughly the same results:

```
influx("db", '''SELECT non_negative_derivative(mean(value)) FROM "os.cpu" GROUP BY host''', "30m", "", "2m")

q("sum:2m-avg:rate{counter,,1}:os.cpu{host=*}", "30m", "")
```

## Logstash Query Functions

Expand Down