compact: Verify that no chunks are outside of the compacted time range. Added unit test for populateBlock. (#349)
* compact: Verify that no chunks are outside of the compacted time range. Unit test for populateBlock.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
Co-authored-by: Krasi Georgiev <kgeorgie@redhat.com>
commit 047b1b1357 (parent d7492b9350)

compact.go (20 lines changed)
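At its core, the change makes populateBlock refuse to write any chunk whose time range escapes the block being compacted. Below is a minimal standalone sketch of that guard; chunkMeta, blockMeta, and verifyChunk are hypothetical stand-ins for the repo's chunks.Meta, BlockMeta, and the inline check shown in the diff that follows.

package main

import "fmt"

type chunkMeta struct{ MinTime, MaxTime int64 }
type blockMeta struct{ MinTime, MaxTime int64 }

// verifyChunk mirrors the new validation: a chunk must lie fully inside
// the [MinTime, MaxTime] range of the block being written.
func verifyChunk(chk chunkMeta, meta blockMeta) error {
	if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
		return fmt.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
			chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
	}
	return nil
}

func main() {
	// The issue-347 scenario from the new test: chunk [10, 20] in a block
	// compacted to [0, 10] is rejected instead of silently written.
	fmt.Println(verifyChunk(chunkMeta{MinTime: 10, MaxTime: 20}, blockMeta{MinTime: 0, MaxTime: 10}))
}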
@@ -524,6 +524,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // populateBlock fills the index and chunk writers with new data gathered as the union
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
+	if len(blocks) == 0 {
+		return errors.New("cannot populate block from no readers")
+	}
+
 	var (
 		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
@@ -595,13 +599,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			continue
 		}
 
-		if len(dranges) > 0 {
-			// Re-encode the chunk to not have deleted values.
-			for i, chk := range chks {
+		for i, chk := range chks {
+			if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
+				return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
+					chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
+			}
+
+			if len(dranges) > 0 {
+				// Re-encode the chunk to not have deleted values.
 				if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
 					continue
 				}
 
 				newChunk := chunkenc.NewXORChunk()
 				app, err := newChunk.Appender()
 				if err != nil {
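A note on the re-encoding condition in the hunk above: OverlapsClosedInterval treats both interval ends as inclusive, so a chunk qualifies for re-encoding as soon as it merely touches the overall deleted range. A small illustration with hypothetical values, assuming the same tsdb package context as the diff:

chk := chunks.Meta{MinTime: 5, MaxTime: 10}

// Both endpoints count, so sharing only the single point t=10 with the
// deletion range [10, 20] is already an overlap.
overlaps := chk.OverlapsClosedInterval(10, 20) // true
_ = overlaps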
@@ -617,6 +625,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 				chks[i].Chunk = newChunk
+			}
 		}
 
 		if err := chunkw.WriteChunks(chks...); err != nil {
 			return errors.Wrap(err, "write chunks")
 		}
@@ -791,7 +800,6 @@ func (c *compactionMerger) Next() bool {
 	var chks []chunks.Meta
 
 	d := c.compare()
-	// Both sets contain the current series. Chain them into a single one.
 	if d > 0 {
 		lset, chks, c.intervals = c.b.At()
 		c.l = append(c.l[:0], lset...)
@@ -805,8 +813,10 @@ func (c *compactionMerger) Next() bool {
 
 		c.aok = c.a.Next()
 	} else {
+		// Both sets contain the current series. Chain them into a single one.
 		l, ca, ra := c.a.At()
 		_, cb, rb := c.b.At()
 
+		for _, r := range rb {
 			ra = ra.add(r)
 		}
compact_test.go (281 lines changed)
@@ -15,12 +15,14 @@ package tsdb
 
 import (
 	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"testing"
 
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/testutil"
 )
@@ -401,3 +403,282 @@ type erringBReader struct{}
 func (erringBReader) Index() (IndexReader, error)          { return nil, errors.New("index") }
 func (erringBReader) Chunks() (ChunkReader, error)         { return nil, errors.New("chunks") }
 func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }

type nopChunkWriter struct{}

func (nopChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return nil }
func (nopChunkWriter) Close() error                            { return nil }

func TestCompaction_populateBlock(t *testing.T) {
	var populateBlocksCases = []struct {
		title              string
		inputSeriesSamples [][]seriesSamples
		compactMinTime     int64
		compactMaxTime     int64 // When not defined the test runner sets a default of math.MaxInt64.

		expSeriesSamples []seriesSamples
		expErr           error
	}{
		{
			title:              "Populate block from empty input should return error.",
			inputSeriesSamples: [][]seriesSamples{},
			expErr:             errors.New("cannot populate block from no readers"),
		},
		{
			// Populate from single block without chunks. We expect this kind of series to be ignored.
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset: map[string]string{"a": "b"},
					},
				},
			},
		},
		{
			title: "Populate from single block. We expect the same samples at the output.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
				},
			},
		},
		{
			title: "Populate from two blocks.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
					},
					{
						// no-chunk series should be dropped.
						lset: map[string]string{"a": "empty"},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 40}, {t: 45}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
				},
				{
					lset:   map[string]string{"a": "c"},
					chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
				},
			},
		},
		{
			title: "Populate from two blocks showing that order is maintained.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 40}, {t: 45}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 21}, {t: 30}}, {{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
				},
				{
					lset:   map[string]string{"a": "c"},
					chunks: [][]sample{{{t: 40}, {t: 45}}, {{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
				},
			},
		},
		{
			title: "Populate from two blocks showing that order of series is sorted.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "4"},
						chunks: [][]sample{{{t: 5}, {t: 7}}},
					},
					{
						lset:   map[string]string{"a": "3"},
						chunks: [][]sample{{{t: 5}, {t: 6}}},
					},
					{
						lset:   map[string]string{"a": "same"},
						chunks: [][]sample{{{t: 1}, {t: 4}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "2"},
						chunks: [][]sample{{{t: 1}, {t: 3}}},
					},
					{
						lset:   map[string]string{"a": "1"},
						chunks: [][]sample{{{t: 1}, {t: 2}}},
					},
					{
						lset:   map[string]string{"a": "same"},
						chunks: [][]sample{{{t: 5}, {t: 8}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "1"},
					chunks: [][]sample{{{t: 1}, {t: 2}}},
				},
				{
					lset:   map[string]string{"a": "2"},
					chunks: [][]sample{{{t: 1}, {t: 3}}},
				},
				{
					lset:   map[string]string{"a": "3"},
					chunks: [][]sample{{{t: 5}, {t: 6}}},
				},
				{
					lset:   map[string]string{"a": "4"},
					chunks: [][]sample{{{t: 5}, {t: 7}}},
				},
				{
					lset:   map[string]string{"a": "same"},
					chunks: [][]sample{{{t: 1}, {t: 4}}, {{t: 5}, {t: 8}}},
				},
			},
		},
		{
			// This should not happen because the head block ensures that chunks do not cross block boundaries.
			title: "Populate from single block containing chunk outside of compact meta time range.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 30}}},
					},
				},
			},
			compactMinTime: 0,
			compactMaxTime: 20,
			expErr:         errors.New("found chunk with minTime: 10 maxTime: 30 outside of compacted minTime: 0 maxTime: 20"),
		},
		{
			// Introduced by https://github.com/prometheus/tsdb/issues/347.
			title: "Populate from single block containing extra chunk.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "issue347"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}},
					},
				},
			},
			compactMinTime: 0,
			compactMaxTime: 10,
			expErr:         errors.New("found chunk with minTime: 10 maxTime: 20 outside of compacted minTime: 0 maxTime: 10"),
		},
		{
			// No special deduplication expected.
			title: "Populate from two blocks containing duplicated chunk.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 10}, {t: 20}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}, {{t: 10}, {t: 20}}},
				},
			},
		},
	}

	for _, tc := range populateBlocksCases {
		if ok := t.Run(tc.title, func(t *testing.T) {
			blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
			for _, b := range tc.inputSeriesSamples {
				ir, cr := createIdxChkReaders(b)
				blocks = append(blocks, &mockBReader{ir: ir, cr: cr})
			}

			c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil)
			testutil.Ok(t, err)

			meta := &BlockMeta{
				MinTime: tc.compactMinTime,
				MaxTime: tc.compactMaxTime,
			}
			if meta.MaxTime == 0 {
				meta.MaxTime = math.MaxInt64
			}

			iw := &mockIndexWriter{}
			err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
			if tc.expErr != nil {
				testutil.NotOk(t, err)
				testutil.Equals(t, tc.expErr.Error(), err.Error())
				return
			}
			testutil.Ok(t, err)

			testutil.Equals(t, tc.expSeriesSamples, iw.series)

			// Check if stats are calculated properly.
			s := BlockStats{
				NumSeries: uint64(len(tc.expSeriesSamples)),
			}
			for _, series := range tc.expSeriesSamples {
				s.NumChunks += uint64(len(series.chunks))
				for _, chk := range series.chunks {
					s.NumSamples += uint64(len(chk))
				}
			}
			testutil.Equals(t, s, meta.Stats)
		}); !ok {
			return
		}
	}
}
@@ -0,0 +1,73 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)

type mockIndexWriter struct {
	series []seriesSamples
}

func (mockIndexWriter) AddSymbols(sym map[string]struct{}) error { return nil }
func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
	i := -1
	for j, s := range m.series {
		if !labels.FromMap(s.lset).Equals(l) {
			continue
		}
		i = j
		break
	}
	if i == -1 {
		m.series = append(m.series, seriesSamples{
			lset: l.Map(),
		})
		i = len(m.series) - 1
	}

	for _, chk := range chunks {
		samples := make([]sample, 0, chk.Chunk.NumSamples())

		iter := chk.Chunk.Iterator()
		for iter.Next() {
			s := sample{}
			s.t, s.v = iter.At()

			samples = append(samples, s)
		}
		if err := iter.Err(); err != nil {
			return err
		}

		m.series[i].chunks = append(m.series[i].chunks, samples)
	}
	return nil
}

func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error     { return nil }
func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil }
func (mockIndexWriter) Close() error                                              { return nil }

type mockBReader struct {
	ir IndexReader
	cr ChunkReader
}

func (r *mockBReader) Index() (IndexReader, error)          { return r.ir, nil }
func (r *mockBReader) Chunks() (ChunkReader, error)         { return r.cr, nil }
func (r *mockBReader) Tombstones() (TombstoneReader, error) { return NewMemTombstones(), nil }
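As a quick usage sketch of the mock above: AddSeries decodes every chunk it receives back into plain samples, which is what the test compares against. The function name exampleMockIndexWriter is hypothetical; the sketch assumes the tsdb package context of this diff, with chunkenc imported as in compact.go.

func exampleMockIndexWriter() ([]seriesSamples, error) {
	// Build one XOR chunk with two samples.
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		return nil, err
	}
	app.Append(0, 1.0)  // t=0, v=1
	app.Append(10, 2.0) // t=10, v=2

	// AddSeries stores the decoded samples under the series' label set.
	iw := &mockIndexWriter{}
	err = iw.AddSeries(1, labels.FromMap(map[string]string{"a": "b"}),
		chunks.Meta{MinTime: 0, MaxTime: 10, Chunk: c})
	return iw.series, err
}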
@@ -245,12 +245,14 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) {
 	return r, it.Err()
 }
 
-// Index: labels -> postings -> chunkMetas -> chunkRef
-// ChunkReader: ref -> vals
-func createIdxChkReaders(tc []struct {
+type seriesSamples struct {
 	lset   map[string]string
 	chunks [][]sample
-}) (IndexReader, ChunkReader) {
+}
+
+// Index: labels -> postings -> chunkMetas -> chunkRef
+// ChunkReader: ref -> vals
+func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
 	sort.Slice(tc, func(i, j int) bool {
 		return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[j].lset)) < 0
 	})
@@ -322,17 +324,11 @@ func TestBlockQuerier(t *testing.T) {
 	}
 
 	cases := struct {
-		data []struct {
-			lset   map[string]string
-			chunks [][]sample
-		}
+		data []seriesSamples
 
 		queries []query
 	}{
-		data: []struct {
-			lset   map[string]string
-			chunks [][]sample
-		}{
+		data: []seriesSamples{
 			{
 				lset: map[string]string{
 					"a": "a",
@@ -528,18 +524,12 @@ func TestBlockQuerierDelete(t *testing.T) {
 	}
 
 	cases := struct {
-		data []struct {
-			lset   map[string]string
-			chunks [][]sample
-		}
+		data []seriesSamples
 
 		tombstones TombstoneReader
 		queries    []query
 	}{
-		data: []struct {
-			lset   map[string]string
-			chunks [][]sample
-		}{
+		data: []seriesSamples{
 			{
 				lset: map[string]string{
 					"a": "a",