From d468271e2fb7c941e9494b2b445e1cecdeab412c Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 16 Apr 2013 17:13:29 +0200 Subject: [PATCH] Fix append queue telemetry and parameterize sizes. The original append queue telemetry never worked, because it was updated only upon the exit of the select statement, which would usually liberate the queues of contents. This has been fixed to be reported arbitrarily. The queue sizes are now parameterizable via flags. --- main.go | 4 +++- storage/metric/tiered.go | 12 ++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index c41bb10bc..85bbee144 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,8 @@ var ( scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") + diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") + memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.") ) func main() { @@ -48,7 +50,7 @@ func main() { log.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - ts, err := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) + ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) if err != nil { log.Fatalf("Error opening storage: %s", err) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 570da0667..d4c30d24d 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -166,9 +166,17 @@ func (t *tieredStorage) Serve() { writeMemoryTicker = time.Tick(t.writeMemoryInterval) ) - for { - t.reportQueues() + go func() { + reportTicker := time.Tick(time.Second) + for { + <-reportTicker + + t.reportQueues() + } + }() + + for { select { case <-writeMemoryTicker: t.writeMemory()