mirror of
https://github.com/prometheus/prometheus
synced 2025-01-14 19:02:38 +00:00
e01b6cdb44
We have an open question of how long does it take for each target pool to have the state retrieved from all participating elements. This commit starts by providing insight into this.
113 lines
2.1 KiB
Go
113 lines
2.1 KiB
Go
package retrieval
|
|
|
|
import (
|
|
"container/heap"
|
|
"github.com/prometheus/prometheus/retrieval/format"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
intervalKey = "interval"
|
|
)
|
|
|
|
type TargetPool struct {
|
|
done chan bool
|
|
manager TargetManager
|
|
targets []Target
|
|
}
|
|
|
|
func NewTargetPool(m TargetManager) (p *TargetPool) {
|
|
return &TargetPool{
|
|
manager: m,
|
|
}
|
|
}
|
|
|
|
func (p TargetPool) Len() int {
|
|
return len(p.targets)
|
|
}
|
|
|
|
func (p TargetPool) Less(i, j int) bool {
|
|
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
|
|
}
|
|
|
|
func (p *TargetPool) Pop() interface{} {
|
|
oldPool := p.targets
|
|
futureLength := p.Len() - 1
|
|
element := oldPool[futureLength]
|
|
futurePool := oldPool[0:futureLength]
|
|
p.targets = futurePool
|
|
|
|
return element
|
|
}
|
|
|
|
func (p *TargetPool) Push(element interface{}) {
|
|
p.targets = append(p.targets, element.(Target))
|
|
}
|
|
|
|
func (p TargetPool) Swap(i, j int) {
|
|
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
|
|
}
|
|
|
|
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
|
|
ticker := time.Tick(interval)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker:
|
|
p.runIteration(results, interval)
|
|
case <-p.done:
|
|
log.Printf("TargetPool exiting...")
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p TargetPool) Stop() {
|
|
p.done <- true
|
|
}
|
|
|
|
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
|
|
p.manager.acquire()
|
|
defer p.manager.release()
|
|
|
|
t.Scrape(earliest, results)
|
|
}
|
|
|
|
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
|
|
begin := time.Now()
|
|
|
|
targetCount := p.Len()
|
|
finished := make(chan bool, targetCount)
|
|
|
|
for i := 0; i < targetCount; i++ {
|
|
target := heap.Pop(p).(Target)
|
|
if target == nil {
|
|
break
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
if target.scheduledFor().After(now) {
|
|
heap.Push(p, target)
|
|
|
|
break
|
|
}
|
|
|
|
go func() {
|
|
p.runSingle(now, results, target)
|
|
heap.Push(p, target)
|
|
finished <- true
|
|
}()
|
|
}
|
|
|
|
for i := 0; i < targetCount; i++ {
|
|
<-finished
|
|
}
|
|
|
|
close(finished)
|
|
|
|
duration := float64(time.Now().Sub(begin) / time.Millisecond)
|
|
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
|
|
}
|