From d5ab1851dc52c046401353c679f8e3c87fcff440 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Fri, 1 Mar 2024 14:49:42 +0100 Subject: [PATCH 1/2] SampleRingIterator: add currType field Signed-off-by: Yuri Nikolic --- storage/buffer.go | 53 ++++++++++++++++++++++------------- storage/buffer_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 19 deletions(-) diff --git a/storage/buffer.go b/storage/buffer.go index b3c789e97..545b7ed14 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -316,35 +316,40 @@ func (r *sampleRing) iterator() *SampleRingIterator { // SampleRingIterator is returned by BufferedSeriesIterator.Buffer() and can be // used to iterate samples buffered in the lookback window. type SampleRingIterator struct { - r *sampleRing - i int - t int64 - f float64 - h *histogram.Histogram - fh *histogram.FloatHistogram + r *sampleRing + i int + t int64 + f float64 + h *histogram.Histogram + fh *histogram.FloatHistogram + currType chunkenc.ValueType } func (it *SampleRingIterator) Next() chunkenc.ValueType { it.i++ + it.currType = chunkenc.ValNone if it.i >= it.r.l { - return chunkenc.ValNone + return it.currType } switch it.r.bufInUse { case fBuf: s := it.r.atF(it.i) it.t = s.t it.f = s.f - return chunkenc.ValFloat + it.currType = chunkenc.ValFloat case hBuf: s := it.r.atH(it.i) it.t = s.t it.h = s.h - return chunkenc.ValHistogram + it.currType = chunkenc.ValHistogram case fhBuf: s := it.r.atFH(it.i) it.t = s.t it.fh = s.fh - return chunkenc.ValFloatHistogram + it.currType = chunkenc.ValFloatHistogram + } + if it.currType != chunkenc.ValNone { + return it.currType } s := it.r.at(it.i) it.t = s.T() @@ -352,24 +357,31 @@ func (it *SampleRingIterator) Next() chunkenc.ValueType { case chunkenc.ValHistogram: it.h = s.H() it.fh = nil - return chunkenc.ValHistogram + it.currType = chunkenc.ValHistogram case chunkenc.ValFloatHistogram: it.fh = s.FH() it.h = nil - return chunkenc.ValFloatHistogram + it.currType = chunkenc.ValFloatHistogram default: it.f = s.F() - return chunkenc.ValFloat + it.currType = chunkenc.ValFloat } + return it.currType } // At returns the current float element of the iterator. func (it *SampleRingIterator) At() (int64, float64) { + if it.currType != chunkenc.ValFloat { + panic(fmt.Sprintf("SampleRingIterator: impossible to read a float, current value type is %v", it.currType)) + } return it.t, it.f } // AtHistogram returns the current histogram element of the iterator. func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) { + if it.currType != chunkenc.ValHistogram { + panic(fmt.Sprintf("SampleRingIterator: impossible to read a histogram, current value type is %v", it.currType)) + } return it.t, it.h } @@ -378,14 +390,17 @@ func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) { // An optional histogram.FloatHistogram can be provided to avoid allocating a new // object for the conversion. func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - if it.fh == nil { - return it.t, it.h.ToFloat(fh) + if it.currType != chunkenc.ValHistogram && it.currType != chunkenc.ValFloatHistogram { + panic(fmt.Sprintf("SampleRingIterator: impossible to read a float histogram, current value type is %v", it.currType)) } - if fh != nil { - it.fh.CopyTo(fh) - return it.t, fh + if it.currType == chunkenc.ValFloatHistogram { + if fh != nil { + it.fh.CopyTo(fh) + return it.t, fh + } + return it.t, it.fh.Copy() } - return it.t, it.fh.Copy() + return it.t, it.h.ToFloat(fh) } func (it *SampleRingIterator) AtT() int64 { diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 61074c212..1b24e5da2 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -138,6 +138,69 @@ func TestSampleRingMixed(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Next()) } +func TestSampleRingAtFloatHistogram(t *testing.T) { + fh1 := tsdbutil.GenerateTestFloatHistogram(1) + fh2 := tsdbutil.GenerateTestFloatHistogram(2) + h1 := tsdbutil.GenerateTestHistogram(3) + h2 := tsdbutil.GenerateTestHistogram(4) + + // With ValNone as the preferred type, nothing should be initialized. + r := newSampleRing(10, 2, chunkenc.ValNone) + require.Zero(t, len(r.fBuf)) + require.Zero(t, len(r.hBuf)) + require.Zero(t, len(r.fhBuf)) + require.Zero(t, len(r.iBuf)) + + var ( + h *histogram.Histogram + fh *histogram.FloatHistogram + ts int64 + ) + + it := r.iterator() + require.Equal(t, chunkenc.ValNone, it.Next()) + + r.addFH(fhSample{t: 1, fh: fh1}) + r.addFH(fhSample{t: 2, fh: fh2}) + + it = r.iterator() + + require.Equal(t, chunkenc.ValFloatHistogram, it.Next()) + ts, fh = it.AtFloatHistogram(fh) + require.Equal(t, int64(1), ts) + require.Equal(t, fh1, fh) + require.Equal(t, chunkenc.ValFloatHistogram, it.Next()) + ts, fh = it.AtFloatHistogram(fh) + require.Equal(t, int64(2), ts) + require.Equal(t, fh2, fh) + require.Equal(t, chunkenc.ValNone, it.Next()) + + r.reset() + it = r.iterator() + require.Equal(t, chunkenc.ValNone, it.Next()) + + r.addH(hSample{t: 3, h: h1}) + r.addH(hSample{t: 4, h: h2}) + + it = r.iterator() + + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, h1, h) + ts, fh = it.AtFloatHistogram(fh) + require.Equal(t, int64(3), ts) + require.Equal(t, h1.ToFloat(nil), fh) + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(4), ts) + require.Equal(t, h2, h) + ts, fh = it.AtFloatHistogram(fh) + require.Equal(t, int64(4), ts) + require.Equal(t, h2.ToFloat(nil), fh) + require.Equal(t, chunkenc.ValNone, it.Next()) +} + func TestBufferedSeriesIterator(t *testing.T) { var it *BufferedSeriesIterator From 7df0b9b92d5959b9ef14e8a1a6b5e7f357c126e9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 5 Mar 2024 15:41:18 +0100 Subject: [PATCH 2/2] storage: simplify sampleRing fix Signed-off-by: beorn7 --- storage/buffer.go | 65 ++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/storage/buffer.go b/storage/buffer.go index 545b7ed14..651e5c83e 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -306,50 +306,51 @@ func (r *sampleRing) reset() { r.iBuf = r.iBuf[:0] } -// Returns the current iterator. Invalidates previously returned iterators. +// Resets and returns the iterator. Invalidates previously returned iterators. func (r *sampleRing) iterator() *SampleRingIterator { - r.it.r = r - r.it.i = -1 + r.it.reset(r) return &r.it } // SampleRingIterator is returned by BufferedSeriesIterator.Buffer() and can be // used to iterate samples buffered in the lookback window. type SampleRingIterator struct { - r *sampleRing - i int - t int64 - f float64 - h *histogram.Histogram - fh *histogram.FloatHistogram - currType chunkenc.ValueType + r *sampleRing + i int + t int64 + f float64 + h *histogram.Histogram + fh *histogram.FloatHistogram +} + +func (it *SampleRingIterator) reset(r *sampleRing) { + it.r = r + it.i = -1 + it.h = nil + it.fh = nil } func (it *SampleRingIterator) Next() chunkenc.ValueType { it.i++ - it.currType = chunkenc.ValNone if it.i >= it.r.l { - return it.currType + return chunkenc.ValNone } switch it.r.bufInUse { case fBuf: s := it.r.atF(it.i) it.t = s.t it.f = s.f - it.currType = chunkenc.ValFloat + return chunkenc.ValFloat case hBuf: s := it.r.atH(it.i) it.t = s.t it.h = s.h - it.currType = chunkenc.ValHistogram + return chunkenc.ValHistogram case fhBuf: s := it.r.atFH(it.i) it.t = s.t it.fh = s.fh - it.currType = chunkenc.ValFloatHistogram - } - if it.currType != chunkenc.ValNone { - return it.currType + return chunkenc.ValFloatHistogram } s := it.r.at(it.i) it.t = s.T() @@ -357,31 +358,24 @@ func (it *SampleRingIterator) Next() chunkenc.ValueType { case chunkenc.ValHistogram: it.h = s.H() it.fh = nil - it.currType = chunkenc.ValHistogram + return chunkenc.ValHistogram case chunkenc.ValFloatHistogram: it.fh = s.FH() it.h = nil - it.currType = chunkenc.ValFloatHistogram + return chunkenc.ValFloatHistogram default: it.f = s.F() - it.currType = chunkenc.ValFloat + return chunkenc.ValFloat } - return it.currType } // At returns the current float element of the iterator. func (it *SampleRingIterator) At() (int64, float64) { - if it.currType != chunkenc.ValFloat { - panic(fmt.Sprintf("SampleRingIterator: impossible to read a float, current value type is %v", it.currType)) - } return it.t, it.f } // AtHistogram returns the current histogram element of the iterator. func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) { - if it.currType != chunkenc.ValHistogram { - panic(fmt.Sprintf("SampleRingIterator: impossible to read a histogram, current value type is %v", it.currType)) - } return it.t, it.h } @@ -390,17 +384,14 @@ func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) { // An optional histogram.FloatHistogram can be provided to avoid allocating a new // object for the conversion. func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - if it.currType != chunkenc.ValHistogram && it.currType != chunkenc.ValFloatHistogram { - panic(fmt.Sprintf("SampleRingIterator: impossible to read a float histogram, current value type is %v", it.currType)) + if it.fh == nil { + return it.t, it.h.ToFloat(fh) } - if it.currType == chunkenc.ValFloatHistogram { - if fh != nil { - it.fh.CopyTo(fh) - return it.t, fh - } - return it.t, it.fh.Copy() + if fh != nil { + it.fh.CopyTo(fh) + return it.t, fh } - return it.t, it.h.ToFloat(fh) + return it.t, it.fh.Copy() } func (it *SampleRingIterator) AtT() int64 {