prometheus/retrieval/targetpool.go

117 lines
2.2 KiB
Go
Raw Normal View History

package retrieval
import (
2013-01-04 13:41:47 +00:00
"container/heap"
"github.com/prometheus/prometheus/retrieval/format"
2013-01-04 13:41:47 +00:00
"log"
"time"
)
// intervalKey is the label name under which a pool's scrape interval is
// attached to the iteration durations it records.
const intervalKey = "interval"
2013-01-04 13:41:47 +00:00
type TargetPool struct {
done chan bool
manager TargetManager
targets []Target
2013-01-04 13:41:47 +00:00
}
func NewTargetPool(m TargetManager) (p *TargetPool) {
return &TargetPool{
manager: m,
}
2013-01-04 13:41:47 +00:00
}
func (p TargetPool) Len() int {
2013-01-04 13:41:47 +00:00
return len(p.targets)
}
// Less reports whether the target at index i is scheduled strictly earlier
// than the target at index j, which keeps the earliest target on top of the
// heap.
func (p TargetPool) Less(i, j int) bool {
	first := p.targets[i].scheduledFor()
	second := p.targets[j].scheduledFor()
	return first.Before(second)
}
func (p *TargetPool) Pop() interface{} {
2013-01-04 13:41:47 +00:00
oldPool := p.targets
futureLength := p.Len() - 1
element := oldPool[futureLength]
futurePool := oldPool[0:futureLength]
2013-01-04 13:41:47 +00:00
p.targets = futurePool
return element
}
// Push appends element to the backing slice. It exists for container/heap;
// element must be a Target, otherwise the type assertion panics.
func (p *TargetPool) Push(element interface{}) {
	target := element.(Target)
	p.targets = append(p.targets, target)
}
func (p TargetPool) Swap(i, j int) {
2013-01-04 13:41:47 +00:00
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
2013-01-04 13:41:47 +00:00
ticker := time.Tick(interval)
for {
select {
case <-ticker:
p.runIteration(results, interval)
2013-01-04 13:41:47 +00:00
case <-p.done:
log.Printf("TargetPool exiting...")
break
}
}
}
// Stop sends the stop signal on the pool's done channel. The send blocks
// until the signal has been received.
func (p TargetPool) Stop() {
	stop := true
	p.done <- stop
}
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
2013-01-04 13:41:47 +00:00
p.manager.acquire()
defer p.manager.release()
t.Scrape(earliest, results)
2013-01-04 13:41:47 +00:00
}
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
begin := time.Now()
targetCount := p.Len()
finished := make(chan bool, targetCount)
for i := 0; i < targetCount; i++ {
target := heap.Pop(p).(Target)
2013-01-04 13:41:47 +00:00
if target == nil {
break
}
now := time.Now()
if target.scheduledFor().After(now) {
2013-01-04 13:41:47 +00:00
heap.Push(p, target)
2013-02-21 18:48:29 +00:00
// None of the remaining targets are ready to be scheduled. Signal that
// we're done processing them in this scrape iteration.
for j := i; j < targetCount; j++ {
finished <- true
}
2013-01-04 13:41:47 +00:00
break
}
go func() {
p.runSingle(now, results, target)
2013-01-04 13:41:47 +00:00
heap.Push(p, target)
finished <- true
2013-01-04 13:41:47 +00:00
}()
}
for i := 0; i < targetCount; i++ {
<-finished
}
close(finished)
2013-02-21 18:48:29 +00:00
duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}