// Copyright 2021 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package chunks import ( "errors" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/chunkenc" ) const ( // Minimum recorded peak since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. chunkRefMapShrinkThreshold = 1000 // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap. chunkRefMapMinShrinkInterval = 10 * time.Minute // Maximum size of segment used by job queue (number of elements). With chunkWriteJob being 64 bytes, // this will use ~512 KiB for empty queue. maxChunkQueueSegmentSize = 8192 ) type chunkWriteJob struct { cutFile bool seriesRef HeadSeriesRef mint int64 maxt int64 chk chunkenc.Chunk ref ChunkDiskMapperRef callback func(error) } // chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion. // Chunks that shall be written get added to the queue, which is consumed asynchronously. // Adding jobs to the queue is non-blocking as long as the queue isn't full. type chunkWriteQueue struct { jobs *writeJobQueue chunkRefMapMtx sync.RWMutex chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it. chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time. // isRunningMtx serves two purposes: // 1. It protects isRunning field. // 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block // while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1. isRunningMtx sync.Mutex isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF // Keeping separate counters instead of only a single CounterVec to improve the performance of the critical // addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec. adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter shrink prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue { counters := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "prometheus_tsdb_chunk_write_queue_operations_total", Help: "Number of operations on the chunk_write_queue.", }, []string{"operation"}, ) segmentSize := size if segmentSize > maxChunkQueueSegmentSize { segmentSize = maxChunkQueueSegmentSize } q := &chunkWriteQueue{ jobs: newWriteJobQueue(size, segmentSize), chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), chunkRefMapLastShrink: time.Now(), writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), shrink: counters.WithLabelValues("shrink"), } if reg != nil { reg.MustRegister(counters) } q.start() return q } func (c *chunkWriteQueue) start() { c.workerWg.Add(1) go func() { defer c.workerWg.Done() for { job, ok := c.jobs.pop() if !ok { return } c.processJob(job) } }() c.isRunningMtx.Lock() c.isRunning = true c.isRunningMtx.Unlock() } func (c *chunkWriteQueue) processJob(job chunkWriteJob) { err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.cutFile) if job.callback != nil { job.callback(err) } c.chunkRefMapMtx.Lock() defer c.chunkRefMapMtx.Unlock() delete(c.chunkRefMap, job.ref) c.completed.Inc() c.shrinkChunkRefMap() } // shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met, // if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method. // // We do this because Go runtime doesn't release internal memory used by map after map has been emptied. // To achieve that we create new map instead and throw the old one away. func (c *chunkWriteQueue) shrinkChunkRefMap() { if len(c.chunkRefMap) > 0 { // Can't shrink it while there is data in it. return } if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold { // Not shrinking it because it has not grown to the minimum threshold yet. return } now := time.Now() if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval { // Not shrinking it because the minimum duration between shrink-events has not passed yet. return } // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. // We are trying to hit the sweet spot in the trade-off between initializing it to a very small size // potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially // resulting in unused allocated memory. c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) c.chunkRefMapPeakSize = 0 c.chunkRefMapLastShrink = now c.shrink.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { defer func() { if err == nil { c.adds.Inc() } }() c.isRunningMtx.Lock() defer c.isRunningMtx.Unlock() if !c.isRunning { return errors.New("queue is not running") } c.chunkRefMapMtx.Lock() c.chunkRefMap[job.ref] = job.chk // Keep track of the peak usage of c.chunkRefMap. if len(c.chunkRefMap) > c.chunkRefMapPeakSize { c.chunkRefMapPeakSize = len(c.chunkRefMap) } c.chunkRefMapMtx.Unlock() if ok := c.jobs.push(job); !ok { c.chunkRefMapMtx.Lock() delete(c.chunkRefMap, job.ref) c.chunkRefMapMtx.Unlock() return errors.New("queue is closed") } return nil } func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk { c.chunkRefMapMtx.RLock() defer c.chunkRefMapMtx.RUnlock() chk, ok := c.chunkRefMap[ref] if ok { c.gets.Inc() } return chk } func (c *chunkWriteQueue) stop() { c.isRunningMtx.Lock() defer c.isRunningMtx.Unlock() if !c.isRunning { return } c.isRunning = false c.jobs.close() c.workerWg.Wait() } func (c *chunkWriteQueue) queueIsEmpty() bool { return c.queueSize() == 0 } func (c *chunkWriteQueue) queueIsFull() bool { // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh // because one job is currently being processed and blocked in the writer. return c.queueSize() == c.jobs.maxSize+1 } func (c *chunkWriteQueue) queueSize() int { c.chunkRefMapMtx.Lock() defer c.chunkRefMapMtx.Unlock() // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has // been fully processed, it remains in the chunkRefMap until the processing is complete. return len(c.chunkRefMap) }