From 47e3c90f9b417938c9caa72e1932f743800d47da Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 9 Mar 2016 18:56:30 +0100 Subject: [PATCH] Clean up error propagation Only return an error where callers are doing something with it except simply logging and ignoring. All the errors touched in this commit flag the storage as dirty anyway, and that fact is logged anyway. So most of what is being removed here is just log spam. As discussed earlier, the class of errors that flags the storage as dirty signals fundamental corruption, no even bubbling up a one-time warning to the user (e.g. about incomplete results) isn't helping much because _anything_ happening in the storage has to be doubted from that point on (and in fact retroactively into the past, too). Flagging the storage dirty, and alerting on it (plus marking the state in the web UI) is the only way I can see right now. As a byproduct, I cleaned up the setDirty method a bit and improved the logged errors. --- storage/local/crashrecovery.go | 8 +++- storage/local/persistence.go | 61 ++++++++++++++----------------- storage/local/persistence_test.go | 51 +++++++------------------- storage/local/storage.go | 58 ++++++----------------------- storage/local/storage_test.go | 56 +++++----------------------- 5 files changed, 71 insertions(+), 163 deletions(-) diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index f51e54e7b..17626c07c 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false, nil) + p.dirtyMtx.Lock() + // Only declare storage clean if it didn't become dirty during crash recovery. + if !p.becameDirty { + p.dirty = false + } + p.dirtyMtx.Unlock() + log.Warn("Crash recovery complete.") return nil } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a89307bc3..68bc612f0 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool { return p.dirty } -// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was -// set to true with this method, it cannot be set to false again. (If we became -// dirty during our runtime, there is no way back. If we were dirty from the -// start, a clean-up might make us clean again.) The provided error will be -// logged as a reason if dirty is true. -func (p *persistence) setDirty(dirty bool, err error) { - if dirty { - p.dirtyCounter.Inc() - } +// setDirty flags the storage as dirty in a goroutine-safe way. The provided +// error will be logged as a reason the first time the storage is flagged as dirty. +func (p *persistence) setDirty(err error) { + p.dirtyCounter.Inc() p.dirtyMtx.Lock() defer p.dirtyMtx.Unlock() if p.becameDirty { return } - p.dirty = dirty - if dirty { - p.becameDirty = true - log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") - } + p.dirty = true + p.becameDirty = true + log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") } // fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) + return nil } - return fps, nil + return fps } // labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { - return nil, err + p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) + return nil } - return lvs, nil + return lvs } // persistChunks persists a number of consecutive chunks of a series. It is the @@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() { // the metric. The caller must have locked the fingerprint. func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, -) error { +) { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) + return } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true, err) - return err + p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) } - return nil } // hasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in the // corresponding series is. This method is goroutine-safe. func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( - hasMetric bool, firstTime, lastTime model.Time, err error, + hasMetric bool, firstTime, lastTime model.Time, ) { - firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) + hasMetric = false } - return + return hasMetric, firstTime, lastTime } // updateArchivedTimeRange updates an archived time range. The caller must make @@ -1069,9 +1063,10 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) if err != nil { - p.setDirty(true, err) + p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) + return nil, err } - return metric, err + return metric, nil } // purgeArchivedMetric deletes an archived fingerprint and its corresponding @@ -1081,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) + p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) } }() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e2da77803..692f494d5 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want := model.Fingerprints{1} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(1); !archived { t.Error("want FP 1 archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } - if err != p.purgeArchivedMetric(1) { + if err := p.purgeArchivedMetric(1); err != nil { t.Fatal(err) } - if err != p.purgeArchivedMetric(3) { + if err := p.purgeArchivedMetric(3); err != nil { // Purging something that has not beet archived is not an error. t.Fatal(err) } p.waitForIndexing() - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) want = nil if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) - if err != nil { - t.Fatal(err) - } + outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) want = model.Fingerprints{2} if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { + if archived, _, _ := p.hasArchivedMetric(1); archived { t.Error("want FP 1 not archived") } - if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { + if archived, _, _ := p.hasArchivedMetric(2); !archived { t.Error("want FP 2 archived") } } @@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) { for i, b := range batches { for fp, m := range b.fpToMetric { p.indexMetric(fp, m) - if err := p.archiveMetric(fp, m, 1, 2); err != nil { - t.Fatal(err) - } + p.archiveMetric(fp, m, 1, 2) indexedFpsToMetrics[fp] = m } verifyIndexedState(i, t, b, indexedFpsToMetrics, p) @@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + has, first, last := p.hasArchivedMetric(fp) if !has { t.Errorf("%d. fingerprint %v not found", i, fp) } @@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.labelValuesForLabelName(ln) - if err != nil { - t.Fatal(err) - } + outLvs := p.labelValuesForLabelName(ln) outSet := codable.LabelValueSet{} for _, lv := range outLvs { @@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFPs, err := p.fingerprintsForLabelPair(lp) - if err != nil { - t.Fatal(err) - } + outFPs := p.fingerprintsForLabelPair(lp) outSet := codable.FingerprintSet{} for _, fp := range outFPs { diff --git a/storage/local/storage.go b/storage/local/storage.go index bc380bd6b..a24387831 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -425,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} - fps, err := s.persistence.fingerprintsForLabelPair(pair) - if err != nil { - log.Error("Error getting fingerprints for label pair: ", err) - } + fps := s.persistence.fingerprintsForLabelPair(pair) if len(fps) == 0 { return nil } @@ -547,19 +544,14 @@ func (s *memorySeriesStorage) metricForFingerprint( } if from.After(model.Earliest) || through.Before(model.Latest) { // The range lookup is relatively cheap, so let's do it first. - ok, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err) - return metric.Metric{}, false - } + ok, first, last := s.persistence.hasArchivedMetric(fp) if !ok || first.After(through) || last.Before(from) { return metric.Metric{}, false } } - met, err := s.persistence.archivedMetric(fp) - if err != nil { - log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) + met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do. + if met == nil { return metric.Metric{}, false } @@ -571,11 +563,7 @@ func (s *memorySeriesStorage) metricForFingerprint( // LabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { - lvs, err := s.persistence.labelValuesForLabelName(labelName) - if err != nil { - log.Errorf("Error getting label values for label name %q: %v", labelName, err) - } - return lvs + return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. @@ -603,7 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. if err != nil { - s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) return err } if fp != rawFP { @@ -745,11 +733,7 @@ func (s *memorySeriesStorage) getSeriesForRange( if ok { return series } - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") - return nil - } + has, first, last := s.persistence.hasArchivedMetric(fp) if !has { s.invalidPreloadRequestsCount.Inc() return nil @@ -759,7 +743,7 @@ func (s *memorySeriesStorage) getSeriesForRange( } metric, err := s.persistence.archivedMetric(fp) if err != nil { - log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + // Error already logged, storage declared dirty by archivedMetric. return nil } series, err = s.getOrCreateSeries(fp, metric) @@ -1152,12 +1136,7 @@ func (s *memorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.lastTime, - ); err != nil { - log.Errorf("Error archiving metric %v: %v", series.metric, err) - return - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1278,11 +1257,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - log.Error("Error looking up archived time range: ", err) - return - } + has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp) if !has || !firstTime.Before(beforeTime) { // Oldest sample not old enough, or metric purged or unarchived in the meantime. return @@ -1295,10 +1270,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor log.Error("Error dropping persisted chunks: ", err) } if allDropped { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err) - return - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do. s.seriesOps.WithLabelValues(archivePurge).Inc() return } @@ -1487,13 +1459,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, s.incNumChunksToPersist(-numChunksNotYetPersisted) } else { - if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log. - With("fingerprint", fp). - With("metric", m). - With("error", err). - Error("Error purging metric from archive.") - } + s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do. } if m != nil { // If we know a metric now, unindex it in any case. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 01f30c28b..291b32918 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -471,11 +471,7 @@ func TestDropMetrics(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) @@ -582,11 +578,7 @@ func TestQuarantineMetric(t *testing.T) { s.maintainMemorySeries(fpToBeArchived, 0) s.fpLocker.Lock(fpToBeArchived) s.fpToSeries.del(fpToBeArchived) - if err := s.persistence.archiveMetric( - fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), - ); err != nil { - t.Error(err) - } + s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond)) s.fpLocker.Unlock(fpToBeArchived) // Corrupt the series file for m3. @@ -1144,36 +1136,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } // Drop ~half of the chunks of an archived series. s.maintainArchivedSeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("archived series purged although only half of the chunks dropped") } // Drop everything. s.maintainArchivedSeries(fp, 100000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived series not dropped") } @@ -1199,16 +1177,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), lastTime, - ); err != nil { - t.Fatal(err) - } - - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") } @@ -1220,10 +1190,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if !ok { t.Fatal("could not find series") } - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if archived { t.Fatal("archived") } @@ -1231,10 +1198,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. s.maintainMemorySeries(fp, 10000) - archived, _, _, err = s.persistence.hasArchivedMetric(fp) - if err != nil { - t.Fatal(err) - } + archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("series purged completely") }