Merge pull request #149 from prometheus/fix/telemetry/queue-reporting

Fix append queue telemetry and parameterize sizes.
This commit is contained in:
juliusv 2013-04-16 08:19:03 -07:00
commit a13bc4494d
2 changed files with 13 additions and 3 deletions

View File

@ -39,6 +39,8 @@ var (
scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.")
ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.")
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.")
) )
func main() { func main() {
@ -48,7 +50,7 @@ func main() {
log.Fatalf("Error loading configuration from %s: %v", *configFile, err) log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
} }
ts, err := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
if err != nil { if err != nil {
log.Fatalf("Error opening storage: %s", err) log.Fatalf("Error opening storage: %s", err)
} }

View File

@ -166,9 +166,17 @@ func (t *tieredStorage) Serve() {
writeMemoryTicker = time.Tick(t.writeMemoryInterval) writeMemoryTicker = time.Tick(t.writeMemoryInterval)
) )
for { go func() {
t.reportQueues() reportTicker := time.Tick(time.Second)
for {
<-reportTicker
t.reportQueues()
}
}()
for {
select { select {
case <-writeMemoryTicker: case <-writeMemoryTicker:
t.writeMemory() t.writeMemory()