// Copyright 2022 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package chunks import "sync" // writeJobQueue is similar to buffered channel of chunkWriteJob, but manages its own buffers // to avoid using a lot of memory when it's empty. It does that by storing elements into segments // of equal size (segmentSize). When segment is not used anymore, reference to it are removed, // so it can be treated as a garbage. type writeJobQueue struct { maxSize int segmentSize int mtx sync.Mutex // protects all following variables pushed, popped *sync.Cond // signalled when something is pushed into the queue or popped from it first, last *writeJobQueueSegment // pointer to first and last segment, if any size int // total size of the queue closed bool // after closing the queue, nothing can be pushed to it } type writeJobQueueSegment struct { segment []chunkWriteJob nextRead, nextWrite int // index of next read and next write in this segment. nextSegment *writeJobQueueSegment // next segment, if any } func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue { if maxSize <= 0 || segmentSize <= 0 { panic("invalid queue") } q := &writeJobQueue{ maxSize: maxSize, segmentSize: segmentSize, } q.pushed = sync.NewCond(&q.mtx) q.popped = sync.NewCond(&q.mtx) return q } func (q *writeJobQueue) close() { q.mtx.Lock() defer q.mtx.Unlock() q.closed = true // Unblock all blocked goroutines. q.pushed.Broadcast() q.popped.Broadcast() } // push blocks until there is space available in the queue, and then adds job to the queue. // If queue is closed or gets closed while waiting for space, push returns false. func (q *writeJobQueue) push(job chunkWriteJob) bool { q.mtx.Lock() defer q.mtx.Unlock() // Wait until queue has more space or is closed. for !q.closed && q.size >= q.maxSize { q.popped.Wait() } if q.closed { return false } // Check if this segment has more space for writing, and create new one if not. if q.last == nil || q.last.nextWrite >= q.segmentSize { prevLast := q.last q.last = &writeJobQueueSegment{ segment: make([]chunkWriteJob, q.segmentSize), } if prevLast != nil { prevLast.nextSegment = q.last } if q.first == nil { q.first = q.last } } q.last.segment[q.last.nextWrite] = job q.last.nextWrite++ q.size++ q.pushed.Signal() return true } // pop returns first job from the queue, and true. // If queue is empty, pop blocks until there is a job (returns true), or until queue is closed (returns false). // If queue was already closed, pop first returns all remaining elements from the queue (with true value), and only then returns false. func (q *writeJobQueue) pop() (chunkWriteJob, bool) { q.mtx.Lock() defer q.mtx.Unlock() // wait until something is pushed to the queue, or queue is closed. for q.size == 0 { if q.closed { return chunkWriteJob{}, false } q.pushed.Wait() } res := q.first.segment[q.first.nextRead] q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element q.first.nextRead++ q.size-- // If we have read all possible elements from first segment, we can drop it. if q.first.nextRead >= q.segmentSize { q.first = q.first.nextSegment if q.first == nil { q.last = nil } } q.popped.Signal() return res, true } // length returns number of all jobs in the queue. func (q *writeJobQueue) length() int { q.mtx.Lock() defer q.mtx.Unlock() return q.size }