From 2363a90adc48b6408315a1f5a9c850fa33fe997f Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 6 Feb 2017 17:39:59 +0100 Subject: [PATCH] storage: Do not throw away fully persisted memory series in checkpointing --- storage/local/persistence.go | 70 +++++++++++++++++++++++-------- storage/local/persistence_test.go | 31 ++++++++++++-- 2 files changed, 81 insertions(+), 20 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a76d5de8e..4a3150c7b 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -670,12 +670,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap defer fpLocker.Unlock(m.fp) chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark - if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 { - // This series was completely purged or archived in the meantime or has - // no chunks that need persisting. Ignore. + if len(m.series.chunkDescs) == 0 { + // This series was completely purged or archived + // in the meantime. Ignore. return } realNumberOfSeries++ + + // Sanity checks. + if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { + panic("encountered unknown chunk desc offset in combination with positive persist watermark") + } + + // These are the values to save in the normal case. + var ( + // persistWatermark is zero as we only checkpoint non-persisted chunks. + persistWatermark int64 + // chunkDescsOffset is shifted by the original persistWatermark for the same reason. + chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark) + numChunkDescs = int64(chunksToPersist) + ) + // However, in the special case of a series being fully + // persisted but still in memory (i.e. not archived), we + // need to save a "placeholder", for which we use just + // the chunk desc of the last chunk. Values have to be + // adjusted accordingly. (The reason for doing it in + // this weird way is to keep the checkpoint format + // compatible with older versions.) + if chunksToPersist == 0 { + persistWatermark = 1 + chunkDescsOffset-- // Save one chunk desc after all. + numChunkDescs = 1 + } + // seriesFlags left empty in v2. if err = w.WriteByte(0); err != nil { return @@ -691,9 +718,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = w.Write(buf); err != nil { return } - // persistWatermark. We only checkpoint chunks that need persisting, so - // this is always 0. - if _, err = codable.EncodeVarint(w, 0); err != nil { + if _, err = codable.EncodeVarint(w, persistWatermark); err != nil { return } if m.series.modTime.IsZero() { @@ -705,28 +730,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } - // chunkDescsOffset. - if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { - panic("encountered unknown chunk desc offset in combination with positive persist watermark") - } - if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil { + if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil { return } if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { return } - // Number of chunkDescs. - if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil { + if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil { return } - for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { - if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + if chunksToPersist == 0 { + // Save the one placeholder chunk desc for a fully persisted series. + chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1] + if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { return } - if err = chunkDesc.C.Marshal(w); err != nil { + lt, err := chunkDesc.LastTime() + if err != nil { return } - p.checkpointChunksWritten.Observe(float64(chunksToPersist)) + if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { + return + } + } else { + // Save (only) the non-persisted chunks. + for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { + if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + return + } + if err = chunkDesc.C.Marshal(w); err != nil { + return + } + p.checkpointChunksWritten.Observe(float64(chunksToPersist)) + } } // Series is checkpointed now, so declare it clean. In case the entire // checkpoint fails later on, this is fine, as the storage's series diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e88b17592..f881dd4e6 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -484,7 +484,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true - s3.persistWatermark = 1 + // Create another chunk in s3. + s3.add(model.SamplePair{Timestamp: 3, Value: 1.4}) + s3.headChunkClosed = true + s3.persistWatermark = 2 for i := 0; i < 10000; i++ { s4.add(model.SamplePair{ Timestamp: model.Time(i), @@ -512,8 +515,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if err != nil { t.Fatal(err) } - if loadedSM.length() != 3 { - t.Errorf("want 3 series in map, got %d", loadedSM.length()) + if loadedSM.length() != 4 { + t.Errorf("want 4 series in map, got %d", loadedSM.length()) } if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS1.metric, m1) { @@ -537,6 +540,28 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin } else { t.Errorf("couldn't find %v in loaded map", m1) } + if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok { + if !reflect.DeepEqual(loadedS3.metric, m3) { + t.Errorf("want metric %v, got %v", m3, loadedS3.metric) + } + if loadedS3.head().C != nil { + t.Error("head chunk not evicted") + } + if loadedS3.chunkDescsOffset != 1 { + t.Errorf("want chunkDescsOffset 1, got %d", loadedS3.chunkDescsOffset) + } + if !loadedS3.headChunkClosed { + t.Error("headChunkClosed is false") + } + if loadedS3.head().ChunkFirstTime != 3 { + t.Errorf("want ChunkFirstTime in head chunk to be 3, got %d", loadedS3.head().ChunkFirstTime) + } + if loadedS3.head().ChunkLastTime != 3 { + t.Errorf("want ChunkLastTime in head chunk to be 3, got %d", loadedS3.head().ChunkLastTime) + } + } else { + t.Errorf("couldn't find %v in loaded map", m3) + } if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS4.metric, m4) { t.Errorf("want metric %v, got %v", m4, loadedS4.metric)