diff --git a/compact.go b/compact.go index 3f5fa367c..0439ca310 100644 --- a/compact.go +++ b/compact.go @@ -524,6 +524,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { + if len(blocks) == 0 { + return errors.New("cannot populate block from no readers") + } + var ( set ChunkSeriesSet allSymbols = make(map[string]struct{}, 1<<16) @@ -595,13 +599,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, continue } - if len(dranges) > 0 { - // Re-encode the chunk to not have deleted values. - for i, chk := range chks { + for i, chk := range chks { + if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime { + return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d", + chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime) + } + + if len(dranges) > 0 { + // Re-encode the chunk to not have deleted values. if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) { continue } - newChunk := chunkenc.NewXORChunk() app, err := newChunk.Appender() if err != nil { @@ -617,6 +625,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, chks[i].Chunk = newChunk } } + if err := chunkw.WriteChunks(chks...); err != nil { return errors.Wrap(err, "write chunks") } @@ -791,7 +800,6 @@ func (c *compactionMerger) Next() bool { var chks []chunks.Meta d := c.compare() - // Both sets contain the current series. Chain them into a single one. if d > 0 { lset, chks, c.intervals = c.b.At() c.l = append(c.l[:0], lset...) @@ -805,8 +813,10 @@ func (c *compactionMerger) Next() bool { c.aok = c.a.Next() } else { + // Both sets contain the current series. Chain them into a single one. l, ca, ra := c.a.At() _, cb, rb := c.b.At() + for _, r := range rb { ra = ra.add(r) } diff --git a/compact_test.go b/compact_test.go index 42e38b5c6..acbbb5182 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,12 +15,14 @@ package tsdb import ( "io/ioutil" + "math" "os" "path/filepath" "testing" "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/testutil" ) @@ -401,3 +403,282 @@ type erringBReader struct{} func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } + +type nopChunkWriter struct{} + +func (nopChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return nil } +func (nopChunkWriter) Close() error { return nil } + +func TestCompaction_populateBlock(t *testing.T) { + var populateBlocksCases = []struct { + title string + inputSeriesSamples [][]seriesSamples + compactMinTime int64 + compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64. + + expSeriesSamples []seriesSamples + expErr error + }{ + { + title: "Populate block from empty input should return error.", + inputSeriesSamples: [][]seriesSamples{}, + expErr: errors.New("cannot populate block from no readers"), + }, + { + // Populate from single block without chunks. We expect these kind of series being ignored. + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + }, + }, + }, + }, + { + title: "Populate from single block. We expect the same samples at the output.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + { + title: "Populate from two blocks.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, + }, + { + // no-chunk series should be dropped. + lset: map[string]string{"a": "empty"}, + }, + }, + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 21}, {t: 30}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 40}, {t: 45}}}, + }, + }, + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}}, + }, + }, + }, + { + title: "Populate from two blocks showing that order is maintained.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 21}, {t: 30}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 40}, {t: 45}}}, + }, + }, + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, + }, + }, + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 21}, {t: 30}}, {{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 40}, {t: 45}}, {{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, + }, + }, + }, + { + title: "Populate from two blocks showing that order or series is sorted.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "4"}, + chunks: [][]sample{{{t: 5}, {t: 7}}}, + }, + { + lset: map[string]string{"a": "3"}, + chunks: [][]sample{{{t: 5}, {t: 6}}}, + }, + { + lset: map[string]string{"a": "same"}, + chunks: [][]sample{{{t: 1}, {t: 4}}}, + }, + }, + { + { + lset: map[string]string{"a": "2"}, + chunks: [][]sample{{{t: 1}, {t: 3}}}, + }, + { + lset: map[string]string{"a": "1"}, + chunks: [][]sample{{{t: 1}, {t: 2}}}, + }, + { + lset: map[string]string{"a": "same"}, + chunks: [][]sample{{{t: 5}, {t: 8}}}, + }, + }, + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "1"}, + chunks: [][]sample{{{t: 1}, {t: 2}}}, + }, + { + lset: map[string]string{"a": "2"}, + chunks: [][]sample{{{t: 1}, {t: 3}}}, + }, + { + lset: map[string]string{"a": "3"}, + chunks: [][]sample{{{t: 5}, {t: 6}}}, + }, + { + lset: map[string]string{"a": "4"}, + chunks: [][]sample{{{t: 5}, {t: 7}}}, + }, + { + lset: map[string]string{"a": "same"}, + chunks: [][]sample{{{t: 1}, {t: 4}}, {{t: 5}, {t: 8}}}, + }, + }, + }, + { + // This should not happened because head block is making sure the chunks are not crossing block boundaries. + title: "Populate from single block containing chunk outside of compact meta time range.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 30}}}, + }, + }, + }, + compactMinTime: 0, + compactMaxTime: 20, + expErr: errors.New("found chunk with minTime: 10 maxTime: 30 outside of compacted minTime: 0 maxTime: 20"), + }, + { + // Introduced by https://github.com/prometheus/tsdb/issues/347. + title: "Populate from single block containing extra chunk", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "issue347"}, + chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}}, + }, + }, + }, + compactMinTime: 0, + compactMaxTime: 10, + expErr: errors.New("found chunk with minTime: 10 maxTime: 20 outside of compacted minTime: 0 maxTime: 10"), + }, + { + // No special deduplication expected. + title: "Populate from two blocks containing duplicated chunk.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}}, + }, + }, + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 10}, {t: 20}}}, + }, + }, + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}, {{t: 10}, {t: 20}}}, + }, + }, + }, + } + + for _, tc := range populateBlocksCases { + if ok := t.Run(tc.title, func(t *testing.T) { + blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples)) + for _, b := range tc.inputSeriesSamples { + ir, cr := createIdxChkReaders(b) + blocks = append(blocks, &mockBReader{ir: ir, cr: cr}) + } + + c, err := NewLeveledCompactor(nil, nil, []int64{0}, nil) + testutil.Ok(t, err) + + meta := &BlockMeta{ + MinTime: tc.compactMinTime, + MaxTime: tc.compactMaxTime, + } + if meta.MaxTime == 0 { + meta.MaxTime = math.MaxInt64 + } + + iw := &mockIndexWriter{} + err = c.populateBlock(blocks, meta, iw, nopChunkWriter{}) + if tc.expErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.expErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) + + testutil.Equals(t, tc.expSeriesSamples, iw.series) + + // Check if stats are calculated properly. + s := BlockStats{ + NumSeries: uint64(len(tc.expSeriesSamples)), + } + for _, series := range tc.expSeriesSamples { + s.NumChunks += uint64(len(series.chunks)) + for _, chk := range series.chunks { + s.NumSamples += uint64(len(chk)) + } + } + testutil.Equals(t, s, meta.Stats) + }); !ok { + return + } + } +} diff --git a/mocks_test.go b/mocks_test.go new file mode 100644 index 000000000..ab796f8b5 --- /dev/null +++ b/mocks_test.go @@ -0,0 +1,73 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +type mockIndexWriter struct { + series []seriesSamples +} + +func (mockIndexWriter) AddSymbols(sym map[string]struct{}) error { return nil } +func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error { + i := -1 + for j, s := range m.series { + if !labels.FromMap(s.lset).Equals(l) { + continue + } + i = j + break + } + if i == -1 { + m.series = append(m.series, seriesSamples{ + lset: l.Map(), + }) + i = len(m.series) - 1 + } + + for _, chk := range chunks { + samples := make([]sample, 0, chk.Chunk.NumSamples()) + + iter := chk.Chunk.Iterator() + for iter.Next() { + s := sample{} + s.t, s.v = iter.At() + + samples = append(samples, s) + } + if err := iter.Err(); err != nil { + return err + } + + m.series[i].chunks = append(m.series[i].chunks, samples) + } + return nil +} + +func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } +func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil } +func (mockIndexWriter) Close() error { return nil } + +type mockBReader struct { + ir IndexReader + cr ChunkReader +} + +func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } +func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } +func (r *mockBReader) Tombstones() (TombstoneReader, error) { return NewMemTombstones(), nil } diff --git a/querier_test.go b/querier_test.go index 87a45b647..1beaf8044 100644 --- a/querier_test.go +++ b/querier_test.go @@ -245,12 +245,14 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { return r, it.Err() } -// Index: labels -> postings -> chunkMetas -> chunkRef -// ChunkReader: ref -> vals -func createIdxChkReaders(tc []struct { +type seriesSamples struct { lset map[string]string chunks [][]sample -}) (IndexReader, ChunkReader) { +} + +// Index: labels -> postings -> chunkMetas -> chunkRef +// ChunkReader: ref -> vals +func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { sort.Slice(tc, func(i, j int) bool { return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 }) @@ -322,17 +324,11 @@ func TestBlockQuerier(t *testing.T) { } cases := struct { - data []struct { - lset map[string]string - chunks [][]sample - } + data []seriesSamples queries []query }{ - data: []struct { - lset map[string]string - chunks [][]sample - }{ + data: []seriesSamples{ { lset: map[string]string{ "a": "a", @@ -528,18 +524,12 @@ func TestBlockQuerierDelete(t *testing.T) { } cases := struct { - data []struct { - lset map[string]string - chunks [][]sample - } + data []seriesSamples tombstones TombstoneReader queries []query }{ - data: []struct { - lset map[string]string - chunks [][]sample - }{ + data: []seriesSamples{ { lset: map[string]string{ "a": "a",