From 9410f15a732ec4899e9876db32591c1690eb4a60 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 30 Jun 2014 13:19:07 +0200 Subject: [PATCH] Only evict memory series after they are on disk. This fixes the problem where samples become temporarily unavailable for queries while they are being flushed to disk. Although the entire flushing code could use some major refactoring, I'm explicitly trying to do the minimal change to fix the problem since there's a whole new storage implementation in the pipeline. Change-Id: I0f5393a30b88654c73567456aeaea62f8b3756d9 --- storage/metric/tiered/memory.go | 39 ++++++++++++++++++++-------- storage/metric/tiered/memory_test.go | 4 ++- storage/metric/tiered/tiered.go | 1 + web/api/api.go | 1 + 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/storage/metric/tiered/memory.go b/storage/metric/tiered/memory.go index 9b636bb88..9341679ef 100644 --- a/storage/metric/tiered/memory.go +++ b/storage/metric/tiered/memory.go @@ -33,7 +33,8 @@ type stream interface { add(metric.Values) clone() metric.Values - expunge(age clientmodel.Timestamp) metric.Values + getOlderThan(age clientmodel.Timestamp) metric.Values + evictOlderThan(age clientmodel.Timestamp) size() int clear() @@ -89,7 +90,19 @@ func (s *arrayStream) clone() metric.Values { return clone } -func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { +func (s *arrayStream) getOlderThan(t clientmodel.Timestamp) metric.Values { + s.RLock() + defer s.RUnlock() + + finder := func(i int) bool { + return s.values[i].Timestamp.After(t) + } + + i := sort.Search(len(s.values), finder) + return s.values[:i] +} + +func (s *arrayStream) evictOlderThan(t clientmodel.Timestamp) { s.Lock() defer s.Unlock() @@ -98,10 +111,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { } i := sort.Search(len(s.values), finder) - expunged := s.values[:i] s.values = s.values[i:] - - return expunged } func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values { @@ -282,11 +292,9 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *client } func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) { - emptySeries := []clientmodel.Fingerprint{} - s.RLock() - for fingerprint, stream := range s.fingerprintToSeries { - toArchive := stream.expunge(flushOlderThan) + for _, stream := range s.fingerprintToSeries { + toArchive := stream.getOlderThan(flushOlderThan) queued := make(clientmodel.Samples, 0, len(toArchive)) // NOTE: This duplication will go away soon. for _, value := range toArchive { @@ -303,20 +311,29 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue if len(queued) > 0 { queue <- queued } + } + s.RUnlock() +} +func (s *memorySeriesStorage) Evict(flushOlderThan clientmodel.Timestamp) { + emptySeries := []clientmodel.Fingerprint{} + + s.RLock() + for fingerprint, stream := range s.fingerprintToSeries { + stream.evictOlderThan(flushOlderThan) if stream.size() == 0 { emptySeries = append(emptySeries, fingerprint) } } s.RUnlock() + s.Lock() for _, fingerprint := range emptySeries { if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 { - s.Lock() s.dropSeries(&fingerprint) - s.Unlock() } } + s.Unlock() } // Drop a label value from the label names to label values index. diff --git a/storage/metric/tiered/memory_test.go b/storage/metric/tiered/memory_test.go index b3b52c8fe..a6a53bb02 100644 --- a/storage/metric/tiered/memory_test.go +++ b/storage/metric/tiered/memory_test.go @@ -171,7 +171,8 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { } toDisk := make(chan clientmodel.Samples, 2) - s.Flush(clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)), toDisk) + flushOlderThan := clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)) + s.Flush(flushOlderThan, toDisk) if len(toDisk) != 1 { t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk)) } @@ -179,6 +180,7 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { if len(diskSamples) != 1 { t.Fatalf("Got %d disk samples, expected 1", len(diskSamples)) } + s.Evict(flushOlderThan) fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common)) if err != nil { diff --git a/storage/metric/tiered/tiered.go b/storage/metric/tiered/tiered.go index e0bd6031c..cc02e8480 100644 --- a/storage/metric/tiered/tiered.go +++ b/storage/metric/tiered/tiered.go @@ -364,6 +364,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) { glog.Infof("Writing %d samples...", len(samples)) t.DiskStorage.AppendSamples(samples) } + t.memoryArena.Evict(flushOlderThan) glog.Info("Done flushing.") } diff --git a/web/api/api.go b/web/api/api.go index 3d749380b..4c42ba6ec 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -15,6 +15,7 @@ package api import ( "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config"