diff --git a/main.go b/main.go index 71adcd661..7ea0347cf 100644 --- a/main.go +++ b/main.go @@ -54,10 +54,6 @@ var ( diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") - deleteInterval = flag.Duration("delete.interval", 11*time.Hour, "The amount of time between deletion of old values.") - - deleteAge = flag.Duration("delete.ageMaximum", 15*24*time.Hour, "The relative maximum age for values before they are deleted.") - memoryEvictionInterval = flag.Duration("storage.memory.evictionInterval", 15*time.Minute, "The period at which old data is evicted from memory.") memoryRetentionPeriod = flag.Duration("storage.memory.retentionPeriod", time.Hour, "The period of time to retain in memory during evictions.") diff --git a/storage/local/series.go b/storage/local/series.go index 243dfbabb..4ee1045b9 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -158,20 +158,21 @@ func (s *memorySeries) add(v *metric.SamplePair, persistQueue chan *persistReque } } -func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) { +func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) { s.mtx.Lock() defer s.mtx.Unlock() // For now, always drop the entire range from oldest to t. for _, cd := range s.chunkDescs { if !cd.lastTime().Before(t) { - break + return false } if cd.chunk == nil { continue } cd.evictOnUnpin() } + return true } // purgeOlderThan returns true if all chunks have been purged. @@ -347,6 +348,14 @@ func (s *memorySeries) values() metric.Values { return values } +func (s *memorySeries) firstTime() clientmodel.Timestamp { + return s.chunkDescs[0].firstTime() +} + +func (s *memorySeries) lastTime() clientmodel.Timestamp { + return s.head().lastTime() +} + func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { it.mtx.Lock() defer it.mtx.Unlock() diff --git a/storage/local/storage.go b/storage/local/storage.go index 093b64275..d855e28bc 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -170,8 +170,19 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { s.mtx.RLock() defer s.mtx.RUnlock() - for _, series := range s.fingerprintToSeries { - series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) + for fp, series := range s.fingerprintToSeries { + if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) { + if err := s.persistence.ArchiveMetric( + fp, series.metric, series.firstTime(), series.lastTime(), + ); err != nil { + glog.Errorf("Error archiving metric %v: %v", series.metric, err) + } + delete(s.fingerprintToSeries, fp) + s.persistQueue <- &persistRequest{ + fingerprint: fp, + chunkDesc: series.head(), + } + } } } @@ -436,8 +447,13 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint series, ok := s.fingerprintToSeries[fp] if ok { - // TODO: Does this have to be a copy? Ask Julius! - return series.metric + // Copy required here because caller might mutate the returned + // metric. + m := make(clientmodel.Metric, len(series.metric)) + for ln, lv := range series.metric { + m[ln] = lv + } + return m } metric, err := s.persistence.GetArchivedMetric(fp) if err != nil {