Merge pull request #13844 from aknuds1/bugfix/wlog-checkpoint-float-histogram

tsdb/wlog.Checkpoint: Handle also float histograms
This commit is contained in:
Björn Rabenstein 2024-03-26 17:41:43 +01:00 committed by GitHub
commit 25a8d57671
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 66 additions and 15 deletions

View File

@ -149,22 +149,23 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
r := NewReader(sgmReader) r := NewReader(sgmReader)
var ( var (
series []record.RefSeries series []record.RefSeries
samples []record.RefSample samples []record.RefSample
histogramSamples []record.RefHistogramSample histogramSamples []record.RefHistogramSample
tstones []tombstones.Stone floatHistogramSamples []record.RefFloatHistogramSample
exemplars []record.RefExemplar tstones []tombstones.Stone
metadata []record.RefMetadata exemplars []record.RefExemplar
st = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function. metadata []record.RefMetadata
dec = record.NewDecoder(st) st = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
enc record.Encoder dec = record.NewDecoder(st)
buf []byte enc record.Encoder
recs [][]byte buf []byte
recs [][]byte
latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata) latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata)
) )
for r.Next() { for r.Next() {
series, samples, histogramSamples, tstones, exemplars, metadata = series[:0], samples[:0], histogramSamples[:0], tstones[:0], exemplars[:0], metadata[:0] series, samples, histogramSamples, floatHistogramSamples, tstones, exemplars, metadata = series[:0], samples[:0], histogramSamples[:0], floatHistogramSamples[:0], tstones[:0], exemplars[:0], metadata[:0]
// We don't reset the buffer since we batch up multiple records // We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint. // before writing them to the checkpoint.
@ -227,6 +228,24 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
stats.TotalSamples += len(histogramSamples) stats.TotalSamples += len(histogramSamples)
stats.DroppedSamples += len(histogramSamples) - len(repl) stats.DroppedSamples += len(histogramSamples) - len(repl)
case record.FloatHistogramSamples:
floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
if err != nil {
return nil, fmt.Errorf("decode float histogram samples: %w", err)
}
// Drop irrelevant floatHistogramSamples in place.
repl := floatHistogramSamples[:0]
for _, fh := range floatHistogramSamples {
if fh.T >= mint {
repl = append(repl, fh)
}
}
if len(repl) > 0 {
buf = enc.FloatHistogramSamples(repl, buf)
}
stats.TotalSamples += len(floatHistogramSamples)
stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
case record.Tombstones: case record.Tombstones:
tstones, err = dec.Tombstones(rec, tstones) tstones, err = dec.Tombstones(rec, tstones)
if err != nil { if err != nil {

View File

@ -125,6 +125,20 @@ func TestCheckpoint(t *testing.T) {
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
} }
} }
makeFloatHistogram := func(i int) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
Count: 5 + float64(i*4),
ZeroCount: 2 + float64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{float64(i + 1), 1, -1, 0},
}
}
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} { for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) { t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
@ -154,7 +168,7 @@ func TestCheckpoint(t *testing.T) {
w, err = NewSize(nil, nil, dir, 64*1024, compress) w, err = NewSize(nil, nil, dir, 64*1024, compress)
require.NoError(t, err) require.NoError(t, err)
samplesInWAL, histogramsInWAL := 0, 0 samplesInWAL, histogramsInWAL, floatHistogramsInWAL := 0, 0, 0
var last int64 var last int64
for i := 0; ; i++ { for i := 0; ; i++ {
_, n, err := Segments(w.Dir()) _, n, err := Segments(w.Dir())
@ -200,6 +214,15 @@ func TestCheckpoint(t *testing.T) {
}, nil) }, nil)
require.NoError(t, w.Log(b)) require.NoError(t, w.Log(b))
histogramsInWAL += 4 histogramsInWAL += 4
fh := makeFloatHistogram(i)
b = enc.FloatHistogramSamples([]record.RefFloatHistogramSample{
{Ref: 0, T: last, FH: fh},
{Ref: 1, T: last + 10000, FH: fh},
{Ref: 2, T: last + 20000, FH: fh},
{Ref: 3, T: last + 30000, FH: fh},
}, nil)
require.NoError(t, w.Log(b))
floatHistogramsInWAL += 4
b = enc.Exemplars([]record.RefExemplar{ b = enc.Exemplars([]record.RefExemplar{
{Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i))}, {Ref: 1, T: last, V: float64(i), Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i))},
@ -226,7 +249,7 @@ func TestCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Truncate(107)) require.NoError(t, w.Truncate(107))
require.NoError(t, DeleteCheckpoints(w.Dir(), 106)) require.NoError(t, DeleteCheckpoints(w.Dir(), 106))
require.Equal(t, histogramsInWAL+samplesInWAL, stats.TotalSamples) require.Equal(t, histogramsInWAL+floatHistogramsInWAL+samplesInWAL, stats.TotalSamples)
require.Greater(t, stats.DroppedSamples, 0) require.Greater(t, stats.DroppedSamples, 0)
// Only the new checkpoint should be left. // Only the new checkpoint should be left.
@ -244,7 +267,7 @@ func TestCheckpoint(t *testing.T) {
var metadata []record.RefMetadata var metadata []record.RefMetadata
r := NewReader(sr) r := NewReader(sr)
samplesInCheckpoint, histogramsInCheckpoint := 0, 0 samplesInCheckpoint, histogramsInCheckpoint, floatHistogramsInCheckpoint := 0, 0, 0
for r.Next() { for r.Next() {
rec := r.Record() rec := r.Record()
@ -266,6 +289,13 @@ func TestCheckpoint(t *testing.T) {
require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp") require.GreaterOrEqual(t, h.T, last/2, "histogram with wrong timestamp")
} }
histogramsInCheckpoint += len(histograms) histogramsInCheckpoint += len(histograms)
case record.FloatHistogramSamples:
floatHistograms, err := dec.FloatHistogramSamples(rec, nil)
require.NoError(t, err)
for _, h := range floatHistograms {
require.GreaterOrEqual(t, h.T, last/2, "float histogram with wrong timestamp")
}
floatHistogramsInCheckpoint += len(floatHistograms)
case record.Exemplars: case record.Exemplars:
exemplars, err := dec.Exemplars(rec, nil) exemplars, err := dec.Exemplars(rec, nil)
require.NoError(t, err) require.NoError(t, err)
@ -283,6 +313,8 @@ func TestCheckpoint(t *testing.T) {
require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8) require.Less(t, float64(samplesInCheckpoint)/float64(samplesInWAL), 0.8)
require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5) require.Greater(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.5)
require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8) require.Less(t, float64(histogramsInCheckpoint)/float64(histogramsInWAL), 0.8)
require.Greater(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.5)
require.Less(t, float64(floatHistogramsInCheckpoint)/float64(floatHistogramsInWAL), 0.8)
expectedRefSeries := []record.RefSeries{ expectedRefSeries := []record.RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},