Clean up error propagation

Only return an error where callers are doing something with it except
simply logging and ignoring.

All the errors touched in this commit flag the storage as dirty
anyway, and that fact is logged anyway. So most of what is being
removed here is just log spam.

As discussed earlier, the class of errors that flags the storage as
dirty signals fundamental corruption, no even bubbling up a one-time
warning to the user (e.g. about incomplete results) isn't helping much
because _anything_ happening in the storage has to be doubted from
that point on (and in fact retroactively into the past, too). Flagging
the storage dirty, and alerting on it (plus marking the state in the
web UI) is the only way I can see right now.

As a byproduct, I cleaned up the setDirty method a bit and improved
the logged errors.
This commit is contained in:
beorn7 2016-03-09 18:56:30 +01:00
parent 99854a84d7
commit 47e3c90f9b
5 changed files with 71 additions and 163 deletions

View File

@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
}
}
p.setDirty(false, nil)
p.dirtyMtx.Lock()
// Only declare storage clean if it didn't become dirty during crash recovery.
if !p.becameDirty {
p.dirty = false
}
p.dirtyMtx.Unlock()
log.Warn("Crash recovery complete.")
return nil
}

View File

@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool {
return p.dirty
}
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
// set to true with this method, it cannot be set to false again. (If we became
// dirty during our runtime, there is no way back. If we were dirty from the
// start, a clean-up might make us clean again.) The provided error will be
// logged as a reason if dirty is true.
func (p *persistence) setDirty(dirty bool, err error) {
if dirty {
// setDirty flags the storage as dirty in a goroutine-safe way. The provided
// error will be logged as a reason the first time the storage is flagged as dirty.
func (p *persistence) setDirty(err error) {
p.dirtyCounter.Inc()
}
p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock()
if p.becameDirty {
return
}
p.dirty = dirty
if dirty {
p.dirty = true
p.becameDirty = true
log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}
}
// fingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) {
func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
if err != nil {
return nil, err
p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
return nil
}
return fps, nil
return fps
}
// labelValuesForLabelName returns the label values for the given label
// name. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil {
return nil, err
p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
return nil
}
return lvs, nil
return lvs
}
// persistChunks persists a number of consecutive chunks of a series. It is the
@ -1008,29 +1003,28 @@ func (p *persistence) waitForIndexing() {
// the metric. The caller must have locked the fingerprint.
func (p *persistence) archiveMetric(
fp model.Fingerprint, m model.Metric, first, last model.Time,
) error {
) {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
p.setDirty(true, err)
return err
p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err))
return
}
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
p.setDirty(true, err)
return err
p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err))
}
return nil
}
// hasArchivedMetric returns whether the archived metric for the given
// fingerprint exists and if yes, what the first and last timestamp in the
// corresponding series is. This method is goroutine-safe.
func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
hasMetric bool, firstTime, lastTime model.Time, err error,
hasMetric bool, firstTime, lastTime model.Time,
) {
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil {
p.setDirty(true, err)
p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err))
hasMetric = false
}
return
return hasMetric, firstTime, lastTime
}
// updateArchivedTimeRange updates an archived time range. The caller must make
@ -1069,9 +1063,10 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model
func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
if err != nil {
p.setDirty(true, err)
p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err))
return nil, err
}
return metric, err
return metric, nil
}
// purgeArchivedMetric deletes an archived fingerprint and its corresponding
@ -1081,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
defer func() {
if err != nil {
p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err))
p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err))
}
}()

View File

@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
p.indexMetric(2, m2)
p.waitForIndexing()
outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
want := model.Fingerprints{1}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived {
if archived, _, _ := p.hasArchivedMetric(1); !archived {
t.Error("want FP 1 archived")
}
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
if archived, _, _ := p.hasArchivedMetric(2); !archived {
t.Error("want FP 2 archived")
}
if err != p.purgeArchivedMetric(1) {
if err := p.purgeArchivedMetric(1); err != nil {
t.Fatal(err)
}
if err != p.purgeArchivedMetric(3) {
if err := p.purgeArchivedMetric(3); err != nil {
// Purging something that has not beet archived is not an error.
t.Fatal(err)
}
p.waitForIndexing()
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
want = nil
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs)
}
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived {
if archived, _, _ := p.hasArchivedMetric(1); archived {
t.Error("want FP 1 not archived")
}
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived {
if archived, _, _ := p.hasArchivedMetric(2); !archived {
t.Error("want FP 2 archived")
}
}
@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) {
for i, b := range batches {
for fp, m := range b.fpToMetric {
p.indexMetric(fp, m)
if err := p.archiveMetric(fp, m, 1, 2); err != nil {
t.Fatal(err)
}
p.archiveMetric(fp, m, 1, 2)
indexedFpsToMetrics[fp] = m
}
verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
}
// Check that archived metrics are in membership index.
has, first, last, err := p.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
has, first, last := p.hasArchivedMetric(fp)
if !has {
t.Errorf("%d. fingerprint %v not found", i, fp)
}
@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
// Compare label name -> label values mappings.
for ln, lvs := range b.expectedLnToLvs {
outLvs, err := p.labelValuesForLabelName(ln)
if err != nil {
t.Fatal(err)
}
outLvs := p.labelValuesForLabelName(ln)
outSet := codable.LabelValueSet{}
for _, lv := range outLvs {
@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
// Compare label pair -> fingerprints mappings.
for lp, fps := range b.expectedLpToFps {
outFPs, err := p.fingerprintsForLabelPair(lp)
if err != nil {
t.Fatal(err)
}
outFPs := p.fingerprintsForLabelPair(lp)
outSet := codable.FingerprintSet{}
for _, fp := range outFPs {

View File

@ -425,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
var result map[model.Fingerprint]struct{}
for _, pair := range pairs {
intersection := map[model.Fingerprint]struct{}{}
fps, err := s.persistence.fingerprintsForLabelPair(pair)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
fps := s.persistence.fingerprintsForLabelPair(pair)
if len(fps) == 0 {
return nil
}
@ -547,19 +544,14 @@ func (s *memorySeriesStorage) metricForFingerprint(
}
if from.After(model.Earliest) || through.Before(model.Latest) {
// The range lookup is relatively cheap, so let's do it first.
ok, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.Errorf("Error retrieving archived time range for fingerprint %v: %v", fp, err)
return metric.Metric{}, false
}
ok, first, last := s.persistence.hasArchivedMetric(fp)
if !ok || first.After(through) || last.Before(from) {
return metric.Metric{}, false
}
}
met, err := s.persistence.archivedMetric(fp)
if err != nil {
log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
met, _ := s.persistence.archivedMetric(fp) // Ignoring error, there is nothing we can do.
if met == nil {
return metric.Metric{}, false
}
@ -571,11 +563,7 @@ func (s *memorySeriesStorage) metricForFingerprint(
// LabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
lvs, err := s.persistence.labelValuesForLabelName(labelName)
if err != nil {
log.Errorf("Error getting label values for label name %q: %v", labelName, err)
}
return lvs
return s.persistence.labelValuesForLabelName(labelName)
}
// DropMetric implements Storage.
@ -603,7 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below.
if err != nil {
s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
return err
}
if fp != rawFP {
@ -745,11 +733,7 @@ func (s *memorySeriesStorage) getSeriesForRange(
if ok {
return series
}
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
has, first, last := s.persistence.hasArchivedMetric(fp)
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil
@ -759,7 +743,7 @@ func (s *memorySeriesStorage) getSeriesForRange(
}
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
// Error already logged, storage declared dirty by archivedMetric.
return nil
}
series, err = s.getOrCreateSeries(fp, metric)
@ -1152,12 +1136,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
s.fpToSeries.del(fp)
s.numSeries.Dec()
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.lastTime,
); err != nil {
log.Errorf("Error archiving metric %v: %v", series.metric, err)
return
}
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
if oldWatermark < int64(series.lastTime) {
@ -1278,11 +1257,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.Error("Error looking up archived time range: ", err)
return
}
has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp)
if !has || !firstTime.Before(beforeTime) {
// Oldest sample not old enough, or metric purged or unarchived in the meantime.
return
@ -1295,10 +1270,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
log.Error("Error dropping persisted chunks: ", err)
}
if allDropped {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
return
}
s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do.
s.seriesOps.WithLabelValues(archivePurge).Inc()
return
}
@ -1487,13 +1459,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
s.incNumChunksToPersist(-numChunksNotYetPersisted)
} else {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error purging metric from archive.")
}
s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do.
}
if m != nil {
// If we know a metric now, unindex it in any case.

View File

@ -471,11 +471,7 @@ func TestDropMetrics(t *testing.T) {
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
s.fpLocker.Unlock(fpToBeArchived)
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
@ -582,11 +578,7 @@ func TestQuarantineMetric(t *testing.T) {
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
s.fpLocker.Unlock(fpToBeArchived)
// Corrupt the series file for m3.
@ -1144,36 +1136,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
archived, _, _, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
archived, _, _ := s.persistence.hasArchivedMetric(fp)
if !archived {
t.Fatal("not archived")
}
// Drop ~half of the chunks of an archived series.
s.maintainArchivedSeries(fp, 10000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
archived, _, _ = s.persistence.hasArchivedMetric(fp)
if !archived {
t.Fatal("archived series purged although only half of the chunks dropped")
}
// Drop everything.
s.maintainArchivedSeries(fp, 100000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
archived, _, _ = s.persistence.hasArchivedMetric(fp)
if archived {
t.Fatal("archived series not dropped")
}
@ -1199,16 +1177,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), lastTime,
); err != nil {
t.Fatal(err)
}
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
archived, _, _ = s.persistence.hasArchivedMetric(fp)
if !archived {
t.Fatal("not archived")
}
@ -1220,10 +1190,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if !ok {
t.Fatal("could not find series")
}
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
archived, _, _ = s.persistence.hasArchivedMetric(fp)
if archived {
t.Fatal("archived")
}
@ -1231,10 +1198,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// This will archive again, but must not drop it completely, despite the
// memorySeries being empty.
s.maintainMemorySeries(fp, 10000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
archived, _, _ = s.persistence.hasArchivedMetric(fp)
if !archived {
t.Fatal("series purged completely")
}