这是indexloc提供的服务,不要输入任何密码
Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/bosun/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,11 @@ func (c *Conf) Funcs() map[string]eparse.Func {
var tags []opentsdb.TagSet
for _, tag := range lookups.Tags {
var next []opentsdb.TagSet
for _, value := range e.Search.TagValuesByTagKey(tag, 0) {
vals, err := e.Search.TagValuesByTagKey(tag, 0)
if err != nil {
return nil, err
}
for _, value := range vals {
for _, s := range tags {
t := s.Copy()
t[tag] = value
Expand Down
40 changes: 31 additions & 9 deletions cmd/bosun/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bosun.org/_third_party/github.com/siddontang/ledisdb/config"
"bosun.org/_third_party/github.com/siddontang/ledisdb/server"
"bosun.org/collect"
"bosun.org/metadata"
"bosun.org/opentsdb"
)

Expand All @@ -24,6 +25,18 @@ type DataAccess interface {
PutTagMetadata(tags opentsdb.TagSet, name string, value string, updated time.Time) error
GetTagMetadata(tags opentsdb.TagSet, name string) ([]*TagMetadata, error)
DeleteTagMetadata(tags opentsdb.TagSet, name string) error

Search_AddMetricForTag(tagK, tagV, metric string, time int64) error
Search_GetMetricsForTag(tagK, tagV string) (map[string]int64, error)

Search_AddTagKeyForMetric(metric, tagK string, time int64) error
Search_GetTagKeysForMetric(metric string) (map[string]int64, error)

Search_AddMetric(metric string, time int64) error
Search_GetAllMetrics() (map[string]int64, error)

Search_AddTagValue(metric, tagK, tagV string, time int64) error
Search_GetTagValues(metric, tagK string) (map[string]int64, error)
}

type dataAccess struct {
Expand All @@ -37,7 +50,10 @@ func NewDataAccess(addr string, isRedis bool) DataAccess {
}

func newDataAccess(addr string, isRedis bool) *dataAccess {
return &dataAccess{pool: newPool(addr, "", 0, isRedis), isRedis: isRedis}
return &dataAccess{
pool: newPool(addr, "", 0, isRedis, 1000, true),
isRedis: isRedis,
}
}

// Start in-process ledis server. Data will go in the specified directory and it will bind to the given port.
Expand All @@ -56,13 +72,20 @@ func StartLedis(dataDir string, bind string) (stop func(), err error) {
return app.Close, nil
}

func (d *dataAccess) getConnection() redis.Conn {
// Connector is an interface through which callers (mostly tests) can obtain a
// raw redis connection. It exists so that raw access is possible, but direct
// use of connections is still discouraged.
type Connector interface {
	GetConnection() redis.Conn
}

// GetConnection hands out a connection obtained from the underlying redis
// pool. Callers in this package close the connection when they are done
// with it (typically via defer conn.Close()).
func (d *dataAccess) GetConnection() redis.Conn {
	conn := d.pool.Get()
	return conn
}

func newPool(server, password string, database int, isRedis bool) *redis.Pool {
func newPool(server, password string, database int, isRedis bool, maxActive int, wait bool) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
MaxIdle: 50,
MaxActive: maxActive,
Wait: wait,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server, redis.DialDatabase(database))
Expand All @@ -83,10 +106,9 @@ func newPool(server, password string, database int, isRedis bool) *redis.Pool {
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "Ping"})()
_, err := c.Do("PING")
return err
},
}
}

// init passes the description and unit (milliseconds) of the "bosun.redis"
// metric to collect.AggregateMeta at package load time.
func init() {
	const (
		metricName = "bosun.redis"
		metricDesc = "time in milliseconds per redis call."
	)
	collect.AggregateMeta(metricName, metadata.MilliSecond, metricDesc)
}
65 changes: 0 additions & 65 deletions cmd/bosun/database/database_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/bosun/database/metric_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func (d *dataAccess) PutMetricMetadata(metric string, field string, value string
if field != "desc" && field != "unit" && field != "rate" {
return fmt.Errorf("Unknown metric metadata field: %s", field)
}
conn := d.getConnection()
conn := d.GetConnection()
defer conn.Close()
_, err := conn.Do("HMSET", metricMetaKey(metric), field, value, "lastTouched", time.Now().UTC().Unix())
return err
}

func (d *dataAccess) GetMetricMetadata(metric string) (*MetricMetadata, error) {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetMetricMeta"})()
conn := d.getConnection()
conn := d.GetConnection()
defer conn.Close()
v, err := redis.Values(conn.Do("HGETALL", metricMetaKey(metric)))
if err != nil {
Expand Down
119 changes: 119 additions & 0 deletions cmd/bosun/database/search_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package database

import (
"bosun.org/collect"
"bosun.org/opentsdb"
"fmt"
"strconv"

"bosun.org/_third_party/github.com/garyburd/redigo/redis"
)

/*
Search data in redis:

Metrics by tags:
search:metrics:{tagk}={tagv} -> hash of metric name to timestamp

Tag keys by metric:
search:tagk:{metric} -> hash of tag key to timestamp

Tag values by metric/tag key:
search:tagv:{metric}:{tagk} -> hash of tag value to timestamp
metric "__all__" is a special key that will hold all values for the tag key, regardless of metric

All Metrics:
search:allMetrics -> hash of metric name to timestamp
*/

// Search_All is the special metric name "__all__". Per the key layout
// described above, the tag-value hash stored under this metric holds all
// values for a tag key, regardless of metric.
const Search_All = "__all__"

// searchAllMetricsKey is the redis key of the hash that maps every known
// metric name to a timestamp.
const searchAllMetricsKey = "search:allMetrics"

// searchMetricKey builds the redis key of the hash that maps metric names to
// timestamps for the tag pair tagK=tagV.
func searchMetricKey(tagK, tagV string) string {
	const layout = "search:metrics:%s=%s"
	key := fmt.Sprintf(layout, tagK, tagV)
	return key
}
// searchTagkKey builds the redis key of the hash that maps tag keys to
// timestamps for the given metric.
func searchTagkKey(metric string) string {
	const layout = "search:tagk:%s"
	key := fmt.Sprintf(layout, metric)
	return key
}
// searchTagvKey builds the redis key of the hash that maps tag values to
// timestamps for the given metric and tag key.
func searchTagvKey(metric, tagK string) string {
	const layout = "search:tagv:%s:%s"
	key := fmt.Sprintf(layout, metric, tagK)
	return key
}

// Search_AddMetricForTag stores time under the field metric in the hash for
// the tag pair tagK=tagV (redis HSET). It returns any error from the redis
// call. The call is wrapped in a collect.StartTimer("redis", ...) timer with
// op "AddMetricForTag".
func (d *dataAccess) Search_AddMetricForTag(tagK, tagV, metric string, time int64) error {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "AddMetricForTag"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	key := searchMetricKey(tagK, tagV)
	_, err := c.Do("HSET", key, metric, time)
	return err
}

// Search_GetMetricsForTag reads the whole hash for the tag pair tagK=tagV
// (redis HGETALL) and returns it as a map of metric name to timestamp. The
// call is wrapped in a collect.StartTimer("redis", ...) timer with op
// "GetMetricsForTag".
func (d *dataAccess) Search_GetMetricsForTag(tagK, tagV string) (map[string]int64, error) {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "GetMetricsForTag"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	reply, err := c.Do("HGETALL", searchMetricKey(tagK, tagV))
	return stringInt64Map(reply, err)
}

// stringInt64Map converts a redis reply (and its accompanying error) into a
// map from string to int64. The reply is first decoded with redis.Strings and
// then read as alternating key/value entries; each value is parsed as a
// base-10 int64.
//
// If redis.Strings fails, that error is returned with a nil map. A trailing
// key without a value is ignored. Parse errors are deliberately discarded:
// the entry is still stored with whatever value strconv.ParseInt returned.
func stringInt64Map(d interface{}, err error) (map[string]int64, error) {
	pairs, err := redis.Strings(d, err)
	if err != nil {
		return nil, err
	}
	out := map[string]int64{}
	for v := 1; v < len(pairs); v += 2 {
		field, raw := pairs[v-1], pairs[v]
		// Parse error intentionally ignored; see the function comment.
		stamp, _ := strconv.ParseInt(raw, 10, 64)
		out[field] = stamp
	}
	return out, nil
}

// Search_AddTagKeyForMetric stores time under the field tagK in the tag-key
// hash of metric (redis HSET). It returns any error from the redis call. The
// call is wrapped in a collect.StartTimer("redis", ...) timer with op
// "AddTagKeyForMetric".
func (d *dataAccess) Search_AddTagKeyForMetric(metric, tagK string, time int64) error {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "AddTagKeyForMetric"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	key := searchTagkKey(metric)
	_, err := c.Do("HSET", key, tagK, time)
	return err
}

// Search_GetTagKeysForMetric reads the whole tag-key hash of metric (redis
// HGETALL) and returns it as a map of tag key to timestamp. The call is
// wrapped in a collect.StartTimer("redis", ...) timer with op
// "GetTagKeysForMetric".
func (d *dataAccess) Search_GetTagKeysForMetric(metric string) (map[string]int64, error) {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagKeysForMetric"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	reply, err := c.Do("HGETALL", searchTagkKey(metric))
	return stringInt64Map(reply, err)
}

// Search_AddMetric stores time under the field metric in the all-metrics hash
// (redis HSET on searchAllMetricsKey). It returns any error from the redis
// call. The call is wrapped in a collect.StartTimer("redis", ...) timer with
// op "AddMetric".
func (d *dataAccess) Search_AddMetric(metric string, time int64) error {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "AddMetric"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	if _, err := c.Do("HSET", searchAllMetricsKey, metric, time); err != nil {
		return err
	}
	return nil
}
// Search_GetAllMetrics reads the whole all-metrics hash (redis HGETALL on
// searchAllMetricsKey) and returns it as a map of metric name to timestamp.
// The call is wrapped in a collect.StartTimer("redis", ...) timer with op
// "GetAllMetrics".
func (d *dataAccess) Search_GetAllMetrics() (map[string]int64, error) {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "GetAllMetrics"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	reply, err := c.Do("HGETALL", searchAllMetricsKey)
	return stringInt64Map(reply, err)
}

// Search_AddTagValue stores time under the field tagV in the tag-value hash
// for metric and tagK (redis HSET). It returns any error from the redis call.
// The call is wrapped in a collect.StartTimer("redis", ...) timer with op
// "AddTagValue".
func (d *dataAccess) Search_AddTagValue(metric, tagK, tagV string, time int64) error {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "AddTagValue"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	key := searchTagvKey(metric, tagK)
	_, err := c.Do("HSET", key, tagV, time)
	return err
}
// Search_GetTagValues reads the whole tag-value hash for metric and tagK
// (redis HGETALL) and returns it as a map of tag value to timestamp. The call
// is wrapped in a collect.StartTimer("redis", ...) timer with op
// "GetTagValues".
func (d *dataAccess) Search_GetTagValues(metric, tagK string) (map[string]int64, error) {
	stopTimer := collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagValues"})
	defer stopTimer()

	c := d.GetConnection()
	defer c.Close()

	reply, err := c.Do("HGETALL", searchTagvKey(metric, tagK))
	return stringInt64Map(reply, err)
}
6 changes: 3 additions & 3 deletions cmd/bosun/database/tag_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func tagMetaIdxKey(tagK, tagV string) string {

func (d *dataAccess) PutTagMetadata(tags opentsdb.TagSet, name string, value string, updated time.Time) error {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "PutTagMeta"})()
conn := d.getConnection()
conn := d.GetConnection()
defer conn.Close()
key := tagMetaKey(tags, name)
keyValue := fmt.Sprintf("%d:%s", updated.UTC().Unix(), value)
Expand All @@ -49,7 +49,7 @@ func (d *dataAccess) PutTagMetadata(tags opentsdb.TagSet, name string, value str

func (d *dataAccess) DeleteTagMetadata(tags opentsdb.TagSet, name string) error {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "DeleteTagMeta"})()
conn := d.getConnection()
conn := d.GetConnection()
defer conn.Close()
key := tagMetaKey(tags, name)
_, err := conn.Do("DEL", key)
Expand All @@ -67,7 +67,7 @@ func (d *dataAccess) DeleteTagMetadata(tags opentsdb.TagSet, name string) error

func (d *dataAccess) GetTagMetadata(tags opentsdb.TagSet, name string) ([]*TagMetadata, error) {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagMeta"})()
conn := d.getConnection()
conn := d.GetConnection()
defer conn.Close()
args := []interface{}{}
for tagK, tagV := range tags {
Expand Down
33 changes: 33 additions & 0 deletions cmd/bosun/database/test/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dbtest

import (
"math/rand"
"os"
"testing"
"time"

"bosun.org/cmd/bosun/database"
)

// testData is the data access object used by all unit tests in this package.
// It is pointed at an ephemeral ledis instance, or at the redis server passed
// in with --redis=addr. TestMain assigns it before any test runs.
var testData database.DataAccess

// TestMain seeds math/rand from the current time, obtains the shared test
// data store from StartTestRedis, runs the package's tests, invokes the
// store's close function, and exits with the test run's status code.
func TestMain(m *testing.M) {
	rand.Seed(time.Now().UnixNano())

	data, shutdown := StartTestRedis()
	testData = data

	code := m.Run()
	shutdown()
	os.Exit(code)
}

// cleanups is a package-level list of callbacks, initially empty.
// NOTE(review): nothing in the visible part of this file appends to or runs
// this list — presumably it is used by other test files; confirm it is needed.
var cleanups = []func(){}

// randString returns a string of l random lowercase ASCII letters drawn from
// math/rand. Tests use it to generate random keys so that test data from
// different tests (or runs) does not collide. A non-positive l yields "".
func randString(l int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz"
	if l <= 0 {
		return ""
	}
	// Fill a pre-sized buffer instead of growing a string with += in a loop,
	// which reallocates and copies on every iteration.
	buf := make([]byte, l)
	for i := range buf {
		buf[i] = letters[rand.Intn(len(letters))]
	}
	return string(buf)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package dbtest

import (
"testing"
Expand Down
23 changes: 23 additions & 0 deletions cmd/bosun/database/test/search_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dbtest

import (
"testing"
)

// TestSearch_Metric_RoundTrip writes the metric "os.cpu" with timestamp 42
// for a randomly named host tag and checks that reading the metrics for that
// tag returns the same metric and timestamp.
func TestSearch_Metric_RoundTrip(t *testing.T) {
	hostName := randString(5)

	if err := testData.Search_AddMetricForTag("host", hostName, "os.cpu", 42); err != nil {
		t.Fatal(err)
	}

	found, err := testData.Search_GetMetricsForTag("host", hostName)
	if err != nil {
		t.Fatal(err)
	}

	stamp, present := found["os.cpu"]
	if !present {
		t.Fatal("Expected to find os.cpu. I didn't.")
	}
	if stamp != 42 {
		t.Fatalf("Expected timestamp of 42. Got %d", stamp)
	}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package database
package dbtest

import (
"testing"
Expand Down
Loading