Fix the case where a series in memory has 0 chunks, but chunks on disk.

This is actually completely normal for a freshly unarchived series.

Test added to expose.
This commit is contained in:
beorn7 2015-04-09 15:57:11 +02:00
parent 49d67fc834
commit c5fa0b90c3
3 changed files with 61 additions and 6 deletions

View File

@ -410,7 +410,7 @@ func (p *persistence) cleanUpArchiveIndexes(
return err
}
if !deleted {
glog.Errorf("Fingerprint %s to be deleted from archivedFingerprintToTimeRange not found. This should never happen.", fp)
glog.Errorf("Fingerprint %v to be deleted from archivedFingerprintToTimeRange not found. This should never happen.", fp)
}
return nil
}); err != nil {

View File

@ -861,11 +861,8 @@ func (s *memorySeriesStorage) writeMemorySeries(
return false
}
series.dropChunks(beforeTime)
if len(series.chunkDescs) == 0 { // All chunks dropped from memory series.
if !allDroppedFromPersistence {
glog.Errorf("All chunks dropped from memory but chunks left in persistence for fingerprint %v, series %v.", fp, series)
s.persistence.setDirty(true)
}
if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good.
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()

View File

@ -579,6 +579,64 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if archived {
t.Fatal("archived series not dropped")
}
// Recreate series.
for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing()
series, ok = ms.fpToSeries.get(fp)
if !ok {
t.Fatal("could not find series")
}
// Persist head chunk so we can safely archive.
series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics.
ms.fpToSeries.del(fp)
if err := ms.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil {
t.Fatal(err)
}
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived {
t.Fatal("not archived")
}
// Unarchive metrics.
ms.getOrCreateSeries(fp, clientmodel.Metric{})
series, ok = ms.fpToSeries.get(fp)
if !ok {
t.Fatal("could not find series")
}
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if archived {
t.Fatal("archived")
}
fmt.Println(series.headChunkClosed, len(series.chunkDescs))
// This will archive again, but must not drop it completely, despite the
// memorySeries being empty.
ms.maintainMemorySeries(fp, 1000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived {
t.Fatal("series purged completely")
}
}
func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {