diff --git a/storage/local/persistence.go b/storage/local/persistence.go index acccc8874..886215d2c 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -369,13 +369,10 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa // the (zero-based) index of the first persisted chunk within the series // file. In case of an error, the returned index is -1 (to avoid the // misconception that the chunk was written at position 0). +// +// Returning an error signals problems with the series file. In this case, the +// caller should quarantine the series. func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { - defer func() { - if err != nil { - p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err)) - } - }() - f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err @@ -915,6 +912,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in // deleted (in which case the returned timestamp will be 0 and must be ignored). // It is the caller's responsibility to make sure nothing is persisted or loaded // for the same fingerprint concurrently. +// +// Returning an error signals problems with the series file. In this case, the +// caller should quarantine the series. func (p *persistence) dropAndPersistChunks( fp model.Fingerprint, beforeTime model.Time, chunks []chunk, ) ( @@ -927,12 +927,6 @@ func (p *persistence) dropAndPersistChunks( // Style note: With the many return values, it was decided to use naked // returns in this method. They make the method more readable, but // please handle with care! - defer func() { - if err != nil { - p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err)) - } - }() - if len(chunks) > 0 { // We have chunks to persist. First check if those are already // too old. If that's the case, the chunks in the series file diff --git a/storage/local/storage.go b/storage/local/storage.go index 2d99c073f..201c2ba8b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,6 +16,7 @@ package local import ( "container/list" + "errors" "fmt" "math" "sync" @@ -1108,8 +1109,20 @@ func (s *memorySeriesStorage) maintainMemorySeries( func (s *memorySeriesStorage) writeMemorySeries( fp model.Fingerprint, series *memorySeries, beforeTime model.Time, ) bool { - cds := series.chunksToPersist() + var ( + persistErr error + cds = series.chunksToPersist() + ) + defer func() { + if persistErr != nil { + s.quarantineSeries(fp, series.metric, persistErr) + s.persistErrors.Inc() + } + // The following is done even in case of an error to ensure + // correct counter bookkeeping and to not pin chunks in memory + // that belong to a series that is scheduled for quarantine + // anyway. for _, cd := range cds { cd.unpin(s.evictRequests) } @@ -1130,9 +1143,9 @@ func (s *memorySeriesStorage) writeMemorySeries( if len(cds) == 0 { return false } - offset, err := s.persistence.persistChunks(fp, chunks) - if err != nil { - s.persistErrors.Inc() + var offset int + offset, persistErr = s.persistence.persistChunks(fp, chunks) + if persistErr != nil { return false } if series.chunkDescsOffset == -1 { @@ -1144,14 +1157,12 @@ func (s *memorySeriesStorage) writeMemorySeries( return false } - newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, err := + newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, persistErr := s.persistence.dropAndPersistChunks(fp, beforeTime, chunks) - if err != nil { - s.persistErrors.Inc() + if persistErr != nil { return false } - if err := series.dropChunks(beforeTime); err != nil { - s.persistErrors.Inc() + if persistErr = series.dropChunks(beforeTime); persistErr != nil { return false } if len(series.chunkDescs) == 0 && allDroppedFromPersistence { @@ -1168,8 +1179,8 @@ func (s *memorySeriesStorage) writeMemorySeries( } else { series.chunkDescsOffset -= numDroppedFromPersistence if series.chunkDescsOffset < 0 { - s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) - series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. + persistErr = errors.New("dropped more chunks from persistence than from memory") + series.chunkDescsOffset = -1 } } return false