// Copyright 2014 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/local/codable"
	"github.com/prometheus/prometheus/storage/local/index"
	"github.com/prometheus/prometheus/storage/metric"
)

const (
	// File name suffixes for series files and their temporary counterparts.
	seriesFileSuffix     = ".db"
	seriesTempFileSuffix = ".db.tmp"

	// The heads checkpoint file (see checkpointSeriesMapAndHeads). It is
	// written to headsTempFileName first and renamed to headsFileName.
	headsFileName      = "heads.db"
	headsTempFileName  = "heads.db.tmp"
	headsFormatVersion = 1
	headsMagicString   = "PrometheusHeads"

	// Buffer size for the buffered reader/writer used on the heads file.
	fileBufSize = 1 << 16 // 64kiB.

	// Every chunk in a series file is preceded by a header of chunkHeaderLen
	// bytes: 1 byte chunk type, then the first and the last sample time, each
	// as a little-endian uint64.
	chunkHeaderLen             = 17
	chunkHeaderTypeOffset      = 0
	chunkHeaderFirstTimeOffset = 1
	chunkHeaderLastTimeOffset  = 9

	// Tuning of the indexing queue. The batch constants are presumably
	// consumed by processIndexingQueue (not visible here).
	indexingMaxBatchSize  = 1024 * 1024
	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
	indexingQueueCapacity = 1024 * 16
)

// Bit flags stored in the per-series flags byte of the heads checkpoint file.
const (
	flagHeadChunkPersisted byte = 1 << iota
	// Add more flags here like:
	// flagFoo
	// flagBar
)

// indexingOpType says whether an indexingOp adds a metric to or removes it
// from the label indexes.
type indexingOpType byte

const (
	add indexingOpType = iota
	remove
)

// indexingOp is one entry in the indexing queue. indexMetric enqueues ops of
// type add, unindexMetric ops of type remove.
type indexingOp struct {
	fingerprint clientmodel.Fingerprint
	metric      clientmodel.Metric
	opType      indexingOpType
}

// A persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunk,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
	// basePath is the root directory of all files managed by this persistence.
	basePath string
	// chunkLen is the length in bytes of a chunk in a series file, not
	// counting the chunkHeaderLen-byte header in front of it.
	chunkLen int

	// archiveMtx protects the archiving-related methods archiveMetric,
	// unarchiveMetric, dropArchivedMetric, and getFingerprintsModifiedBefore
	// from concurrent calls.
	archiveMtx                     sync.Mutex
	archivedFingerprintToMetrics   *index.FingerprintMetricIndex
	archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
	// The label indexes. They are deleted and later rebuilt if the
	// persistence starts out dirty.
	labelPairToFingerprints *index.LabelPairFingerprintIndex
	labelNameToLabelValues  *index.LabelNameLabelValuesIndex

	// indexingQueue buffers ops queued by indexMetric/unindexMetric.
	indexingQueue   chan indexingOp
	indexingStopped chan struct{}
	// indexingFlush receives reply channels from waitForIndexing, which
	// keeps requesting flushes until a reply of 0 arrives.
	indexingFlush chan chan int

	indexingQueueLength   prometheus.Gauge
	indexingQueueCapacity prometheus.Metric
	indexingBatchSizes    prometheus.Summary
	indexingBatchLatency  prometheus.Summary
	checkpointDuration    prometheus.Gauge

	dirtyMtx sync.Mutex // Protects dirty and becameDirty.
	// dirty marks the on-disk state as possibly inconsistent. becameDirty is
	// set once setDirty(true) is called; from then on dirty stays true.
	dirty, becameDirty bool
}

// newPersistence returns a newly allocated persistence backed by local disk
// storage, ready to use. It creates basePath if needed and opens the archive
// and label indexes. If dirty is true, or if a DIRTY marker file already
// exists in basePath, the persistence is marked dirty and the label indexes
// are deleted before being reopened (empty), so they can be rebuilt later. It
// also starts the goroutine that processes the indexing queue.
func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, error) { if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err } archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath) if err != nil { return nil, err } archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath) if err != nil { return nil, err } p := &persistence{ basePath: basePath, chunkLen: chunkLen, archivedFingerprintToMetrics: archivedFingerprintToMetrics, archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, indexingQueue: make(chan indexingOp, indexingQueueCapacity), indexingStopped: make(chan struct{}), indexingFlush: make(chan chan int), indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "indexing_queue_length", Help: "The number of metrics waiting to be indexed.", }), indexingQueueCapacity: prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"), "The capacity of the indexing queue.", nil, nil, ), prometheus.GaugeValue, float64(indexingQueueCapacity), ), indexingBatchSizes: prometheus.NewSummary( prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, Name: "indexing_batch_sizes", Help: "Quantiles for indexing batch sizes (number of metrics per batch).", }, ), indexingBatchLatency: prometheus.NewSummary( prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, Name: "indexing_batch_latency_milliseconds", Help: "Quantiles for batch indexing latencies in milliseconds.", }, ), checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "checkpoint_duration_milliseconds", Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.", }), dirty: dirty, } if dirtyFile, err := os.OpenFile(p.dirtyFileName(), os.O_CREATE|os.O_EXCL, 0666); err == nil { dirtyFile.Close() } else if 
os.IsExist(err) { p.dirty = true } else { return nil, err } if p.dirty { // Blow away the label indexes. We'll rebuild them later. if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil { return nil, err } if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil { return nil, err } } labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath) if err != nil { return nil, err } labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath) if err != nil { return nil, err } p.labelPairToFingerprints = labelPairToFingerprints p.labelNameToLabelValues = labelNameToLabelValues go p.processIndexingQueue() return p, nil } // Describe implements prometheus.Collector. func (p *persistence) Describe(ch chan<- *prometheus.Desc) { ch <- p.indexingQueueLength.Desc() ch <- p.indexingQueueCapacity.Desc() p.indexingBatchSizes.Describe(ch) p.indexingBatchLatency.Describe(ch) ch <- p.checkpointDuration.Desc() } // Collect implements prometheus.Collector. func (p *persistence) Collect(ch chan<- prometheus.Metric) { p.indexingQueueLength.Set(float64(len(p.indexingQueue))) ch <- p.indexingQueueLength ch <- p.indexingQueueCapacity p.indexingBatchSizes.Collect(ch) p.indexingBatchLatency.Collect(ch) ch <- p.checkpointDuration } // dirtyFileName returns the name of the (empty) file used to mark the // persistency layer as dirty. func (p *persistence) dirtyFileName() string { return path.Join(p.basePath, "DIRTY") } // isDirty returns the dirty flag in a goroutine-safe way. func (p *persistence) isDirty() bool { p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() return p.dirty } // setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was // set to true with this method, it cannot be set to false again. (If we became // dirty during our runtime, there is no way back. If we were dirty from the // start, a clean-up might make us clean again.) 
func (p *persistence) setDirty(dirty bool) { p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } p.dirty = dirty if dirty { p.becameDirty = true glog.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } } // crashRecovery is called by loadSeriesMapAndHeads if the persistence appears // to be dirty after the loading (either because the loading resulted in an // error or because the persistence was dirty from the start). Not goroutine // safe. Only call before anything else is running. func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error { glog.Warning("Starting crash recovery. Prometheus is inoperational until complete.") fpsSeen := map[clientmodel.Fingerprint]struct{}{} count := 0 glog.Info("Scanning files.") for i := 0; i < 256; i++ { dirname := path.Join(p.basePath, fmt.Sprintf("%02x", i)) dir, err := os.Open(dirname) if os.IsNotExist(err) { continue } if err != nil { return err } defer dir.Close() var fis []os.FileInfo for ; err != io.EOF; fis, err = dir.Readdir(1024) { if err != nil { return err } for _, fi := range fis { fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries) if ok { fpsSeen[fp] = struct{}{} } count++ if count%10000 == 0 { glog.Infof("%d files scanned.", count) } } } } glog.Infof("File scan complete. %d fingerprints found.", len(fpsSeen)) glog.Info("Checking for series without series file.") for fp, s := range fingerprintToSeries { if _, seen := fpsSeen[fp]; !seen { // fp exists in fingerprintToSeries, but has no representation on disk. if s.headChunkPersisted { // Oops, head chunk was persisted, but nothing on disk. // Thus, we lost that series completely. Clean up the remnants. delete(fingerprintToSeries, fp) if err := p.dropArchivedMetric(fp); err != nil { // Dropping the archived metric didn't work, so try // to unindex it, just in case it's in the indexes. 
p.unindexMetric(fp, s.metric) } glog.Warningf("Lost series detected: fingerprint %v, metric %v.", fp, s.metric) continue } // If we are here, the only chunk we have is the head chunk. // Adjust things accordingly. if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 { glog.Warningf( "Lost at least %d chunks for fingerprint %v, metric %v.", len(s.chunkDescs)+s.chunkDescsOffset-1, fp, s.metric, // If chunkDescsOffset is -1, this will underreport. Oh well... ) s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:] s.chunkDescsOffset = 0 } fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete. } } glog.Info("Check for series without series file complete.") if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil { return err } if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil { return err } p.setDirty(false) glog.Warning("Crash recovery complete.") return nil } // TODO: Document. func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) { filename := path.Join(dirname, fi.Name()) purge := func() { glog.Warningf("Deleting lost series file %s.", filename) // TODO: Move to lost+found directory? os.Remove(filename) } var fp clientmodel.Fingerprint if len(fi.Name()) != 17 || !strings.HasSuffix(fi.Name(), ".db") { glog.Warningf("Unexpected series file name %s.", filename) purge() return fp, false } fp.LoadFromString(path.Base(dirname) + fi.Name()[:14]) // TODO: Panics if that doesn't parse as hex. 
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen) chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen) if bytesToTrim != 0 { glog.Warningf( "Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.", filename, chunksInFile, bytesToTrim, ) f, err := os.OpenFile(filename, os.O_WRONLY, 0640) if err != nil { glog.Errorf("Could not open file %s: %s", filename, err) purge() return fp, false } if err := f.Truncate(fi.Size() - bytesToTrim); err != nil { glog.Errorf("Failed to truncate file %s: %s", filename, err) purge() return fp, false } } if chunksInFile == 0 { glog.Warningf("No chunks left in file %s.", filename) purge() return fp, false } s, ok := fingerprintToSeries[fp] if ok { // This series is supposed to not be archived. if s == nil { panic("fingerprint mapped to nil pointer") } if bytesToTrim == 0 && s.chunkDescsOffset != -1 && ((s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)) || (!s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)-1)) { // Everything is consistent. We are good. return fp, true } // If we are here, something's fishy. if s.headChunkPersisted { // This is the easy case as we don't have a head chunk // in heads.db. Treat this series as a freshly // unarchived one. No chunks or chunkDescs in memory, no // current head chunk. glog.Warningf( "Treating recovered metric %v, fingerprint %v, as freshly unarchvied, with %d chunks in series file.", s.metric, fp, chunksInFile, ) s.chunkDescs = nil s.chunkDescsOffset = -1 return fp, true } // This is the tricky one: We have a head chunk from heads.db, // but the very same head chunk might already be in the series // file. Strategy: Check the first time of both. If it is the // same or newer, assume the latest chunk in the series file // is the most recent head chunk. If not, keep the head chunk // we got from heads.db. // First, assume the head chunk is not yet persisted. 
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:] s.chunkDescsOffset = -1 // Load all the chunk descs (which assumes we have none from the future). cds, err := p.loadChunkDescs(fp, clientmodel.Now()) if err != nil { glog.Errorf( "Failed to load chunk descriptors for metric %v, fingerprint %v: %s", s.metric, fp, err, ) purge() return fp, false } if cds[len(cds)-1].firstTime().Before(s.head().firstTime()) { s.chunkDescs = append(cds, s.chunkDescs...) glog.Warningf( "Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered head chunk from checkpoint.", s.metric, fp, chunksInFile, ) } else { glog.Warningf( "Recovered metric %v, fingerprint %v: head chunk found among the %d recovered chunks in series file.", s.metric, fp, chunksInFile, ) s.chunkDescs = cds s.headChunkPersisted = true } s.chunkDescsOffset = 0 return fp, true } // This series is supposed to be archived. metric, err := p.getArchivedMetric(fp) if err != nil { glog.Errorf( "Fingerprint %v assumed archived but couldn't be looked up in archived index: %s", fp, err, ) purge() return fp, false } if metric == nil { glog.Warningf( "Fingerprint %v assumed archived but couldn't be found in archived index.", fp, ) purge() return fp, false } // This series looks like a properly archived one. 
return fp, true } func (p *persistence) cleanUpArchiveIndexes( fpToSeries map[clientmodel.Fingerprint]*memorySeries, fpsSeen map[clientmodel.Fingerprint]struct{}, ) error { glog.Info("Cleaning up archive indexes.") var fp codable.Fingerprint var m codable.Metric count := 0 if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error { count++ if count%10000 == 0 { glog.Infof("%d archived metrics checked.", count) } if err := kv.Key(&fp); err != nil { return err } _, fpSeen := fpsSeen[clientmodel.Fingerprint(fp)] inMemory := false if fpSeen { _, inMemory = fpToSeries[clientmodel.Fingerprint(fp)] } if !fpSeen || inMemory { if inMemory { glog.Warningf("Archive clean-up: Fingerprint %v is not archived. Purging from archive indexes.", clientmodel.Fingerprint(fp)) } if !fpSeen { glog.Warningf("Archive clean-up: Fingerprint %v is unknown. Purging from archive indexes.", clientmodel.Fingerprint(fp)) } if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } // Delete from timerange index, too. p.archivedFingerprintToTimeRange.Delete(fp) // TODO: Ignoring errors here as fp might not be in // timerange index (which is good) but which would // return an error. Delete signature could be changed // like the Get signature to detect a real error. return nil } // fp is legitimately archived. Make sure it is in timerange index, too. has, err := p.archivedFingerprintToTimeRange.Has(fp) if err != nil { return err } if has { return nil // All good. } glog.Warningf("Archive clean-up: Fingerprint %v is not in time-range index. 
Unarchiving it for recovery.") if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil { return err } if err := kv.Value(&m); err != nil { return err } series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { return err } series.chunkDescs = cds series.chunkDescsOffset = 0 fpToSeries[clientmodel.Fingerprint(fp)] = series return nil }); err != nil { return err } count = 0 if err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { count++ if count%10000 == 0 { glog.Infof("%d archived time ranges checked.", count) } if err := kv.Key(&fp); err != nil { return err } has, err := p.archivedFingerprintToMetrics.Has(fp) if err != nil { return err } if has { return nil // All good. } glog.Warningf("Archive clean-up: Purging unknown fingerprint %v in time-range index.", fp) if err := p.archivedFingerprintToTimeRange.Delete(fp); err != nil { return err } return nil }); err != nil { return err } glog.Info("Clean-up of archive indexes complete.") return nil } func (p *persistence) rebuildLabelIndexes( fpToSeries map[clientmodel.Fingerprint]*memorySeries, ) error { count := 0 glog.Info("Rebuilding label indexes.") glog.Info("Indexing metrics in memory.") for fp, s := range fpToSeries { p.indexMetric(fp, s.metric) count++ if count%10000 == 0 { glog.Infof("%d metrics queued for indexing.", count) } } glog.Info("Indexing archived metrics.") var fp codable.Fingerprint var m codable.Metric if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error { if err := kv.Key(&fp); err != nil { return err } if err := kv.Value(&m); err != nil { return err } p.indexMetric(clientmodel.Fingerprint(fp), clientmodel.Metric(m)) count++ if count%10000 == 0 { glog.Infof("%d metrics queued for indexing.", count) } return nil }); err != nil { return err } glog.Info("All requests for rebuilding the label indexes queued. 
(Actual processing may lag behind.)") return nil } // getFingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not yet made it into the index. (Same // applies correspondingly to UnindexMetric.) func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { return nil, err } return fps, nil } // getLabelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not yet made it into the index. (Same // applies correspondingly to UnindexMetric.) func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { return nil, err } return lvs, nil } // persistChunk persists a single chunk of a series. It is the caller's // responsibility to not modify chunk concurrently and to not persist or drop // anything for the same fingerprint concurrently. It returns the (zero-based) // index of the persisted chunk within the series file. In case of an error, the // returned index is -1 (to avoid the misconception that the chunk was written // at position 0). func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) { // 1. Open chunk file. f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err } defer f.Close() b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen) // 2. Write the header (chunk type and first/last times). err = writeChunkHeader(b, c) if err != nil { return -1, err } // 3. Write chunk into file. err = c.marshal(b) if err != nil { return -1, err } // 4. Determine index within the file. 
b.Flush() offset, err := f.Seek(0, os.SEEK_CUR) if err != nil { return -1, err } index, err := p.chunkIndexForOffset(offset) if err != nil { return -1, err } return index - 1, err } // loadChunks loads a group of chunks of a timeseries by their index. The chunk // with the earliest time will have index 0, the following ones will have // incrementally larger indexes. The indexOffset denotes the offset to be added to // each index in indexes. It is the caller's responsibility to not persist or // drop anything for the same fingerprint concurrently. func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { // TODO: we need to verify at some point that file length is a multiple of // the chunk size. When is the best time to do this, and where to remember // it? Right now, we only do it when loading chunkDescs. f, err := p.openChunkFileForReading(fp) if err != nil { return nil, err } defer f.Close() chunks := make([]chunk, 0, len(indexes)) typeBuf := make([]byte, 1) for _, idx := range indexes { _, err := f.Seek(p.offsetForChunkIndex(idx+indexOffset), os.SEEK_SET) if err != nil { return nil, err } n, err := f.Read(typeBuf) if err != nil { return nil, err } if n != 1 { panic("read returned != 1 bytes") } _, err = f.Seek(chunkHeaderLen-1, os.SEEK_CUR) if err != nil { return nil, err } chunk := chunkForType(typeBuf[0]) chunk.unmarshal(f) chunks = append(chunks, chunk) } return chunks, nil } // loadChunkDescs loads chunkDescs for a series up until a given time. It is // the caller's responsibility to not persist or drop anything for the same // fingerprint concurrently. 
func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil } if err != nil { return nil, err } defer f.Close() fi, err := f.Stat() if err != nil { return nil, err } totalChunkLen := chunkHeaderLen + p.chunkLen if fi.Size()%int64(totalChunkLen) != 0 { // TODO: record number of encountered corrupt series files in a metric? // Truncate the file size to the nearest multiple of chunkLen. truncateTo := fi.Size() - fi.Size()%int64(totalChunkLen) glog.Infof("Bad series file size for %s: %d bytes (no multiple of %d). Truncating to %d bytes.", fp, fi.Size(), totalChunkLen, truncateTo) // TODO: this doesn't work, as this is a read-only file handle. if err := f.Truncate(truncateTo); err != nil { return nil, err } } numChunks := int(fi.Size()) / totalChunkLen cds := make([]*chunkDesc, 0, numChunks) for i := 0; i < numChunks; i++ { _, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) if err != nil { return nil, err } chunkTimesBuf := make([]byte, 16) _, err = io.ReadAtLeast(f, chunkTimesBuf, 16) if err != nil { return nil, err } cd := &chunkDesc{ chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } if !cd.chunkLastTime.Before(beforeTime) { // From here on, we have chunkDescs in memory already. break } cds = append(cds, cd) } chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) atomic.AddInt64(&numMemChunkDescs, int64(len(cds))) return cds, nil } // checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping // and all open (non-full) head chunks. Do not call concurrently with // loadSeriesMapAndHeads. 
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { glog.Info("Checkpointing in-memory metrics and head chunks...") begin := time.Now() f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { return } defer func() { closeErr := f.Close() if err != nil { return } err = closeErr if err != nil { return } err = os.Rename(p.headsTempFileName(), p.headsFileName()) duration := time.Since(begin) p.checkpointDuration.Set(float64(duration) / float64(time.Millisecond)) glog.Infof("Done checkpointing in-memory metrics and head chunks in %v.", duration) }() w := bufio.NewWriterSize(f, fileBufSize) if _, err = w.WriteString(headsMagicString); err != nil { return } var numberOfSeriesOffset int if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil { return } numberOfSeriesOffset += len(headsMagicString) numberOfSeriesInHeader := uint64(fingerprintToSeries.length()) // We have to write the number of series as uint64 because we might need // to overwrite it later, and a varint might change byte width then. if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil { return } iter := fingerprintToSeries.iter() defer func() { // Consume the iterator in any case to not leak goroutines. for _ = range iter { } }() var realNumberOfSeries uint64 for m := range iter { func() { // Wrapped in function to use defer for unlocking the fp. fpLocker.Lock(m.fp) defer fpLocker.Unlock(m.fp) if len(m.series.chunkDescs) == 0 { // This series was completely purged or archived in the meantime. Ignore. 
return } realNumberOfSeries++ var seriesFlags byte if m.series.headChunkPersisted { seriesFlags |= flagHeadChunkPersisted } if err = w.WriteByte(seriesFlags); err != nil { return } if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil { return } var buf []byte buf, err = codable.Metric(m.series.metric).MarshalBinary() if err != nil { return } w.Write(buf) if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil { return } if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { return } if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil { return } for i, chunkDesc := range m.series.chunkDescs { if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 { if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return } if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { return } } else { // This is the non-persisted head chunk. Fully marshal it. if err = w.WriteByte(chunkType(chunkDesc.chunk)); err != nil { return } if err = chunkDesc.chunk.marshal(w); err != nil { return } } } }() if err != nil { return } } if err = w.Flush(); err != nil { return } if realNumberOfSeries != numberOfSeriesInHeader { // The number of series has changed in the meantime. // Rewrite it in the header. if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil { return } if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil { return } } return } // loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all // open (non-full) head chunks. If recoverable corruption is detected, or if the // dirty flag was set from the beginning, crash recovery is run, which might // take a while. If an unrecoverable error is encountered, it is returned. Call // this method during start-up while nothing else is running in storage // land. This method is utterly goroutine-unsafe. 
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { var chunksTotal, chunkDescsTotal int64 fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries) sm = &seriesMap{m: fingerprintToSeries} defer func() { if sm != nil && p.dirty { glog.Warning("Persistence layer appears dirty.") err = p.crashRecovery(fingerprintToSeries) if err != nil { sm = nil } } if err == nil { atomic.AddInt64(&numMemChunks, chunksTotal) atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal) } }() f, err := os.Open(p.headsFileName()) if os.IsNotExist(err) { return sm, nil } if err != nil { glog.Warning("Could not open heads file:", err) p.dirty = true return } defer f.Close() r := bufio.NewReaderSize(f, fileBufSize) buf := make([]byte, len(headsMagicString)) if _, err := io.ReadFull(r, buf); err != nil { glog.Warning("Could not read from heads file:", err) p.dirty = true return sm, nil } magic := string(buf) if magic != headsMagicString { glog.Warningf( "unexpected magic string, want %q, got %q", headsMagicString, magic, ) p.dirty = true return } if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil { glog.Warningf("unknown heads format version, want %d", headsFormatVersion) p.dirty = true return sm, nil } numSeries, err := codable.DecodeUint64(r) if err != nil { glog.Warning("Could not decode number of series:", err) p.dirty = true return sm, nil } for ; numSeries > 0; numSeries-- { seriesFlags, err := r.ReadByte() if err != nil { glog.Warning("Could not read series flags:", err) p.dirty = true return sm, nil } headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 fp, err := codable.DecodeUint64(r) if err != nil { glog.Warning("Could not decode fingerprint:", err) p.dirty = true return sm, nil } var metric codable.Metric if err := metric.UnmarshalFromReader(r); err != nil { glog.Warning("Could not decode metric:", err) p.dirty = true return sm, nil } chunkDescsOffset, err := binary.ReadVarint(r) if err != nil { 
glog.Warning("Could not decode chunk descriptor offset:", err) p.dirty = true return sm, nil } savedFirstTime, err := binary.ReadVarint(r) if err != nil { glog.Warning("Could not decode saved first time:", err) p.dirty = true return sm, nil } numChunkDescs, err := binary.ReadVarint(r) if err != nil { glog.Warning("Could not decode number of chunk descriptors:", err) p.dirty = true return sm, nil } chunkDescs := make([]*chunkDesc, numChunkDescs) chunkDescsTotal += numChunkDescs for i := int64(0); i < numChunkDescs; i++ { if headChunkPersisted || i < numChunkDescs-1 { firstTime, err := binary.ReadVarint(r) if err != nil { glog.Warning("Could not decode first time:", err) p.dirty = true return sm, nil } lastTime, err := binary.ReadVarint(r) if err != nil { glog.Warning("Could not decode last time:", err) p.dirty = true return sm, nil } chunkDescs[i] = &chunkDesc{ chunkFirstTime: clientmodel.Timestamp(firstTime), chunkLastTime: clientmodel.Timestamp(lastTime), } } else { // Non-persisted head chunk. chunksTotal++ chunkType, err := r.ReadByte() if err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true return sm, nil } chunk := chunkForType(chunkType) if err := chunk.unmarshal(r); err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true return sm, nil } chunkDescs[i] = newChunkDesc(chunk) } } fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{ metric: clientmodel.Metric(metric), chunkDescs: chunkDescs, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), headChunkPersisted: headChunkPersisted, } } return sm, nil } // dropChunks deletes all chunks from a series whose last sample time is before // beforeTime. It returns the timestamp of the first sample in the oldest chunk // _not_ dropped, the number of deleted chunks, and true if all chunks of the // series have been deleted (in which case the returned timestamp will be 0 and // must be ignored). 
It is the caller's responsibility to make sure nothing is // persisted or loaded for the same fingerprint concurrently. func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ( firstTimeNotDropped clientmodel.Timestamp, numDropped int, allDropped bool, err error, ) { defer func() { if err != nil { p.setDirty(true) } }() f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return 0, 0, true, nil } if err != nil { return 0, 0, false, err } defer f.Close() // Find the first chunk that should be kept. var i int var firstTime clientmodel.Timestamp for ; ; i++ { _, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) if err != nil { return 0, 0, false, err } timeBuf := make([]byte, 16) _, err = io.ReadAtLeast(f, timeBuf, 16) if err == io.EOF { // We ran into the end of the file without finding any chunks that should // be kept. Remove the whole file. chunkOps.WithLabelValues(purge).Add(float64(i)) if err := os.Remove(f.Name()); err != nil { return 0, 0, true, err } return 0, i, true, nil } if err != nil { return 0, 0, false, err } lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:])) if !lastTime.Before(beforeTime) { firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf)) chunkOps.WithLabelValues(purge).Add(float64(i)) break } } // We've found the first chunk that should be kept. Seek backwards to the // beginning of its header and start copying everything from there into a new // file. 
_, err = f.Seek(-(chunkHeaderFirstTimeOffset + 16), os.SEEK_CUR) if err != nil { return 0, 0, false, err } temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640) if err != nil { return 0, 0, false, err } defer temp.Close() if _, err := io.Copy(temp, f); err != nil { return 0, 0, false, err } if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil { return 0, 0, false, err } return firstTime, i, false, nil } // indexMetric queues the given metric for addition to the indexes needed by // getFingerprintsForLabelPair, getLabelValuesForLabelName, and // getFingerprintsModifiedBefore. If the queue is full, this method blocks // until the metric can be queued. This method is goroutine-safe. func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) { p.indexingQueue <- indexingOp{fp, m, add} } // unindexMetric queues references to the given metric for removal from the // indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and // getFingerprintsModifiedBefore. The index of fingerprints to archived metrics // is not affected by this removal. (In fact, never call this method for an // archived metric. To drop an archived metric, call dropArchivedFingerprint.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. func (p *persistence) unindexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) { p.indexingQueue <- indexingOp{fp, m, remove} } // waitForIndexing waits until all items in the indexing queue are processed. If // queue processing is currently on hold (to gather more ops for batching), this // method will trigger an immediate start of processing. This method is // goroutine-safe. 
func (p *persistence) waitForIndexing() { wait := make(chan int) for { p.indexingFlush <- wait if <-wait == 0 { break } } } // archiveMetric persists the mapping of the given fingerprint to the given // metric, together with the first and last timestamp of the series belonging to // the metric. This method is goroutine-safe. func (p *persistence) archiveMetric( fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { p.archiveMtx.Lock() defer p.archiveMtx.Unlock() if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { p.setDirty(true) return err } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { p.setDirty(true) return err } return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) return } // updateArchivedTimeRange updates an archived time range. The caller must make // sure that the fingerprint is currently archived (the time range will // otherwise be added without the corresponding metric in the archive). func (p *persistence) updateArchivedTimeRange( fp clientmodel.Fingerprint, first, last clientmodel.Timestamp, ) error { return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}) } // getFingerprintsModifiedBefore returns the fingerprints of archived timeseries // that have live samples before the provided timestamp. This method is // goroutine-safe. 
func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { // The locking makes sure archivedFingerprintToTimeRange won't be // mutated while being iterated over (which will probably not result in // races, but might still yield weird results). p.archiveMtx.Lock() defer p.archiveMtx.Unlock() var fp codable.Fingerprint var tr codable.TimeRange fps := []clientmodel.Fingerprint{} p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { if err := kv.Value(&tr); err != nil { return err } if tr.First.Before(beforeTime) { if err := kv.Key(&fp); err != nil { return err } fps = append(fps, clientmodel.Fingerprint(fp)) } return nil }) return fps, nil } // getArchivedMetric retrieves the archived metric with the given // fingerprint. This method is goroutine-safe. func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) return metric, err } // dropArchivedMetric deletes an archived fingerprint and its corresponding // metric entirely. It also queues the metric for un-indexing (no need to call // unindexMetric for the deleted metric.) This method is goroutine-safe. func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error) { defer func() { if err != nil { p.setDirty(true) } }() p.archiveMtx.Lock() defer p.archiveMtx.Unlock() metric, err := p.getArchivedMetric(fp) if err != nil || metric == nil { return err } if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil { return err } if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return err } p.unindexMetric(fp, metric) return nil } // unarchiveMetric deletes an archived fingerprint and its metric, but (in // contrast to dropArchivedMetric) does not un-index the metric. 
If a metric // was actually deleted, the method returns true and the first time of the // deleted metric. This method is goroutine-safe. func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, clientmodel.Timestamp, error) { p.archiveMtx.Lock() defer p.archiveMtx.Unlock() firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil || !has { return false, firstTime, err } if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil { return false, firstTime, err } if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return false, firstTime, err } return true, firstTime, nil } // close flushes the indexing queue and other buffered data and releases any // held resources. It also removes the dirty marker file if successful and if // the persistence is currently not marked as dirty. func (p *persistence) close() error { close(p.indexingQueue) <-p.indexingStopped var lastError error if err := p.archivedFingerprintToMetrics.Close(); err != nil { lastError = err glog.Error("Error closing archivedFingerprintToMetric index DB: ", err) } if err := p.archivedFingerprintToTimeRange.Close(); err != nil { lastError = err glog.Error("Error closing archivedFingerprintToTimeRange index DB: ", err) } if err := p.labelPairToFingerprints.Close(); err != nil { lastError = err glog.Error("Error closing labelPairToFingerprints index DB: ", err) } if err := p.labelNameToLabelValues.Close(); err != nil { lastError = err glog.Error("Error closing labelNameToLabelValues index DB: ", err) } if lastError == nil && !p.isDirty() { lastError = os.Remove(p.dirtyFileName()) } return lastError } func (p *persistence) dirNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() return path.Join(p.basePath, fpStr[0:2]) } func (p *persistence) fileNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() return path.Join(p.basePath, fpStr[0:2], 
fpStr[2:]+seriesFileSuffix) } func (p *persistence) tempFileNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesTempFileSuffix) } func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) { if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil { return nil, err } return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) } func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { return os.Open(p.fileNameForFingerprint(fp)) } func writeChunkHeader(w io.Writer, c chunk) error { header := make([]byte, chunkHeaderLen) header[chunkHeaderTypeOffset] = chunkType(c) binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) _, err := w.Write(header) return err } func (p *persistence) offsetForChunkIndex(i int) int64 { return int64(i * (chunkHeaderLen + p.chunkLen)) } func (p *persistence) chunkIndexForOffset(offset int64) (int, error) { if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 { return -1, fmt.Errorf( "offset %d is not a multiple of on-disk chunk length %d", offset, chunkHeaderLen+p.chunkLen, ) } return int(offset) / (chunkHeaderLen + p.chunkLen), nil } func (p *persistence) headsFileName() string { return path.Join(p.basePath, headsFileName) } func (p *persistence) headsTempFileName() string { return path.Join(p.basePath, headsTempFileName) } func (p *persistence) processIndexingQueue() { batchSize := 0 nameToValues := index.LabelNameLabelValuesMapping{} pairToFPs := index.LabelPairFingerprintsMapping{} batchTimeout := time.NewTimer(indexingBatchTimeout) defer batchTimeout.Stop() commitBatch := func() { p.indexingBatchSizes.Observe(float64(batchSize)) defer func(begin time.Time) { p.indexingBatchLatency.Observe(float64(time.Since(begin) / 
time.Millisecond)) }(time.Now()) if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { glog.Error("Error indexing label pair to fingerprints batch: ", err) } if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil { glog.Error("Error indexing label name to label values batch: ", err) } batchSize = 0 nameToValues = index.LabelNameLabelValuesMapping{} pairToFPs = index.LabelPairFingerprintsMapping{} batchTimeout.Reset(indexingBatchTimeout) } var flush chan chan int loop: for { // Only process flush requests if the queue is currently empty. if len(p.indexingQueue) == 0 { flush = p.indexingFlush } else { flush = nil } select { case <-batchTimeout.C: // Only commit if we have something to commit _and_ // nothing is waiting in the queue to be picked up. That // prevents a death spiral if the LookupSet calls below // are slow for some reason. if batchSize > 0 && len(p.indexingQueue) == 0 { commitBatch() } else { batchTimeout.Reset(indexingBatchTimeout) } case r := <-flush: if batchSize > 0 { commitBatch() } r <- len(p.indexingQueue) case op, ok := <-p.indexingQueue: if !ok { if batchSize > 0 { commitBatch() } break loop } batchSize++ for ln, lv := range op.metric { lp := metric.LabelPair{Name: ln, Value: lv} baseFPs, ok := pairToFPs[lp] if !ok { var err error baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp) if err != nil { glog.Errorf("Error looking up label pair %v: %s", lp, err) continue } pairToFPs[lp] = baseFPs } baseValues, ok := nameToValues[ln] if !ok { var err error baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln) if err != nil { glog.Errorf("Error looking up label name %v: %s", ln, err) continue } nameToValues[ln] = baseValues } switch op.opType { case add: baseFPs[op.fingerprint] = struct{}{} baseValues[lv] = struct{}{} case remove: delete(baseFPs, op.fingerprint) if len(baseFPs) == 0 { delete(baseValues, lv) } default: panic("unknown op type") } } if batchSize >= indexingMaxBatchSize { commitBatch() } } } 
close(p.indexingStopped) } // exists returns true when the given file or directory exists. func exists(path string) (bool, error) { _, err := os.Stat(path) if err == nil { return true, nil } if os.IsNotExist(err) { return false, nil } return false, err }