diff --git a/promql/value.go b/promql/value.go index cddaf7435..50642f439 100644 --- a/promql/value.go +++ b/promql/value.go @@ -300,6 +300,10 @@ func (ssi *storageSeriesIterator) AtHistogram() (int64, histogram.SparseHistogra return 0, histogram.SparseHistogram{} } +func (ssi *storageSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (ssi *storageSeriesIterator) Next() bool { ssi.curr++ return ssi.curr < len(ssi.points) diff --git a/storage/buffer.go b/storage/buffer.go index b6196a0c7..82e5a2832 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -202,6 +202,10 @@ func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) { return 0, histogram.SparseHistogram{} } +func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (r *sampleRing) at(i int) (int64, float64) { j := (r.f + i) % len(r.buf) s := r.buf[j] diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 2e0abfd0c..c242bfe10 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/prometheus/pkg/histogram" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" ) @@ -198,6 +199,9 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { return 0, histogram.SparseHistogram{} } +func (m *mockSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} func (m *mockSeriesIterator) Next() bool { return m.next() } func (m *mockSeriesIterator) Err() error { return m.err() } @@ -219,6 +223,10 @@ func (it *fakeSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { return it.idx * it.step, histogram.SparseHistogram{} // value doesn't matter } +func (it *fakeSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (it *fakeSeriesIterator) Next() bool { it.idx++ return it.idx < it.nsamples diff --git a/storage/merge.go b/storage/merge.go index ac43d11c0..cda1259f3 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -489,6 +489,13 @@ func (c *chainSampleIterator) AtHistogram() (int64, histogram.SparseHistogram) { return c.curr.AtHistogram() } +func (c *chainSampleIterator) ChunkEncoding() chunkenc.Encoding { + if c.curr == nil { + panic("chainSampleIterator.ChunkEncoding() called before first .Next() or after .Next() returned false.") + } + return c.curr.ChunkEncoding() +} + func (c *chainSampleIterator) Next() bool { if c.h == nil { c.h = samplesIteratorHeap{} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 388d3090c..9e455db5a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -373,6 +373,10 @@ func (c *concreteSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram return 0, histogram.SparseHistogram{} } +func (c *concreteSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + // Next implements storage.SeriesIterator. func (c *concreteSeriesIterator) Next() bool { c.cur++ diff --git a/storage/series.go b/storage/series.go index 86471a143..a7c559f2b 100644 --- a/storage/series.go +++ b/storage/series.go @@ -95,6 +95,10 @@ func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { return 0, histogram.SparseHistogram{} } +func (it *listSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (it *listSeriesIterator) Next() bool { it.idx++ return it.idx < it.samples.Len() diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index d28e34812..eeb6c2b61 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -103,6 +103,8 @@ type Iterator interface { // Err returns the current error. It should be used only after iterator is // exhausted, that is `Next` or `Seek` returns false. Err() error + // ChunkEncoding returns the encoding of the chunk that it is iterating. + ChunkEncoding() Encoding } // NewNopIterator returns a new chunk iterator that does not hold any data. @@ -117,8 +119,9 @@ func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } func (nopIterator) AtHistogram() (int64, histogram.SparseHistogram) { return math.MinInt64, histogram.SparseHistogram{} } -func (nopIterator) Next() bool { return false } -func (nopIterator) Err() error { return nil } +func (nopIterator) Next() bool { return false } +func (nopIterator) Err() error { return nil } +func (nopIterator) ChunkEncoding() Encoding { return EncNone } // Pool is used to create and reuse chunk references to avoid allocations. type Pool interface { diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index fcf5cdb83..9299e02de 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -403,6 +403,10 @@ func (it *histoIterator) At() (int64, float64) { panic("cannot call histoIterator.At().") } +func (it *histoIterator) ChunkEncoding() Encoding { + return EncSHS +} + func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) { return it.t, histogram.SparseHistogram{ Count: it.cnt, diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 374959221..c1c56c7e6 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -281,6 +281,10 @@ func (it *xorIterator) AtHistogram() (int64, histogram.SparseHistogram) { panic("cannot call xorIterator.AtHistogram().") } +func (it *xorIterator) ChunkEncoding() Encoding { + return EncXOR +} + func (it *xorIterator) Err() error { return it.err } diff --git a/tsdb/head.go b/tsdb/head.go index e112c9113..1f00b1d04 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2678,7 +2678,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { // iterator returns a chunk iterator. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { +func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) (ttt chunkenc.Iterator) { c, garbageCollect, err := s.chunk(id, chunkDiskMapper) // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got @@ -2837,7 +2837,7 @@ func (it *memSafeIterator) Next() bool { return false } it.i++ - if it.total-it.i > 4 { + if it.Iterator.ChunkEncoding() == chunkenc.EncSHS || it.total-it.i > 4 { return it.Iterator.Next() } return true diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 371e0ecd6..09516494a 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -2177,3 +2178,67 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { ok = it.Seek(7) require.False(t, ok) } + +func TestAppendHistogram(t *testing.T) { + l := labels.Labels{{Name: "a", Value: "b"}} + for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} { + t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) { + head, _ := newTestHead(t, 1000, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + + require.NoError(t, head.Init(0)) + app := head.Appender(context.Background()) + + type timedHist struct { + t int64 + h histogram.SparseHistogram + } + expHists := make([]timedHist, 0, numHistograms) + for i, h := range generateHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, int64(i), h) + require.NoError(t, err) + expHists = append(expHists, timedHist{int64(i), h}) + } + require.NoError(t, app.Commit()) + + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + require.NoError(t, err) + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator() + actHists := make([]timedHist, 0, len(expHists)) + for it.Next() { + t, h := it.AtHistogram() + actHists = append(actHists, timedHist{t, h.Copy()}) + } + + require.Equal(t, expHists, actHists) + }) + } +} + +func generateHistograms(n int) (r []histogram.SparseHistogram) { + for i := 0; i < n; i++ { + r = append(r, histogram.SparseHistogram{ + Count: 5 + uint64(i*4), + ZeroCount: 2 + uint64(i), + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + NegativeBuckets: []int64{}, + }) + } + + return r +} diff --git a/tsdb/querier.go b/tsdb/querier.go index 771fce76e..d35fa4922 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -633,7 +633,10 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) bool { func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } func (p *populateWithDelSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { - return 0, histogram.SparseHistogram{} + return p.curr.AtHistogram() +} +func (p *populateWithDelSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return p.curr.ChunkEncoding() } func (p *populateWithDelSeriesIterator) Err() error { @@ -823,7 +826,12 @@ func (it *DeletedIterator) At() (int64, float64) { } func (it *DeletedIterator) AtHistogram() (int64, histogram.SparseHistogram) { - return 0, histogram.SparseHistogram{} + t, h := it.Iter.AtHistogram() + return t, h +} + +func (it *DeletedIterator) ChunkEncoding() chunkenc.Encoding { + return it.Iter.ChunkEncoding() } func (it *DeletedIterator) Seek(t int64) bool { @@ -835,7 +843,12 @@ func (it *DeletedIterator) Seek(t int64) bool { } // Now double check if the entry falls into a deleted interval. - ts, _ := it.At() + var ts int64 + if it.ChunkEncoding() == chunkenc.EncSHS { + ts, _ = it.AtHistogram() + } else { + ts, _ = it.At() + } for _, itv := range it.Intervals { if ts < itv.Mint { return true @@ -857,7 +870,12 @@ func (it *DeletedIterator) Seek(t int64) bool { func (it *DeletedIterator) Next() bool { Outer: for it.Iter.Next() { - ts, _ := it.Iter.At() + var ts int64 + if it.ChunkEncoding() == chunkenc.EncSHS { + ts, _ = it.AtHistogram() + } else { + ts, _ = it.At() + } for _, tr := range it.Intervals { if tr.InBounds(ts) { diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go index 1eb54f147..e96267f38 100644 --- a/tsdb/tsdbutil/buffer.go +++ b/tsdb/tsdbutil/buffer.go @@ -164,6 +164,10 @@ func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) { return 0, histogram.SparseHistogram{} } +func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (r *sampleRing) at(i int) (int64, float64) { j := (r.f + i) % len(r.buf) s := r.buf[j] diff --git a/tsdb/tsdbutil/buffer_test.go b/tsdb/tsdbutil/buffer_test.go index 0c6175219..ef9c06102 100644 --- a/tsdb/tsdbutil/buffer_test.go +++ b/tsdb/tsdbutil/buffer_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/prometheus/prometheus/pkg/histogram" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/require" ) @@ -155,6 +156,10 @@ func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { return 0, histogram.SparseHistogram{} } +func (it *listSeriesIterator) ChunkEncoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (it *listSeriesIterator) Next() bool { it.idx++ return it.idx < len(it.list)