diff --git a/main.go b/main.go index e8f8ab7a7..fd3c533cd 100644 --- a/main.go +++ b/main.go @@ -57,7 +57,7 @@ var ( numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") - persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.") + persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 32*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.") checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.") checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.") diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 1dc1a939f..5ce731be7 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -84,7 +84,7 @@ type indexingOp struct { // A Persistence is used by a Storage implementation to store samples // persistently across restarts. The methods are only goroutine-safe if -// explicitly marked as such below. The chunk-related methods persistChunk, +// explicitly marked as such below. The chunk-related methods persistChunks, // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { @@ -283,35 +283,35 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie return lvs, nil } -// persistChunk persists a single chunk of a series. It is the caller's -// responsibility to not modify the chunk concurrently and to not persist or -// drop anything for the same fingerprint concurrently. It returns the -// (zero-based) index of the persisted chunk within the series file. In case of -// an error, the returned index is -1 (to avoid the misconception that the chunk -// was written at position 0). -func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) { - // 1. Open chunk file. +// persistChunks persists a number of consecutive chunks of a series. It is the +// caller's responsibility to not modify the chunks concurrently and to not +// persist or drop anything for the same fingerprint concurrently. It returns +// the (zero-based) index of the first persisted chunk within the series +// file. In case of an error, the returned index is -1 (to avoid the +// misconception that the chunk was written at position 0). +func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) (int, error) { + f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err } defer f.Close() - b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen) + b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+p.chunkLen)) - // 2. Write the header (chunk type and first/last times). - err = writeChunkHeader(b, c) - if err != nil { - return -1, err + for _, c := range chunks { + err = writeChunkHeader(b, c) + if err != nil { + return -1, err + } + + err = c.marshal(b) + if err != nil { + return -1, err + } } - // 3. Write chunk into file. - err = c.marshal(b) - if err != nil { - return -1, err - } - - // 4. Determine index within the file. + // Determine index within the file. b.Flush() offset, err := f.Seek(0, os.SEEK_CUR) if err != nil { @@ -322,7 +322,7 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er return -1, err } - return index - 1, err + return index - len(chunks), err } // loadChunks loads a group of chunks of a timeseries by their index. The chunk diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index ecf941590..c7564caa3 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -83,7 +83,7 @@ func TestPersistLoadDropChunks(t *testing.T) { for fp, chunks := range fpToChunks { for i, c := range chunks { - index, err := p.persistChunk(fp, c) + index, err := p.persistChunks(fp, []chunk{c}) if err != nil { t.Fatal(err) } diff --git a/storage/local/storage.go b/storage/local/storage.go index 4c41f2ba3..ff215680b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -76,9 +76,11 @@ type memorySeriesStorage struct { appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed. - persistQueue chan persistRequest - persistStopped chan struct{} - persistence *persistence + persistQueue chan persistRequest + persistQueueCap int // Not actually the cap of above channel. See handlePersistQueue. + persistStopped chan struct{} + + persistence *persistence countPersistedHeadChunks chan struct{} @@ -145,9 +147,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { appendLastTimestamp: clientmodel.Earliest, appendQueue: make(chan *clientmodel.Sample, appendQueueCap), - persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity), - persistStopped: make(chan struct{}), - persistence: p, + // The actual buffering happens within handlePersistQueue, so + // cap of persistQueue just has to be enough to not block while + // handlePersistQueue is writing to disk (20ms or so). + persistQueue: make(chan persistRequest, 1024), + persistQueueCap: o.PersistenceQueueCapacity, + persistStopped: make(chan struct{}), + persistence: p, countPersistedHeadChunks: make(chan struct{}, 1024), @@ -568,32 +574,100 @@ func (s *memorySeriesStorage) maybeEvict() { } func (s *memorySeriesStorage) handlePersistQueue() { - for req := range s.persistQueue { - s.persistQueueLength.Set(float64(len(s.persistQueue))) - start := time.Now() - s.fpLocker.Lock(req.fingerprint) - offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk) - if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 { - // This is the first chunk persisted for a newly created - // series that had prior chunks on disk. Finally, we can - // set the chunkDescsOffset. - series.chunkDescsOffset = offset + chunkMaps := chunkMaps{} + chunkCount := 0 + + persistMostConsecutiveChunks := func() { + fp, cds := chunkMaps.pop() + if err := s.persistChunks(fp, cds); err != nil { + // Need to put chunks back for retry. + for _, cd := range cds { + chunkMaps.add(fp, cd) + } + return } - s.fpLocker.Unlock(req.fingerprint) - s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond)) - if err != nil { - s.persistErrors.Inc() - glog.Error("Error persisting chunk: ", err) - s.persistence.setDirty(true) - continue - } - req.chunkDesc.unpin(s.evictRequests) - chunkOps.WithLabelValues(persistAndUnpin).Inc() + chunkCount -= len(cds) } + +loop: + for { + if chunkCount >= s.persistQueueCap && chunkCount > 0 { + glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount) + persistMostConsecutiveChunks() + } + select { + case req, ok := <-s.persistQueue: + if !ok { + break loop + } + chunkMaps.add(req.fingerprint, req.chunkDesc) + chunkCount++ + default: + if chunkCount > 0 { + persistMostConsecutiveChunks() + continue loop + } + // If we are here, there is nothing to do right now. So + // just wait for a persist request to come in. + req, ok := <-s.persistQueue + if !ok { + break loop + } + chunkMaps.add(req.fingerprint, req.chunkDesc) + chunkCount++ + } + s.persistQueueLength.Set(float64(chunkCount)) + } + + // Drain all requests. + for _, m := range chunkMaps { + for fp, cds := range m { + if s.persistChunks(fp, cds) == nil { + chunkCount -= len(cds) + if (chunkCount+len(cds))/1000 > chunkCount/1000 { + glog.Infof( + "Still draining persist queue, %d chunks left to persist...", + chunkCount, + ) + } + s.persistQueueLength.Set(float64(chunkCount)) + } + } + } + glog.Info("Persist queue drained and stopped.") close(s.persistStopped) } +func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error { + start := time.Now() + chunks := make([]chunk, len(cds)) + for i, cd := range cds { + chunks[i] = cd.chunk + } + s.fpLocker.Lock(fp) + offset, err := s.persistence.persistChunks(fp, chunks) + if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 { + // This is the first chunk persisted for a newly created + // series that had prior chunks on disk. Finally, we can + // set the chunkDescsOffset. + series.chunkDescsOffset = offset + } + s.fpLocker.Unlock(fp) + s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond)) + if err != nil { + s.persistErrors.Inc() + glog.Error("Error persisting chunks: ", err) + s.persistence.setDirty(true) + return err + } + for _, cd := range cds { + cd.unpin(s.evictRequests) + } + chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) + return nil +} + // waitForNextFP waits an estimated duration, after which we want to process // another fingerprint so that we will process all fingerprints in a tenth of // s.purgeAfter assuming that the system is doing nothing else, e.g. if we want @@ -927,3 +1001,52 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { count := atomic.LoadInt64(&numMemChunks) ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count)) } + +// chunkMaps is a slice of maps with chunkDescs to be persisted. +// Each chunk map contains n consecutive chunks to persist, where +// n is the index+1. +type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc + +// add adds a chunk to chunkMaps. +func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) { + // Runtime of this method is linear with the number of + // chunkMaps. However, we expect only ever very few maps. + numMaps := len(*cm) + for i, m := range *cm { + if cds, ok := m[fp]; ok { + // Found our fp! Add cd and level up. + cds = append(cds, cd) + delete(m, fp) + if i == numMaps-1 { + *cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{}) + } + (*cm)[i+1][fp] = cds + return + } + } + // Our fp isn't contained in cm yet. Add it to the first map (and add a + // first map if there is none). + if numMaps == 0 { + *cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}} + } + (*cm)[0][fp] = []*chunkDesc{cd} +} + +// pop retrieves and removes a fingerprint with all its chunks. It chooses one +// of the fingerprints with the most chunks. It panics if cm has no entries. +func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) { + m := (*cm)[len(*cm)-1] + for fp, cds := range m { + delete(m, fp) + // Prune empty maps from top level. + for len(m) == 0 { + *cm = (*cm)[:len(*cm)-1] + if len(*cm) == 0 { + break + } + m = (*cm)[len(*cm)-1] + } + return fp, cds + } + panic("popped from empty chunkMaps") +} diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 9b5a37e7e..a30a0a7fe 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -16,12 +16,15 @@ package local import ( "fmt" "math/rand" + "reflect" "testing" "testing/quick" "time" "github.com/golang/glog" + clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility/test" ) @@ -662,3 +665,93 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge } return result } + +func TestChunkMaps(t *testing.T) { + cm := chunkMaps{} + + cd1 := &chunkDesc{refCount: 1} // Abuse refCount as identifier. + cd21 := &chunkDesc{refCount: 21} + cd22 := &chunkDesc{refCount: 22} + cd31 := &chunkDesc{refCount: 31} + cd32 := &chunkDesc{refCount: 32} + cd33 := &chunkDesc{refCount: 33} + cd41 := &chunkDesc{refCount: 41} + cd42 := &chunkDesc{refCount: 42} + cd43 := &chunkDesc{refCount: 43} + cd44 := &chunkDesc{refCount: 44} + cd51 := &chunkDesc{refCount: 51} + cd52 := &chunkDesc{refCount: 52} + cd53 := &chunkDesc{refCount: 53} + cd54 := &chunkDesc{refCount: 54} + cd55 := &chunkDesc{refCount: 55} + + cm.add(5, cd51) + cm.add(3, cd31) + cm.add(5, cd52) + cm.add(1, cd1) + cm.add(4, cd41) + cm.add(4, cd42) + cm.add(5, cd53) + cm.add(3, cd32) + cm.add(2, cd21) + cm.add(5, cd54) + cm.add(3, cd33) + cm.add(4, cd43) + cm.add(2, cd22) + cm.add(4, cd44) + cm.add(5, cd55) + + var fpWant, fpGot clientmodel.Fingerprint + var cdsWant, cdsGot []*chunkDesc + + fpWant = 5 + cdsWant = []*chunkDesc{cd51, cd52, cd53, cd54, cd55} + fpGot, cdsGot = cm.pop() + if fpWant != fpGot { + t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot) + } + if !reflect.DeepEqual(cdsWant, cdsGot) { + t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot) + } + + fpWant = 4 + cdsWant = []*chunkDesc{cd41, cd42, cd43, cd44} + fpGot, cdsGot = cm.pop() + if fpWant != fpGot { + t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot) + } + if !reflect.DeepEqual(cdsWant, cdsGot) { + t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot) + } + + fpWant = 3 + cdsWant = []*chunkDesc{cd31, cd32, cd33} + fpGot, cdsGot = cm.pop() + if fpWant != fpGot { + t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot) + } + if !reflect.DeepEqual(cdsWant, cdsGot) { + t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot) + } + + fpWant = 2 + cdsWant = []*chunkDesc{cd21, cd22} + fpGot, cdsGot = cm.pop() + if fpWant != fpGot { + t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot) + } + if !reflect.DeepEqual(cdsWant, cdsGot) { + t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot) + } + + fpWant = 1 + cdsWant = []*chunkDesc{cd1} + fpGot, cdsGot = cm.pop() + if fpWant != fpGot { + t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot) + } + if !reflect.DeepEqual(cdsWant, cdsGot) { + t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot) + } + +} diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index c04b61adf..e52d87ea2 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -41,7 +41,7 @@ func NewTestStorage(t test.T) (Storage, test.Closer) { directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, - PersistenceRetentionPeriod: 24 * 7 * time.Hour, + PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Hour, }