diff --git a/collect/collect.go b/collect/collect.go index 0e6a3db7cf..3ff2c0c810 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -50,6 +50,9 @@ var ( // Dropped is the number of dropped data points due to a full queue. dropped int64 + // Dropped is the number of discarded data points due to being invalid + discarded int64 + // Sent is the number of sent data points. sent int64 @@ -71,6 +74,7 @@ var ( const ( descCollectAlloc = "Total number of bytes allocated and still in use by the runtime (via runtime.ReadMemStats)." + descCollectDiscarded = "Counter of discarded data points due to being invalid." descCollectDropped = "Counter of dropped data points due to the queue being full." descCollectGoRoutines = "Total number of goroutines that currently exist (via runtime.NumGoroutine)." descCollectGcCpuFraction = "fraction of CPU time used by GC" @@ -131,6 +135,13 @@ func InitChan(tsdbhost *url.URL, root string, ch chan *opentsdb.DataPoint) error slock.Unlock() return }) + Set("collect.discarded", Tags, func() (i interface{}) { + slock.Lock() + i = discarded + slock.Unlock() + return + }) + Set("collect.sent", Tags, func() (i interface{}) { slock.Lock() i = sent @@ -176,6 +187,7 @@ func InitChan(tsdbhost *url.URL, root string, ch chan *opentsdb.DataPoint) error metadata.AddMetricMeta(metricRoot+"collect.queued", metadata.Gauge, metadata.Item, descCollectQueued) metadata.AddMetricMeta(metricRoot+"collect.sent", metadata.Counter, metadata.PerSecond, descCollectSent) metadata.AddMetricMeta(metricRoot+"collect.dropped", metadata.Counter, metadata.PerSecond, descCollectDropped) + metadata.AddMetricMeta(metricRoot+"collect.discarded", metadata.Counter, metadata.PerSecond, descCollectDiscarded) // Make sure these get zeroed out instead of going unknown on restart Add("collect.post.error", Tags, 0) Add("collect.post.bad_status", Tags, 0) diff --git a/collect/queue.go b/collect/queue.go index 8212e8c909..50dcbcc40d 100644 --- a/collect/queue.go +++ b/collect/queue.go @@ -20,7 +20,7 @@ import ( func queuer() { for dp := range tchan { if err := dp.Clean(); err != nil { - atomic.AddInt64(&dropped, 1) + atomic.AddInt64(&discarded, 1) continue // if anything gets this far that can't be made valid, just drop it silently. } qlock.Lock() @@ -32,6 +32,10 @@ func queuer() { queue = append(queue, dp) select { case dp = <-tchan: + if err := dp.Clean(); err != nil { + atomic.AddInt64(&discarded, 1) + break // if anything gets this far that can't be made valid, just drop it silently. + } continue default: } diff --git a/opentsdb/tsdb.go b/opentsdb/tsdb.go index 58b4493350..868710d4e0 100644 --- a/opentsdb/tsdb.go +++ b/opentsdb/tsdb.go @@ -293,6 +293,9 @@ func (t TagSet) Clean() error { if err != nil { return fmt.Errorf("cleaning value %s for tag %s: %s", v, k, err) } + if kc == "" || vc == "" { + return fmt.Errorf("cleaning value [%s] for tag [%s] result in an empty string", v, k) + } if kc != k || vc != v { delete(t, k) t[kc] = vc