Merge pull request #2414 from prometheus/beorn7/storage

storage: Fix chunkIndexToStartSeek calculation
This commit is contained in:
Björn Rabenstein 2017-02-10 12:43:00 +01:00 committed by GitHub
commit b4450f02bc
2 changed files with 78 additions and 2 deletions

View File

@ -932,8 +932,9 @@ func (p *persistence) dropAndPersistChunks(
chunkIndexToStartSeek := 0
if p.minShrinkRatio < 1 {
chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio))
} else {
chunkIndexToStartSeek = totalChunks - 1
}
if chunkIndexToStartSeek >= chunksInFile {
chunkIndexToStartSeek = chunksInFile - 1
}
numDropped = chunkIndexToStartSeek

View File

@ -462,6 +462,81 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
t.Error("all chunks dropped")
}
}
// Drop all the chunks again.
for fp := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 100, nil)
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if err != nil {
t.Fatal(err)
}
if offset != 0 {
t.Errorf("want offset 0, got %d", offset)
}
if numDropped != 7 {
t.Errorf("want 7 dropped chunks, got %v", numDropped)
}
if !allDropped {
t.Error("not all chunks dropped")
}
}
// Re-add first two of the chunks again.
for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err :=
p.dropAndPersistChunks(fp, model.Earliest, chunks[:2])
if err != nil {
t.Fatal(err)
}
if got, want := firstTimeNotDropped, model.Time(0); got != want {
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
}
if got, want := offset, 0; got != want {
t.Errorf("Want offset %v, got %v.", got, want)
}
if got, want := numDropped, 0; got != want {
t.Errorf("Want numDropped %v, got %v.", got, want)
}
if allDropped {
t.Error("All dropped.")
}
}
// Try to drop the first of the chunks while adding eight more. The drop
// should not happen because of the shrink ratio. Also, this time the
// minimum cut-off point is within the added chunks and not in the file
// anymore.
for fp, chunks := range fpToChunks {
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[2:])
if err != nil {
t.Fatal(err)
}
if offset != 2 {
t.Errorf("want offset 2, got %d", offset)
}
if firstTime != 0 {
t.Errorf("want first time 0, got %d", firstTime)
}
if numDropped != 0 {
t.Errorf("want 0 dropped chunk, got %v", numDropped)
}
if allDropped {
t.Error("all chunks dropped")
}
wantChunks := chunks
indexes := make([]int, len(wantChunks))
for i := range indexes {
indexes[i] = i
}
gotChunks, err := p.loadChunks(fp, indexes, 0)
if err != nil {
t.Fatal(err)
}
for i, wantChunk := range wantChunks {
if !chunksEqual(wantChunk, gotChunks[i]) {
t.Errorf("%d. Chunks not equal.", i)
}
}
}
}
func TestPersistLoadDropChunksType0(t *testing.T) {