diff --git a/main.go b/main.go index c41bb10bc..85bbee144 100644 --- a/main.go +++ b/main.go @@ -39,6 +39,8 @@ var ( scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") + diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") + memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.") ) func main() { @@ -48,7 +50,7 @@ func main() { log.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - ts, err := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) + ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath) if err != nil { log.Fatalf("Error opening storage: %s", err) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 570da0667..d4c30d24d 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -166,9 +166,17 @@ func (t *tieredStorage) Serve() { writeMemoryTicker = time.Tick(t.writeMemoryInterval) ) - for { - t.reportQueues() + go func() { + reportTicker := time.Tick(time.Second) + for { + <-reportTicker + + t.reportQueues() + } + }() + + for { select { case <-writeMemoryTicker: t.writeMemory()