diff --git a/cmd/bosun/conf/conf.go b/cmd/bosun/conf/conf.go index e4491420c5..18a71aec35 100644 --- a/cmd/bosun/conf/conf.go +++ b/cmd/bosun/conf/conf.go @@ -1181,7 +1181,11 @@ func (c *Conf) Funcs() map[string]eparse.Func { var tags []opentsdb.TagSet for _, tag := range lookups.Tags { var next []opentsdb.TagSet - for _, value := range e.Search.TagValuesByTagKey(tag, 0) { + vals, err := e.Search.TagValuesByTagKey(tag, 0) + if err != nil { + return nil, err + } + for _, value := range vals { for _, s := range tags { t := s.Copy() t[tag] = value diff --git a/cmd/bosun/database/database.go b/cmd/bosun/database/database.go index d73cf82e1a..f9e0960441 100644 --- a/cmd/bosun/database/database.go +++ b/cmd/bosun/database/database.go @@ -11,6 +11,7 @@ import ( "bosun.org/_third_party/github.com/siddontang/ledisdb/config" "bosun.org/_third_party/github.com/siddontang/ledisdb/server" "bosun.org/collect" + "bosun.org/metadata" "bosun.org/opentsdb" ) @@ -24,6 +25,18 @@ type DataAccess interface { PutTagMetadata(tags opentsdb.TagSet, name string, value string, updated time.Time) error GetTagMetadata(tags opentsdb.TagSet, name string) ([]*TagMetadata, error) DeleteTagMetadata(tags opentsdb.TagSet, name string) error + + Search_AddMetricForTag(tagK, tagV, metric string, time int64) error + Search_GetMetricsForTag(tagK, tagV string) (map[string]int64, error) + + Search_AddTagKeyForMetric(metric, tagK string, time int64) error + Search_GetTagKeysForMetric(metric string) (map[string]int64, error) + + Search_AddMetric(metric string, time int64) error + Search_GetAllMetrics() (map[string]int64, error) + + Search_AddTagValue(metric, tagK, tagV string, time int64) error + Search_GetTagValues(metric, tagK string) (map[string]int64, error) } type dataAccess struct { @@ -37,7 +50,10 @@ func NewDataAccess(addr string, isRedis bool) DataAccess { } func newDataAccess(addr string, isRedis bool) *dataAccess { - return &dataAccess{pool: newPool(addr, "", 0, isRedis), isRedis: isRedis} + return &dataAccess{ + pool: newPool(addr, "", 0, isRedis, 1000, true), + isRedis: isRedis, + } } // Start in-process ledis server. Data will go in the specified directory and it will bind to the given port. @@ -56,13 +72,20 @@ func StartLedis(dataDir string, bind string) (stop func(), err error) { return app.Close, nil } -func (d *dataAccess) getConnection() redis.Conn { +//interface so things can get a raw connection (mostly tests), but still discourage it. +type Connector interface { + GetConnection() redis.Conn +} + +func (d *dataAccess) GetConnection() redis.Conn { return d.pool.Get() } -func newPool(server, password string, database int, isRedis bool) *redis.Pool { +func newPool(server, password string, database int, isRedis bool, maxActive int, wait bool) *redis.Pool { return &redis.Pool{ - MaxIdle: 3, + MaxIdle: 50, + MaxActive: maxActive, + Wait: wait, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", server, redis.DialDatabase(database)) @@ -83,10 +106,9 @@ func newPool(server, password string, database int, isRedis bool) *redis.Pool { } return c, err }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - defer collect.StartTimer("redis", opentsdb.TagSet{"op": "Ping"})() - _, err := c.Do("PING") - return err - }, } } + +func init() { + collect.AggregateMeta("bosun.redis", metadata.MilliSecond, "time in milliseconds per redis call.") +} diff --git a/cmd/bosun/database/database_test.go b/cmd/bosun/database/database_test.go deleted file mode 100644 index 95f56cf197..0000000000 --- a/cmd/bosun/database/database_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package database - -import ( - "flag" - "fmt" - "log" - "math/rand" - "os" - "path/filepath" - "testing" - "time" -) - -// data access object to use for all unit tests. Pointed at ephemeral ledis, or redis server passed in with --redis=addr -var testData *dataAccess - -var flagReddisHost = flag.String("redis", "", "redis server to test against") -var flagFlushRedis = flag.Bool("flush", false, "flush database before tests. DANGER!") - -func TestMain(m *testing.M) { - flag.Parse() - rand.Seed(time.Now().UnixNano()) - // For redis tests we just point at an external server. - if *flagReddisHost != "" { - testData = newDataAccess(*flagReddisHost, true) - if *flagFlushRedis { - log.Println("FLUSHING REDIS") - c := testData.getConnection() - _, err := c.Do("FLUSHDB") - if err != nil { - log.Fatal(err) - } - } - } else { - // To test ledis, start a local instance in a new tmp dir. We will attempt to delete it when we're done. - addr := "127.0.0.1:9876" - testPath := filepath.Join(os.TempDir(), "bosun_ledis_test", fmt.Sprint(time.Now().Unix())) - log.Println(testPath) - stop, err := StartLedis(testPath, addr) - if err != nil { - log.Fatal(err) - } - testData = newDataAccess(addr, false) - cleanups = append(cleanups, func() { - stop() - os.RemoveAll(testPath) - }) - } - status := m.Run() - for _, c := range cleanups { - c() - } - os.Exit(status) -} - -var cleanups = []func(){} - -// use random keys in tests to avoid conflicting test data. -func randString(l int) string { - s := "" - for len(s) < l { - s += string("abcdefghijklmnopqrstuvwxyz"[rand.Intn(26)]) - } - return s -} diff --git a/cmd/bosun/database/metric_metadata.go b/cmd/bosun/database/metric_metadata.go index 53ec57b496..c66350afa2 100644 --- a/cmd/bosun/database/metric_metadata.go +++ b/cmd/bosun/database/metric_metadata.go @@ -30,7 +30,7 @@ func (d *dataAccess) PutMetricMetadata(metric string, field string, value string if field != "desc" && field != "unit" && field != "rate" { return fmt.Errorf("Unknown metric metadata field: %s", field) } - conn := d.getConnection() + conn := d.GetConnection() defer conn.Close() _, err := conn.Do("HMSET", metricMetaKey(metric), field, value, "lastTouched", time.Now().UTC().Unix()) return err @@ -38,7 +38,7 @@ func (d *dataAccess) PutMetricMetadata(metric string, field string, value string func (d *dataAccess) GetMetricMetadata(metric string) (*MetricMetadata, error) { defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetMetricMeta"})() - conn := d.getConnection() + conn := d.GetConnection() defer conn.Close() v, err := redis.Values(conn.Do("HGETALL", metricMetaKey(metric))) if err != nil { diff --git a/cmd/bosun/database/search_data.go b/cmd/bosun/database/search_data.go new file mode 100644 index 0000000000..01a1d3654c --- /dev/null +++ b/cmd/bosun/database/search_data.go @@ -0,0 +1,119 @@ +package database + +import ( + "bosun.org/collect" + "bosun.org/opentsdb" + "fmt" + "strconv" + + "bosun.org/_third_party/github.com/garyburd/redigo/redis" +) + +/* +Search data in redis: + +Metrics by tags: +search:metrics:{tagk}={tagv} -> hash of metric name to timestamp + +Tag keys by metric: +search:tagk:{metric} -> hash of tag key to timestamp + +Tag Values By metric/tag key +search:tagv:{metric}:{tagk} -> hash of tag value to timestamp +metric "__all__" is a special key that will hold all values for the tag key, regardless of metric + +All Metrics: +search:allMetrics -> hash of metric name to timestamp +*/ + +const Search_All = "__all__" +const searchAllMetricsKey = "search:allMetrics" + +func searchMetricKey(tagK, tagV string) string { + return fmt.Sprintf("search:metrics:%s=%s", tagK, tagV) +} +func searchTagkKey(metric string) string { + return fmt.Sprintf("search:tagk:%s", metric) +} +func searchTagvKey(metric, tagK string) string { + return fmt.Sprintf("search:tagv:%s:%s", metric, tagK) +} + +func (d *dataAccess) Search_AddMetricForTag(tagK, tagV, metric string, time int64) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "AddMetricForTag"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("HSET", searchMetricKey(tagK, tagV), metric, time) + return err +} + +func (d *dataAccess) Search_GetMetricsForTag(tagK, tagV string) (map[string]int64, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetMetricsForTag"})() + conn := d.GetConnection() + defer conn.Close() + + return stringInt64Map(conn.Do("HGETALL", searchMetricKey(tagK, tagV))) +} + +func stringInt64Map(d interface{}, err error) (map[string]int64, error) { + vals, err := redis.Strings(d, err) + if err != nil { + return nil, err + } + result := make(map[string]int64) + for i := 1; i < len(vals); i += 2 { + time, _ := strconv.ParseInt(vals[i], 10, 64) + result[vals[i-1]] = time + } + return result, err +} + +func (d *dataAccess) Search_AddTagKeyForMetric(metric, tagK string, time int64) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "AddTagKeyForMetric"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("HSET", searchTagkKey(metric), tagK, time) + return err +} + +func (d *dataAccess) Search_GetTagKeysForMetric(metric string) (map[string]int64, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagKeysForMetric"})() + conn := d.GetConnection() + defer conn.Close() + + return stringInt64Map(conn.Do("HGETALL", searchTagkKey(metric))) +} + +func (d *dataAccess) Search_AddMetric(metric string, time int64) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "AddMetric"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("HSET", searchAllMetricsKey, metric, time) + return err +} +func (d *dataAccess) Search_GetAllMetrics() (map[string]int64, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetAllMetrics"})() + conn := d.GetConnection() + defer conn.Close() + + return stringInt64Map(conn.Do("HGETALL", searchAllMetricsKey)) +} + +func (d *dataAccess) Search_AddTagValue(metric, tagK, tagV string, time int64) error { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "AddTagValue"})() + conn := d.GetConnection() + defer conn.Close() + + _, err := conn.Do("HSET", searchTagvKey(metric, tagK), tagV, time) + return err +} +func (d *dataAccess) Search_GetTagValues(metric, tagK string) (map[string]int64, error) { + defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagValues"})() + conn := d.GetConnection() + defer conn.Close() + + return stringInt64Map(conn.Do("HGETALL", searchTagvKey(metric, tagK))) +} diff --git a/cmd/bosun/database/tag_metadata.go b/cmd/bosun/database/tag_metadata.go index 0ac877fde0..7d7901db16 100644 --- a/cmd/bosun/database/tag_metadata.go +++ b/cmd/bosun/database/tag_metadata.go @@ -30,7 +30,7 @@ func tagMetaIdxKey(tagK, tagV string) string { func (d *dataAccess) PutTagMetadata(tags opentsdb.TagSet, name string, value string, updated time.Time) error { defer collect.StartTimer("redis", opentsdb.TagSet{"op": "PutTagMeta"})() - conn := d.getConnection() + conn := d.GetConnection() defer conn.Close() key := tagMetaKey(tags, name) keyValue := fmt.Sprintf("%d:%s", updated.UTC().Unix(), value) @@ -49,7 +49,7 @@ func (d *dataAccess) PutTagMetadata(tags opentsdb.TagSet, name string, value str func (d *dataAccess) DeleteTagMetadata(tags opentsdb.TagSet, name string) error { defer collect.StartTimer("redis", opentsdb.TagSet{"op": "DeleteTagMeta"})() - conn := d.getConnection() + conn := d.GetConnection() defer conn.Close() key := tagMetaKey(tags, name) _, err := conn.Do("DEL", key) @@ -67,7 +67,7 @@ func (d *dataAccess) DeleteTagMetadata(tags opentsdb.TagSet, name string) error func (d *dataAccess) GetTagMetadata(tags opentsdb.TagSet, name string) ([]*TagMetadata, error) { defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetTagMeta"})() - conn := d.getConnection() + conn := d.GetConnection() defer conn.Close() args := []interface{}{} for tagK, tagV := range tags { diff --git a/cmd/bosun/database/test/database_test.go b/cmd/bosun/database/test/database_test.go new file mode 100644 index 0000000000..6a65246747 --- /dev/null +++ b/cmd/bosun/database/test/database_test.go @@ -0,0 +1,33 @@ +package dbtest + +import ( + "math/rand" + "os" + "testing" + "time" + + "bosun.org/cmd/bosun/database" +) + +// data access object to use for all unit tests. Pointed at ephemeral ledis, or redis server passed in with --redis=addr +var testData database.DataAccess + +func TestMain(m *testing.M) { + rand.Seed(time.Now().UnixNano()) + var closeF func() + testData, closeF = StartTestRedis() + status := m.Run() + closeF() + os.Exit(status) +} + +var cleanups = []func(){} + +// use random keys in tests to avoid conflicting test data. +func randString(l int) string { + s := "" + for len(s) < l { + s += string("abcdefghijklmnopqrstuvwxyz"[rand.Intn(26)]) + } + return s +} diff --git a/cmd/bosun/database/metric_metadata_test.go b/cmd/bosun/database/test/metric_metadata_test.go similarity index 98% rename from cmd/bosun/database/metric_metadata_test.go rename to cmd/bosun/database/test/metric_metadata_test.go index b1fd639e65..811c94b99a 100644 --- a/cmd/bosun/database/metric_metadata_test.go +++ b/cmd/bosun/database/test/metric_metadata_test.go @@ -1,4 +1,4 @@ -package database +package dbtest import ( "testing" diff --git a/cmd/bosun/database/test/search_data_test.go b/cmd/bosun/database/test/search_data_test.go new file mode 100644 index 0000000000..a5c26d135f --- /dev/null +++ b/cmd/bosun/database/test/search_data_test.go @@ -0,0 +1,23 @@ +package dbtest + +import ( + "testing" +) + +func TestSearch_Metric_RoundTrip(t *testing.T) { + host := randString(5) + err := testData.Search_AddMetricForTag("host", host, "os.cpu", 42) + if err != nil { + t.Fatal(err) + } + + metrics, err := testData.Search_GetMetricsForTag("host", host) + if err != nil { + t.Fatal(err) + } + if time, ok := metrics["os.cpu"]; !ok { + t.Fatal("Expected to find os.cpu. I didn't.") + } else if time != 42 { + t.Fatalf("Expected timestamp of 42. Got %d", time) + } +} diff --git a/cmd/bosun/database/tag_metadata_test.go b/cmd/bosun/database/test/tag_metadata_test.go similarity index 99% rename from cmd/bosun/database/tag_metadata_test.go rename to cmd/bosun/database/test/tag_metadata_test.go index 644ea59425..c621eeb54b 100644 --- a/cmd/bosun/database/tag_metadata_test.go +++ b/cmd/bosun/database/test/tag_metadata_test.go @@ -1,4 +1,4 @@ -package database +package dbtest import ( "testing" diff --git a/cmd/bosun/database/test/testSetup.go b/cmd/bosun/database/test/testSetup.go new file mode 100644 index 0000000000..64635c92ec --- /dev/null +++ b/cmd/bosun/database/test/testSetup.go @@ -0,0 +1,46 @@ +package dbtest + +import ( + "flag" + "fmt" + "log" + "os" + "path/filepath" + "time" + + "bosun.org/cmd/bosun/database" +) + +var flagReddisHost = flag.String("redis", "", "redis server to test against") +var flagFlushRedis = flag.Bool("flush", false, "flush database before tests. DANGER!") + +func StartTestRedis() (database.DataAccess, func()) { + flag.Parse() + // For redis tests we just point at an external server. + if *flagReddisHost != "" { + testData := database.NewDataAccess(*flagReddisHost, true) + if *flagFlushRedis { + log.Println("FLUSHING REDIS") + c := testData.(database.Connector).GetConnection() + defer c.Close() + _, err := c.Do("FLUSHDB") + if err != nil { + log.Fatal(err) + } + } + return testData, func() {} + } + // To test ledis, start a local instance in a new tmp dir. We will attempt to delete it when we're done. + addr := "127.0.0.1:9876" + testPath := filepath.Join(os.TempDir(), "bosun_ledis_test", fmt.Sprint(time.Now().Unix())) + log.Println(testPath) + stop, err := database.StartLedis(testPath, addr) + if err != nil { + log.Fatal(err) + } + testData := database.NewDataAccess(addr, false) + return testData, func() { + stop() + os.RemoveAll(testPath) + } +} diff --git a/cmd/bosun/sched/bolt.go b/cmd/bosun/sched/bolt.go index 2f9569aa19..e3385d049c 100644 --- a/cmd/bosun/sched/bolt.go +++ b/cmd/bosun/sched/bolt.go @@ -43,9 +43,6 @@ func (c *counterWriter) Write(p []byte) (n int, err error) { const ( dbBucket = "bindata" dbConfigTextBucket = "configText" - dbMetric = "metric" - dbTagk = "tagk" - dbTagv = "tagv" dbNotifications = "notifications" dbSilence = "silence" dbStatus = "status" @@ -59,9 +56,6 @@ func (s *Schedule) save() { } s.Lock("Save") store := map[string]interface{}{ - dbMetric: s.Search.Read.Metric, - dbTagk: s.Search.Read.Tagk, - dbTagv: s.Search.Read.Tagv, dbNotifications: s.Notifications, dbSilence: s.Silence, dbStatus: s.status, @@ -148,16 +142,6 @@ func (s *Schedule) RestoreState() error { s.Notifications = nil db := s.db - - if err := decode(db, dbMetric, &s.Search.Metric); err != nil { - slog.Errorln(dbMetric, err) - } - if err := decode(db, dbTagk, &s.Search.Tagk); err != nil { - slog.Errorln(dbTagk, err) - } - if err := decode(db, dbTagv, &s.Search.Tagv); err != nil { - slog.Errorln(dbTagv, err) - } notifications := make(map[expr.AlertKey]map[string]time.Time) if err := decode(db, dbNotifications, ¬ifications); err != nil { slog.Errorln(dbNotifications, err) @@ -240,7 +224,6 @@ func (s *Schedule) RestoreState() error { migrateOldDataToRedis(db, s.DataAccess) // delete metrictags if they exist. deleteKey(s.db, "metrictags") - s.Search.Copy() slog.Infoln("RestoreState done in", time.Since(start)) return nil } @@ -304,19 +287,30 @@ func (s *Schedule) GetStateFileBackup() ([]byte, error) { } func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { - // metadata-metric + if err := migrateMetricMetadata(db, data); err != nil { + return err + } + if err := migrateTagMetadata(db, data); err != nil { + return err + } + if err := migrateSearch(db, data); err != nil { + return err + } + return nil +} +func migrateMetricMetadata(db *bolt.DB, data database.DataAccess) error { migrated, err := isMigrated(db, "metadata-metric") if err != nil { return err } if !migrated { + slog.Info("Migrating metric metadata to new database format") type MetadataMetric struct { Unit string `json:",omitempty"` Type string `json:",omitempty"` Description string } - slog.Info("Migrating metric metadata to new database format") mms := map[string]*MetadataMetric{} if err := decode(db, "metadata-metric", &mms); err == nil { for name, mm := range mms { @@ -345,8 +339,11 @@ func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { } } } - //metadata - migrated, err = isMigrated(db, "metadata") + return nil +} + +func migrateTagMetadata(db *bolt.DB, data database.DataAccess) error { + migrated, err := isMigrated(db, "metadata") if err != nil { return err } @@ -358,7 +355,6 @@ func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { } metadata := make(map[metadata.Metakey]*Metavalue) if err := decode(db, "metadata", &metadata); err == nil { - for k, v := range metadata { err = data.PutTagMetadata(k.TagSet(), k.Name, fmt.Sprint(v.Value), v.Time) if err != nil { @@ -377,6 +373,60 @@ func migrateOldDataToRedis(db *bolt.DB, data database.DataAccess) error { } return nil } + +func migrateSearch(db *bolt.DB, data database.DataAccess) error { + migrated, err := isMigrated(db, "search") + if err != nil { + return err + } + if !migrated { + slog.Info("Migrating Search data to new database format") + type duple struct{ A, B string } + type present map[string]int64 + type qmap map[duple]present + type smap map[string]present + + metric := qmap{} + if err := decode(db, "metric", &metric); err == nil { + for k, v := range metric { + for metric, time := range v { + data.Search_AddMetricForTag(k.A, k.B, metric, time) + } + } + } else { + return err + } + tagk := smap{} + if err := decode(db, "tagk", &tagk); err == nil { + for metric, v := range tagk { + for tk, time := range v { + data.Search_AddTagKeyForMetric(metric, tk, time) + } + data.Search_AddMetric(metric, time.Now().Unix()) + } + } else { + return err + } + + tagv := qmap{} + if err := decode(db, "tagv", &tagv); err == nil { + for k, v := range tagv { + for val, time := range v { + data.Search_AddTagValue(k.A, k.B, val, time) + data.Search_AddTagValue(database.Search_All, k.B, val, time) + } + } + } else { + return err + } + err = setMigrated(db, "search") + if err != nil { + return err + } + } + return nil +} + func isMigrated(db *bolt.DB, name string) (bool, error) { found := false err := db.View(func(tx *bolt.Tx) error { diff --git a/cmd/bosun/sched/sched.go b/cmd/bosun/sched/sched.go index 978cd12a4a..fc2386bcef 100644 --- a/cmd/bosun/sched/sched.go +++ b/cmd/bosun/sched/sched.go @@ -76,7 +76,6 @@ func (s *Schedule) Init(c *conf.Conf) error { s.Incidents = make(map[uint64]*Incident) s.pendingUnknowns = make(map[*conf.Notification][]*State) s.status = make(States) - s.Search = search.NewSearch() s.LastCheck = time.Now() s.ctx = &checkContext{time.Now(), cache.New(0)} if s.DataAccess == nil { @@ -91,6 +90,7 @@ func (s *Schedule) Init(c *conf.Conf) error { s.DataAccess = database.NewDataAccess(bind, false) } } + s.Search = search.NewSearch(s.DataAccess) if c.StateFile != "" { s.db, err = bolt.Open(c.StateFile, 0600, nil) if err != nil { @@ -512,7 +512,11 @@ const pingFreq = time.Second * 15 func (s *Schedule) PingHosts() { for range time.Tick(pingFreq) { - hosts := s.Search.TagValuesByTagKey("host", s.Conf.PingDuration) + hosts, err := s.Search.TagValuesByTagKey("host", s.Conf.PingDuration) + if err != nil { + slog.Error(err) + continue + } for _, host := range hosts { go pingHost(host) } @@ -947,9 +951,13 @@ func (s *Schedule) GetIncidentEvents(id uint64) (*Incident, []Event, []Action, e return incident, list, actions, nil } -func (s *Schedule) Host(filter string) map[string]*HostData { +func (s *Schedule) Host(filter string) (map[string]*HostData, error) { hosts := make(map[string]*HostData) - for _, h := range s.Search.TagValuesByTagKey("host", time.Hour*7*24) { + allHosts, err := s.Search.TagValuesByTagKey("host", time.Hour*7*24) + if err != nil { + return nil, err + } + for _, h := range allHosts { hosts[h] = newHostData() } for name, host := range hosts { @@ -1021,7 +1029,7 @@ func (s *Schedule) Host(filter string) map[string]*HostData { } } } - return hosts + return hosts, nil } type HostInterface struct { diff --git a/cmd/bosun/sched/sched_test.go b/cmd/bosun/sched/sched_test.go index 633c7b76ab..3fa637c673 100644 --- a/cmd/bosun/sched/sched_test.go +++ b/cmd/bosun/sched/sched_test.go @@ -70,6 +70,30 @@ func (n *nopDataAccess) GetTagMetadata(tags opentsdb.TagSet, name string) ([]*da func (n *nopDataAccess) DeleteTagMetadata(tags opentsdb.TagSet, name string) error { panic("not implemented") } +func (n *nopDataAccess) Search_AddMetricForTag(tagK, tagV, metric string, time int64) error { + panic("not implemented") +} +func (n *nopDataAccess) Search_GetMetricsForTag(tagK, tagV string) (map[string]int64, error) { + panic("not implemented") +} +func (n *nopDataAccess) Search_AddTagKeyForMetric(metric, tagK string, time int64) error { + panic("not implemented") +} +func (n *nopDataAccess) Search_GetTagKeysForMetric(metric string) (map[string]int64, error) { + panic("not implemented") +} +func (n *nopDataAccess) Search_AddMetric(metric string, time int64) error { + panic("not implemented") +} +func (n *nopDataAccess) Search_GetAllMetrics() (map[string]int64, error) { + panic("not implemented") +} +func (n *nopDataAccess) Search_AddTagValue(metric, tagK, tagV string, time int64) error { + panic("not implemented") +} +func (n *nopDataAccess) Search_GetTagValues(metric, tagK string) (map[string]int64, error) { + panic("not implemented") +} func initSched(c *conf.Conf) (*Schedule, error) { c.StateFile = "" diff --git a/cmd/bosun/search/search.go b/cmd/bosun/search/search.go index 9bc5a7517c..6f938760f0 100644 --- a/cmd/bosun/search/search.go +++ b/cmd/bosun/search/search.go @@ -2,13 +2,19 @@ package search // import "bosun.org/cmd/bosun/search" import ( "fmt" + "math/rand" + "reflect" "regexp" "sort" "strings" "sync" "time" + "bosun.org/cmd/bosun/database" + "bosun.org/collect" + "bosun.org/metadata" "bosun.org/opentsdb" + "bosun.org/slog" ) // Search is a struct to hold indexed data about OpenTSDB metric and tag data. @@ -16,145 +22,114 @@ import ( // available tag keys for a metric, and available tag values for a metric and // tag key. type Search struct { - // tagk + tagv -> metrics - Metric qmap - // metric -> tag keys - Tagk smap - // metric + tagk -> tag values - Tagv qmap + DataAccess database.DataAccess - Last map[string]*pair - - // Read replica. Should never be mutated. Can at any time be assigned to a new - // value. - Read *Search + Last map[string]*lastInfo + indexQueue chan *opentsdb.DataPoint sync.RWMutex - - // copy is true when there is a Copy() event pending. - copy bool -} - -type pair struct { - points [2]opentsdb.DataPoint - index int -} - -type MetricTagSet struct { - Metric string `json:"metric"` - Tags opentsdb.TagSet `json:"tags"` } -func (mts *MetricTagSet) key() string { - return mts.Metric + mts.Tags.String() +type lastInfo struct { + lastVal float64 + diffFromPrev float64 + timestamp int64 } -type qmap map[duple]present -type smap map[string]present -type mtsmap map[string]MetricTagSet -type present map[string]int64 - -type duple struct { - A, B string -} - -func (q qmap) Copy() qmap { - m := make(qmap) - for k, v := range q { - m[k] = v.Copy() - } - return m +func init() { + metadata.AddMetricMeta("bosun.search.index_queue", metadata.Gauge, metadata.Count, "Number of datapoints queued for indexing to redis") + metadata.AddMetricMeta("bosun.search.dropped", metadata.Counter, metadata.Count, "Number of datapoints discarded without being saved to redis") } -func (s smap) Copy() smap { - m := make(smap) - for k, v := range s { - m[k] = v.Copy() - } - return m -} -func (t mtsmap) Copy() mtsmap { - m := make(mtsmap) - for k, v := range t { - m[k] = v - } - return m -} -func (p present) Copy() present { - m := make(present) - for k, v := range p { - m[k] = v - } - return m -} - -func NewSearch() *Search { +func NewSearch(data database.DataAccess) *Search { s := Search{ - Metric: make(qmap), - Tagk: make(smap), - Tagv: make(qmap), - Last: make(map[string]*pair), - Read: new(Search), + DataAccess: data, + Last: make(map[string]*lastInfo), + indexQueue: make(chan *opentsdb.DataPoint, 300000), } + collect.Set("search.index_queue", opentsdb.TagSet{}, func() interface{} { return len(s.indexQueue) }) + go s.redisIndex(s.indexQueue) return &s } -// Copy copies current data to the Read replica. -func (s *Search) Copy() { - r := new(Search) - r.Metric = s.Metric.Copy() - r.Tagk = s.Tagk.Copy() - r.Tagv = s.Tagv.Copy() - s.Read = r -} - func (s *Search) Index(mdp opentsdb.MultiDataPoint) { - now := time.Now().Unix() - s.Lock() - if !s.copy { - s.copy = true - go func() { - time.Sleep(time.Second * 20) - s.Lock() - s.Copy() - s.copy = false - s.Unlock() - }() - } for _, dp := range mdp { - var mts MetricTagSet - mts.Metric = dp.Metric - mts.Tags = dp.Tags - key := mts.key() - var q duple - for k, v := range dp.Tags { - q.A, q.B = k, v - if _, ok := s.Metric[q]; !ok { - s.Metric[q] = make(present) - } - s.Metric[q][dp.Metric] = now - - if _, ok := s.Tagk[dp.Metric]; !ok { - s.Tagk[dp.Metric] = make(present) - } - s.Tagk[dp.Metric][k] = now - - q.A, q.B = dp.Metric, k - if _, ok := s.Tagv[q]; !ok { - s.Tagv[q] = make(present) - } - s.Tagv[q][v] = now - } + s.Lock() + metric := dp.Metric + key := metric + dp.Tags.String() p := s.Last[key] if p == nil { - p = new(pair) + p = &lastInfo{} s.Last[key] = p } - if p.points[p.index%2].Timestamp < dp.Timestamp { - p.points[p.index%2] = *dp - p.index++ + if p.timestamp < dp.Timestamp { + if fv, err := getFloat(dp.Value); err == nil { + p.diffFromPrev = (fv - p.lastVal) / float64(dp.Timestamp-p.timestamp) + p.lastVal = fv + } else { + slog.Error(err) + } + p.timestamp = dp.Timestamp + } + s.Unlock() + select { + case s.indexQueue <- dp: + default: + collect.Add("search.dropped", opentsdb.TagSet{}, 1) } } - s.Unlock() +} + +func (s *Search) redisIndex(c <-chan *opentsdb.DataPoint) { + now := time.Now().Unix() + nextUpdateTimes := make(map[string]int64) + updateIfTime := func(key string, f func()) { + nextUpdate, ok := nextUpdateTimes[key] + if !ok || now > nextUpdate { + f() + nextUpdateTimes[key] = now + int64(30*60+rand.Intn(15*60)) //pick a random time between 30 and 45 minutes from now + } + } + for dp := range c { + now = time.Now().Unix() + metric := dp.Metric + for k, v := range dp.Tags { + updateIfTime(fmt.Sprintf("kvm:%s:%s:%s", k, v, metric), func() { + if err := s.DataAccess.Search_AddMetricForTag(k, v, metric, now); err != nil { + slog.Error(err) + } + if err := s.DataAccess.Search_AddTagValue(metric, k, v, now); err != nil { + slog.Error(err) + } + }) + updateIfTime(fmt.Sprintf("mk:%s:%s", metric, k), func() { + if err := s.DataAccess.Search_AddTagKeyForMetric(metric, k, now); err != nil { + slog.Error(err) + } + }) + updateIfTime(fmt.Sprintf("kv:%s:%s", k, v), func() { + if err := s.DataAccess.Search_AddTagValue(database.Search_All, k, v, now); err != nil { + slog.Error(err) + } + }) + updateIfTime(fmt.Sprintf("m:%s", metric), func() { + if err := s.DataAccess.Search_AddMetric(metric, now); err != nil { + slog.Error(err) + } + }) + } + } +} + +var floatType = reflect.TypeOf(float64(0)) + +func getFloat(unk interface{}) (float64, error) { + v := reflect.ValueOf(unk) + v = reflect.Indirect(v) + if !v.Type().ConvertibleTo(floatType) { + return 0, fmt.Errorf("cannot convert %v to float64", v.Type()) + } + fv := v.Convert(floatType) + return fv.Float(), nil } // Match returns all matching values against search. search is a regex, except @@ -186,26 +161,13 @@ func (s *Search) GetLast(metric, tags string, diff bool) (v float64, err error) s.RLock() p := s.Last[metric+tags] if p != nil { - var ok bool - e := p.points[(p.index+1)%2] - v, ok = e.Value.(float64) - if !ok { - err = errNotFloat - } if diff { - o := p.points[p.index%2] - ov, ok := o.Value.(float64) - if !ok { - err = errNotFloat - } - if o.Timestamp == 0 || e.Timestamp == 0 { - err = fmt.Errorf("last: need two data points") - } - v = (v - ov) / float64(e.Timestamp-o.Timestamp) + return p.diffFromPrev, nil } + return p.lastVal, nil } s.RUnlock() - return + return 0, nil } func (s *Search) Expand(q *opentsdb.Query) error { @@ -216,7 +178,10 @@ func (s *Search) Expand(q *opentsdb.Query) error { if v == "*" || !strings.Contains(v, "*") { nvs = append(nvs, v) } else { - vs := s.TagValuesByMetricTagKey(q.Metric, k, 0) + vs, err := s.TagValuesByMetricTagKey(q.Metric, k, 0) + if err != nil { + return err + } ns, err := Match(v, vs) if err != nil { return err @@ -232,83 +197,66 @@ func (s *Search) Expand(q *opentsdb.Query) error { return nil } -func (s *Search) UniqueMetrics() []string { - metrics := make([]string, len(s.Tagk)) +func (s *Search) UniqueMetrics() ([]string, error) { + m, err := s.DataAccess.Search_GetAllMetrics() + if err != nil { + return nil, err + } + metrics := make([]string, len(m)) i := 0 - for k := range s.Read.Tagk { + for k := range m { metrics[i] = k i++ } sort.Strings(metrics) - return metrics + return metrics, nil } -func (s *Search) TagValuesByTagKey(Tagk string, since time.Duration) []string { - um := s.UniqueMetrics() - tagvset := make(map[string]bool) - for _, Metric := range um { - for _, Tagv := range s.tagValuesByMetricTagKey(Metric, Tagk, since) { - tagvset[Tagv] = true - } - } - tagvs := make([]string, len(tagvset)) - i := 0 - for k := range tagvset { - tagvs[i] = k - i++ - } - sort.Strings(tagvs) - return tagvs +func (s *Search) TagValuesByTagKey(Tagk string, since time.Duration) ([]string, error) { + return s.TagValuesByMetricTagKey(database.Search_All, Tagk, since) } -func (s *Search) MetricsByTagPair(Tagk, Tagv string) []string { - r := make([]string, 0) - for k := range s.Read.Metric[duple{Tagk, Tagv}] { +func (s *Search) MetricsByTagPair(tagk, tagv string) ([]string, error) { + metrics, err := s.DataAccess.Search_GetMetricsForTag(tagk, tagv) + if err != nil { + return nil, err + } + r := []string{} + for k := range metrics { r = append(r, k) } sort.Strings(r) - return r + return r, nil } -func (s *Search) TagKeysByMetric(Metric string) []string { - r := make([]string, 0) - for k := range s.Read.Tagk[Metric] { +func (s *Search) TagKeysByMetric(metric string) ([]string, error) { + keys, err := s.DataAccess.Search_GetTagKeysForMetric(metric) + if err != nil { + return nil, err + } + r := []string{} + for k := range keys { r = append(r, k) } sort.Strings(r) - return r + return r, nil } -func (s *Search) MetricsWithTagKeys() map[string][]string { - metricToKeys := make(map[string][]string) - searchResult := s.Read.Tagk.Copy() - for metric, v := range searchResult { - keys := make([]string, len(v)) - i := 0 - for tK := range v { - keys[i] = tK - i++ - } - metricToKeys[metric] = keys - } - return metricToKeys -} - -func (s *Search) tagValuesByMetricTagKey(Metric, Tagk string, since time.Duration) []string { +func (s *Search) TagValuesByMetricTagKey(metric, tagK string, since time.Duration) ([]string, error) { var t int64 if since > 0 { t = time.Now().Add(-since).Unix() } - r := make([]string, 0) - for k, ts := range s.Read.Tagv[duple{Metric, Tagk}] { + vals, err := s.DataAccess.Search_GetTagValues(metric, tagK) + if err != nil { + return nil, err + } + r := []string{} + for k, ts := range vals { if t <= ts { r = append(r, k) } } sort.Strings(r) - return r -} - -func (s *Search) TagValuesByMetricTagKey(Metric, Tagk string, since time.Duration) []string { - return s.tagValuesByMetricTagKey(Metric, Tagk, since) + return r, nil } diff --git a/cmd/bosun/search/search_test.go b/cmd/bosun/search/search_test.go new file mode 100644 index 0000000000..38f01fff9a --- /dev/null +++ b/cmd/bosun/search/search_test.go @@ -0,0 +1,58 @@ +package search + +import ( + "os" + "reflect" + "testing" + "time" + + "bosun.org/cmd/bosun/database/test" + "bosun.org/opentsdb" +) + +var testSearch *Search + +func TestMain(m *testing.M) { + testData, closeF := dbtest.StartTestRedis() + testSearch = NewSearch(testData) + status := m.Run() + closeF() + os.Exit(status) +} + +func checkEqual(t *testing.T, err error, desc string, expected, actual []string) { + if err != nil { + t.Fatal(err) + } + if len(expected) != len(actual) { + t.Fatalf("%s lengths differ. Expected %d, but found %d", desc, len(expected), len(actual)) + } + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("Expect %s: %s. Found %s", desc, expected, actual) + } +} + +func TestIndex(t *testing.T) { + mdp := opentsdb.MultiDataPoint{ + &opentsdb.DataPoint{Metric: "os.cpu", Value: 12.0, Timestamp: 13, Tags: opentsdb.TagSet{"host": "abc", "proc": "7"}}, + &opentsdb.DataPoint{Metric: "os.mem", Value: 4000, Timestamp: 13, Tags: opentsdb.TagSet{"host": "abc1"}}, + &opentsdb.DataPoint{Metric: "os.mem", Value: 4050, Timestamp: 13, Tags: opentsdb.TagSet{"host": "def"}}, + &opentsdb.DataPoint{Metric: "os.cpu2", Value: 12.0, Timestamp: 13, Tags: opentsdb.TagSet{"host": "abc"}}, + } + testSearch.Index(mdp) + time.Sleep(4 * time.Second) + um, err := testSearch.UniqueMetrics() + checkEqual(t, err, "metrics", []string{"os.cpu", "os.cpu2", "os.mem"}, um) + + tagks, err := testSearch.TagKeysByMetric("os.cpu") + checkEqual(t, err, "tagk", []string{"host", "proc"}, tagks) + + tagvs, err := testSearch.TagValuesByTagKey("host", 0) + checkEqual(t, err, "tagvsByTagKeyOnly", []string{"abc", "abc1", "def"}, tagvs) + + tagvs, err = testSearch.TagValuesByMetricTagKey("os.mem", "host", 0) + checkEqual(t, err, "tagvsByTagKeyAndMetric", []string{"abc1", "def"}, tagvs) + + metrics, err := testSearch.MetricsByTagPair("host", "abc") + checkEqual(t, err, "metricsByPair", []string{"os.cpu", "os.cpu2"}, metrics) +} diff --git a/cmd/bosun/web/relay_test.go b/cmd/bosun/web/relay_test.go index b9d43810a9..3c8298d268 100644 --- a/cmd/bosun/web/relay_test.go +++ b/cmd/bosun/web/relay_test.go @@ -6,14 +6,28 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "sort" "testing" "time" "bosun.org/cmd/bosun/conf" + "bosun.org/cmd/bosun/database" + "bosun.org/cmd/bosun/database/test" ) +var testData database.DataAccess + +func TestMain(m *testing.M) { + var closeF func() + testData, closeF = dbtest.StartTestRedis() + status := m.Run() + closeF() + os.Exit(status) +} + func TestRelay(t *testing.T) { + schedule.DataAccess = testData schedule.Init(new(conf.Conf)) rs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(204) @@ -42,7 +56,7 @@ func TestRelay(t *testing.T) { bodygzip := []byte(`[{ "timestamp": 1, "metric": "gzip-works", - "value": "345", + "value": 345, "tags": { "host": "host.gzip", "gzipped": "yup" @@ -57,17 +71,16 @@ func TestRelay(t *testing.T) { } time.Sleep(time.Second) - schedule.Search.Copy() - m := schedule.Search.UniqueMetrics() + m, _ := schedule.Search.UniqueMetrics() sort.Strings(m) if len(m) != 2 || m[0] != "gzip-works" || m[1] != "no-gzip-works" { t.Errorf("bad um: %v", m) } - m = schedule.Search.TagValuesByMetricTagKey("gzip-works", "gzipped", 0) + m, _ = schedule.Search.TagValuesByMetricTagKey("gzip-works", "gzipped", 0) if len(m) != 1 || m[0] != "yup" { t.Errorf("bad tvbmtk: %v", m) } - m = schedule.Search.TagKeysByMetric("no-gzip-works") + m, _ = schedule.Search.TagKeysByMetric("no-gzip-works") sort.Strings(m) if len(m) != 2 || m[0] != "host" || m[1] != "other" { t.Errorf("bad tkbm: %v", m) diff --git a/cmd/bosun/web/search.go b/cmd/bosun/web/search.go index 055af9e4b8..43bc82bf01 100644 --- a/cmd/bosun/web/search.go +++ b/cmd/bosun/web/search.go @@ -11,7 +11,10 @@ import ( // UniqueMetrics returns a sorted list of available metrics. func UniqueMetrics(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { - values := schedule.Search.UniqueMetrics() + values, err := schedule.Search.UniqueMetrics() + if err != nil { + return nil, err + } // remove anything starting with double underscore. q := r.URL.Query() if v := q.Get("unfiltered"); v != "" { @@ -29,28 +32,21 @@ func UniqueMetrics(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) func TagKeysByMetric(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { vars := mux.Vars(r) metric := vars["metric"] - keys := schedule.Search.TagKeysByMetric(metric) - return keys, nil + return schedule.Search.TagKeysByMetric(metric) } func TagValuesByMetricTagKey(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { vars := mux.Vars(r) metric := vars["metric"] tagk := vars["tagk"] - return schedule.Search.TagValuesByMetricTagKey(metric, tagk, 0), nil + return schedule.Search.TagValuesByMetricTagKey(metric, tagk, 0) } func MetricsByTagPair(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { vars := mux.Vars(r) tagk := vars["tagk"] tagv := vars["tagv"] - values := schedule.Search.MetricsByTagPair(tagk, tagv) - return values, nil -} - -func MetricsWithTagKeys(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { - values := schedule.Search.MetricsWithTagKeys() - return values, nil + return schedule.Search.MetricsByTagPair(tagk, tagv) } func TagValuesByTagKey(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -67,6 +63,5 @@ func TagValuesByTagKey(t miniprofiler.Timer, w http.ResponseWriter, r *http.Requ return nil, err } } - values := schedule.Search.TagValuesByTagKey(tagk, time.Duration(since)) - return values, nil + return schedule.Search.TagValuesByTagKey(tagk, time.Duration(since)) } diff --git a/cmd/bosun/web/web.go b/cmd/bosun/web/web.go index bf41384d1e..b7746d59c9 100644 --- a/cmd/bosun/web/web.go +++ b/cmd/bosun/web/web.go @@ -103,7 +103,6 @@ func Listen(listenAddr string, devMode bool, tsdbHost string) error { router.Handle("/api/metadata/delete", JSON(DeleteMetadata)).Methods("DELETE") router.Handle("/api/metric", JSON(UniqueMetrics)) router.Handle("/api/metric/{tagk}/{tagv}", JSON(MetricsByTagPair)) - router.Handle("/api/metric/tagkey", JSON(MetricsWithTagKeys)) router.Handle("/api/rule", JSON(Rule)) router.HandleFunc("/api/shorten", Shorten) router.Handle("/api/silence/clear", JSON(SilenceClear)) @@ -593,7 +592,7 @@ func APIRedirect(w http.ResponseWriter, req *http.Request) { } func Host(t miniprofiler.Timer, w http.ResponseWriter, r *http.Request) (interface{}, error) { - return schedule.Host(r.FormValue("filter")), nil + return schedule.Host(r.FormValue("filter")) } // Last returns the most recent datapoint for a metric+tagset. The metric+tagset