Fix corrupting spans via iterator sharing

Iterator may share spans without copy, so we always have to make a copy
before modification - copy-on-write.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-08-06 16:51:20 +02:00
parent d2f6fa7289
commit 98ecdf3589
4 changed files with 14 additions and 4 deletions

View File

@ -782,9 +782,10 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
// of the chunk. // of the chunk.
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets. // No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
h.PositiveSpans = resize(h.PositiveSpans, len(a.pSpans)) // However we need to make a copy in case the input is sharing spans from an iterator.
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
copy(h.PositiveSpans, a.pSpans) copy(h.PositiveSpans, a.pSpans)
h.NegativeSpans = resize(h.NegativeSpans, len(a.nSpans)) h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
copy(h.NegativeSpans, a.nSpans) copy(h.NegativeSpans, a.nSpans)
} else { } else {
// Spans need pre-adjusting to accommodate the new buckets. // Spans need pre-adjusting to accommodate the new buckets.

View File

@ -411,6 +411,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
{Offset: 3, Length: 2}, {Offset: 3, Length: 2},
{Offset: 5, Length: 1}, {Offset: 5, Length: 1},
} }
savedH2Spans := h2.PositiveSpans
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2} h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
@ -426,6 +427,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
// Check that h2 was recoded. // Check that h2 was recoded.
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets) require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans) require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
} }
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty. { // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
@ -439,6 +441,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
{Offset: 3, Length: 2}, {Offset: 3, Length: 2},
{Offset: 5, Length: 2}, {Offset: 5, Length: 2},
} }
savedH2Spans := h2.PositiveSpans
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2, 3} h2.PositiveBuckets = []float64{7, 4, 3, 5, 2, 3}
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
@ -460,6 +463,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
{Offset: 3, Length: 1}, // Added empty bucket. {Offset: 3, Length: 1}, // Added empty bucket.
{Offset: 1, Length: 2}, // Existing + the extra bucket. {Offset: 1, Length: 2}, // Existing + the extra bucket.
}, h2.PositiveSpans) }, h2.PositiveSpans)
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
} }
{ // New histogram that has a counter reset while buckets are same. { // New histogram that has a counter reset while buckets are same.

View File

@ -816,9 +816,10 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
// of the chunk. // of the chunk.
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 { if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets. // No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
h.PositiveSpans = resize(h.PositiveSpans, len(a.pSpans)) // However we need to make a copy in case the input is sharing spans from an iterator.
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
copy(h.PositiveSpans, a.pSpans) copy(h.PositiveSpans, a.pSpans)
h.NegativeSpans = resize(h.NegativeSpans, len(a.nSpans)) h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
copy(h.NegativeSpans, a.nSpans) copy(h.NegativeSpans, a.nSpans)
} else { } else {
// Spans need pre-adjusting to accommodate the new buckets. // Spans need pre-adjusting to accommodate the new buckets.

View File

@ -428,6 +428,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
{Offset: 4, Length: 1}, {Offset: 4, Length: 1},
{Offset: 1, Length: 1}, {Offset: 1, Length: 1},
} }
savedH2Spans := h2.PositiveSpans
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18) h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
@ -443,6 +444,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
// Check that h2 was recoded. // Check that h2 was recoded.
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18) require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans) require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
} }
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty. { // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
@ -457,6 +459,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
{Offset: 4, Length: 1}, {Offset: 4, Length: 1},
{Offset: 1, Length: 2}, {Offset: 1, Length: 2},
} }
savedH2Spans := h2.PositiveSpans
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1, 1} // counts: 7, 2, 3, 3, 4, 5 (total 23) h2.PositiveBuckets = []int64{7, -5, 1, 0, 1, 1} // counts: 7, 2, 3, 3, 4, 5 (total 23)
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2) posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
@ -478,6 +481,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
{Offset: 3, Length: 1}, // Existing - offset adjusted. {Offset: 3, Length: 1}, // Existing - offset adjusted.
{Offset: 1, Length: 2}, // Existing. {Offset: 1, Length: 2}, // Existing.
}, h2.PositiveSpans) }, h2.PositiveSpans)
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
} }
{ // New histogram that has a counter reset while buckets are same. { // New histogram that has a counter reset while buckets are same.