diff --git a/cmd/scollector/collectors/collectors.go b/cmd/scollector/collectors/collectors.go
index b50f000e47..2ce3850015 100644
--- a/cmd/scollector/collectors/collectors.go
+++ b/cmd/scollector/collectors/collectors.go
@@ -20,7 +20,9 @@ import (
 var collectors []Collector
 
 type Collector interface {
-	Run(chan<- *opentsdb.DataPoint)
+	// Run sends datapoints on the first channel; the second channel is
+	// closed by the caller to ask the collector to stop.
+	Run(chan<- *opentsdb.DataPoint, <-chan struct{})
 	Name() string
 	Init()
 }
@@ -136,15 +138,18 @@ func Search(s []string) []Collector {
 }
 
-// Run runs specified collectors. Use nil for all collectors.
-func Run(cs []Collector) chan *opentsdb.DataPoint {
+// Run runs specified collectors. Use nil for all collectors. It returns the
+// channel the collectors send datapoints on and a quit channel; closing the
+// quit channel asks the collectors to stop.
+func Run(cs []Collector) (chan *opentsdb.DataPoint, chan struct{}) {
 	if cs == nil {
 		cs = collectors
 	}
 	ch := make(chan *opentsdb.DataPoint)
+	quit := make(chan struct{})
 	for _, c := range cs {
-		go c.Run(ch)
+		go c.Run(ch, quit)
 	}
-	return ch
+	return ch, quit
 }
 
 type initFunc func(*conf.Conf)
diff --git a/cmd/scollector/collectors/interval.go b/cmd/scollector/collectors/interval.go
index bdfb70a830..20aa51690f 100644
--- a/cmd/scollector/collectors/interval.go
+++ b/cmd/scollector/collectors/interval.go
@@ -34,7 +34,7 @@ func (c *IntervalCollector) Init() {
 	}
 }
 
-func (c *IntervalCollector) Run(dpchan chan<- *opentsdb.DataPoint) {
+func (c *IntervalCollector) Run(dpchan chan<- *opentsdb.DataPoint, quit <-chan struct{}) {
 	if c.Enable != nil {
 		go func() {
 			for {
@@ -71,7 +71,12 @@ func (c *IntervalCollector) Run(dpchan chan<- *opentsdb.DataPoint) {
 				dpchan <- dp
 			}
 		}
-		<-next
+		// Wait out the interval, but stop as soon as quit is closed.
+		select {
+		case <-next:
+		case <-quit:
+			return
+		}
 	}
 }
 
diff --git a/cmd/scollector/collectors/program.go b/cmd/scollector/collectors/program.go
index 5970ebf0ef..3a213934d0 100644
--- a/cmd/scollector/collectors/program.go
+++ b/cmd/scollector/collectors/program.go
@@ -80,7 +80,7 @@ func isExecutable(f os.FileInfo) bool {
 	}
 }
 
-func (c *ProgramCollector) Run(dpchan chan<- *opentsdb.DataPoint) {
+func (c *ProgramCollector) Run(dpchan chan<- *opentsdb.DataPoint, quit <-chan struct{}) {
 	if c.Interval == 0 {
 		for {
 			next := time.After(DefaultFreq)
@@ -94,7 +94,12 @@ func (c *ProgramCollector) Run(dpchan chan<- *opentsdb.DataPoint) {
 		for {
 			next := time.After(c.Interval)
 			c.runProgram(dpchan)
-			<-next
+			// Wait out the interval, but stop as soon as quit is closed.
+			select {
+			case <-next:
+			case <-quit:
+				return
+			}
 		}
 	}
 }
diff --git a/cmd/scollector/main.go b/cmd/scollector/main.go
index 8518106ab3..a7b164ab2c 100644
--- a/cmd/scollector/main.go
+++ b/cmd/scollector/main.go
@@ -11,6 +11,7 @@ import (
 	_ "net/http/pprof"
 	"net/url"
 	"os"
+	"os/signal"
 	"path/filepath"
 	"runtime"
 	"strconv"
@@ -182,14 +183,13 @@ func main() {
 			slog.Fatal(err)
 		}
 	}
-	cdp := collectors.Run(c)
+	cdp, cquit := collectors.Run(c)
 	if u != nil {
 		slog.Infoln("OpenTSDB host:", u)
 	}
 	if err := collect.InitChan(u, "scollector", cdp); err != nil {
 		slog.Fatal(err)
 	}
-
 	if version.VersionDate != "" {
 		v, err := strconv.ParseInt(version.VersionDate, 10, 64)
 		if err == nil {
@@ -218,7 +218,17 @@ func main() {
 			}
 		}
 	}()
-	select {}
+	// signal.Notify never blocks when delivering, so the channel needs a
+	// buffer of one or a signal arriving before the receive is dropped.
+	sChan := make(chan os.Signal, 1)
+	signal.Notify(sChan, os.Interrupt)
+	<-sChan
+	close(cquit)
+	// Try to flush all datapoints on interrupt, but quit after 5 seconds no matter what.
+	time.AfterFunc(5*time.Second, func() {
+		os.Exit(0)
+	})
+	collect.Flush()
 }
 
 func readConf() *conf.Conf {
diff --git a/collect/queue.go b/collect/queue.go
index 23a4c346fb..a5f1e3360a 100644
--- a/collect/queue.go
+++ b/collect/queue.go
@@ -34,6 +34,26 @@ func queuer() {
 	}
 }
 
+// Flush locks the queue and sends all queued datapoints in batches of at
+// most BatchSize. Intended to be used as scollector exits.
+func Flush() {
+	qlock.Lock()
+	// Release the lock even if sendBatch panics.
+	defer qlock.Unlock()
+	for len(queue) > 0 {
+		i := len(queue)
+		if i > BatchSize {
+			i = BatchSize
+		}
+		sending := queue[:i]
+		queue = queue[i:]
+		if Debug {
+			slog.Infof("sending: %d, remaining: %d", i, len(queue))
+		}
+		sendBatch(sending)
+	}
+}
+
 func send() {
 	for {
 		qlock.Lock()