diff --git a/tsdb/block_test.go b/tsdb/block_test.go index e1bdf81b1..6fd0edc23 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -515,8 +516,67 @@ const ( defaultLabelValue = "labelValue" ) -// genSeries generates series with a given number of labels and values. +// genSeries generates series of float64 samples with a given number of labels and values. func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series { + return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, 1, func(ts int64) tsdbutil.Sample { + return sample{t: ts, v: rand.Float64()} + }) +} + +// genHistogramSeries generates series of histogram samples with a given number of labels and values. +func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { + return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { + h := &histogram.Histogram{ + Count: 5 + uint64(ts*4), + ZeroCount: 2 + uint64(ts), + ZeroThreshold: 0.001, + Sum: 18.4 * rand.Float64(), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, + } + return sample{t: ts, h: h} + }) +} + +// genHistogramAndFloatSeries generates series of mixed histogram and float64 samples with a given number of labels and values. +func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series { + floatSample := false + count := 0 + return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample { + count++ + var s sample + if floatSample { + s = sample{t: ts, v: rand.Float64()} + } else { + h := &histogram.Histogram{ + Count: 5 + uint64(ts*4), + ZeroCount: 2 + uint64(ts), + ZeroThreshold: 0.001, + Sum: 18.4 * rand.Float64(), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, + } + s = sample{t: ts, h: h} + } + + if count%5 == 0 { + // Flip the sample type for every 5 samples. + floatSample = !floatSample + } + + return s + }) +} + +func genSeriesFromSampleGenerator(totalSeries, labelCount int, mint, maxt, step int64, generator func(ts int64) tsdbutil.Sample) []storage.Series { if totalSeries == 0 || labelCount == 0 { return nil } @@ -530,8 +590,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series { lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) } samples := make([]tsdbutil.Sample, 0, maxt-mint+1) - for t := mint; t < maxt; t++ { - samples = append(samples, sample{t: t, v: rand.Float64()}) + for t := mint; t < maxt; t += step { + samples = append(samples, generator(t)) } series[i] = storage.NewListSeries(labels.FromMap(lbls), samples) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 0be5ef0d5..0601c04e7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -40,6 +40,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" @@ -92,10 +93,17 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str samples := []tsdbutil.Sample{} it := series.Iterator() - for it.Next() == chunkenc.ValFloat { - // TODO(beorn7): Also handle histograms. - t, v := it.At() - samples = append(samples, sample{t: t, v: v}) + for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { + switch typ { + case chunkenc.ValFloat: + ts, v := it.At() + samples = append(samples, sample{t: ts, v: v}) + case chunkenc.ValHistogram: + ts, h := it.AtHistogram() + samples = append(samples, sample{t: ts, h: h}) + default: + t.Fatalf("unknown sample type in query %s", typ.String()) + } } require.NoError(t, it.Err()) @@ -3742,3 +3750,276 @@ func TestMetadataAssertInMemoryData(t *testing.T) { require.Equal(t, reopenDB.head.series.getByHash(s3.Hash(), s3).meta, m3) require.Equal(t, reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4) } + +func TestHistogramAppendAndQuery(t *testing.T) { + db := openTestDB(t, nil, nil) + minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + ctx := context.Background() + appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + t.Helper() + app := db.Appender(ctx) + _, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + } + appendFloat := func(lbls labels.Labels, tsMinute int, val float64, exp *[]tsdbutil.Sample) { + t.Helper() + app := db.Appender(ctx) + _, err := app.Append(0, lbls, minute(tsMinute), val) + require.NoError(t, err) + require.NoError(t, app.Commit()) + *exp = append(*exp, sample{t: minute(tsMinute), v: val}) + } + + testQuery := func(name, value string, exp map[string][]tsdbutil.Sample) { + t.Helper() + q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, name, value)) + require.Equal(t, exp, act) + } + + baseH := &histogram.Histogram{ + Count: 11, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{1, 2, -1}, + } + + var ( + series1 = labels.FromStrings("foo", "bar1") + series2 = labels.FromStrings("foo", "bar2") + series3 = labels.FromStrings("foo", "bar3") + series4 = labels.FromStrings("foo", "bar4") + exp1, exp2, exp3, exp4 []tsdbutil.Sample + ) + + // TODO(codesome): test everything for negative buckets as well. + t.Run("series with only histograms", func(t *testing.T) { + h := baseH.Copy() // This is shared across all sub tests. + + appendHistogram(series1, 100, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + + h.PositiveBuckets[0]++ + h.NegativeBuckets[0] += 2 + h.Count += 10 + appendHistogram(series1, 101, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + + t.Run("changing schema", func(t *testing.T) { + h.Schema = 2 + appendHistogram(series1, 102, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + + // Schema back to old. + h.Schema = 1 + appendHistogram(series1, 103, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + }) + + t.Run("new buckets incoming", func(t *testing.T) { + // This histogram with new bucket at the end causes the re-encoding of the previous histogram. + // Hence the previous histogram is recoded into this new layout. + // But the query returns the histogram from the in-memory buffer, hence we don't see the recode here yet. + h.PositiveSpans[1].Length++ + h.PositiveBuckets = append(h.PositiveBuckets, 1) + h.Count += 3 + appendHistogram(series1, 104, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + + // Now we add the new buckets in between. Empty bucket is again not present for the old histogram. + h.PositiveSpans[0].Length++ + h.PositiveSpans[1].Offset-- + h.Count += 3 + // {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1} + h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...) + appendHistogram(series1, 105, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + + // We add 4 more histograms to clear out the buffer and see the re-encoded histograms. + appendHistogram(series1, 106, h.Copy(), &exp1) + appendHistogram(series1, 107, h.Copy(), &exp1) + appendHistogram(series1, 108, h.Copy(), &exp1) + appendHistogram(series1, 109, h.Copy(), &exp1) + + // Update the expected histograms to reflect the re-encoding. + l := len(exp1) + h7 := exp1[l-7].H() + h7.PositiveSpans = exp1[l-1].H().PositiveSpans + h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets. + exp1[l-7] = sample{t: exp1[l-7].T(), h: h7} + + h6 := exp1[l-6].H() + h6.PositiveSpans = exp1[l-1].H().PositiveSpans + h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket. + exp1[l-6] = sample{t: exp1[l-6].T(), h: h6} + + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + }) + + t.Run("buckets disappearing", func(t *testing.T) { + h.PositiveSpans[1].Length-- + h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1] + appendHistogram(series1, 110, h.Copy(), &exp1) + testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) + }) + }) + + t.Run("series starting with float and then getting histograms", func(t *testing.T) { + appendFloat(series2, 100, 100, &exp2) + appendFloat(series2, 101, 101, &exp2) + appendFloat(series2, 102, 102, &exp2) + testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) + + h := baseH.Copy() + appendHistogram(series2, 103, h.Copy(), &exp2) + appendHistogram(series2, 104, h.Copy(), &exp2) + appendHistogram(series2, 105, h.Copy(), &exp2) + testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) + + // Switching between float and histograms again. + appendFloat(series2, 106, 106, &exp2) + appendFloat(series2, 107, 107, &exp2) + testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) + + appendHistogram(series2, 108, h.Copy(), &exp2) + appendHistogram(series2, 109, h.Copy(), &exp2) + testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) + }) + + t.Run("series starting with histogram and then getting float", func(t *testing.T) { + h := baseH.Copy() + appendHistogram(series3, 101, h.Copy(), &exp3) + appendHistogram(series3, 102, h.Copy(), &exp3) + appendHistogram(series3, 103, h.Copy(), &exp3) + testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) + + appendFloat(series3, 104, 100, &exp3) + appendFloat(series3, 105, 101, &exp3) + appendFloat(series3, 106, 102, &exp3) + testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) + + // Switching between histogram and float again. + appendHistogram(series3, 107, h.Copy(), &exp3) + appendHistogram(series3, 108, h.Copy(), &exp3) + testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) + + appendFloat(series3, 109, 106, &exp3) + appendFloat(series3, 110, 107, &exp3) + testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) + }) + + t.Run("query mix of histogram and float series", func(t *testing.T) { + // A float only series. + appendFloat(series4, 100, 100, &exp4) + appendFloat(series4, 101, 101, &exp4) + appendFloat(series4, 102, 102, &exp4) + + testQuery("foo", "bar.*", map[string][]tsdbutil.Sample{ + series1.String(): exp1, + series2.String(): exp2, + series3.String(): exp3, + series4.String(): exp4, + }) + }) +} + +func TestQueryHistogramFromBlocks(t *testing.T) { + minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } + + testBlockQuerying := func(t *testing.T, blockSeries ...[]storage.Series) { + t.Helper() + + opts := DefaultOptions() + opts.AllowOverlappingBlocks = true + db := openTestDB(t, opts, nil) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + ctx := context.Background() + + exp := make(map[string][]tsdbutil.Sample) + for _, series := range blockSeries { + createBlock(t, db.Dir(), series) + + for _, s := range series { + key := s.Labels().String() + it := s.Iterator() + slice := exp[key] + for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { + switch typ { + case chunkenc.ValFloat: + ts, v := it.At() + slice = append(slice, sample{t: ts, v: v}) + case chunkenc.ValHistogram: + ts, h := it.AtHistogram() + slice = append(slice, sample{t: ts, h: h}) + } + } + sort.Slice(slice, func(i, j int) bool { + return slice[i].T() < slice[j].T() + }) + exp[key] = slice + } + } + + require.Len(t, db.Blocks(), 0) + require.NoError(t, db.reload()) + require.Len(t, db.Blocks(), len(blockSeries)) + + q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + require.Equal(t, exp, res) + } + + t.Run("serial blocks with only histograms", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(119), minute(1)), + genHistogramSeries(10, 5, minute(120), minute(239), minute(1)), + genHistogramSeries(10, 5, minute(240), minute(359), minute(1)), + ) + }) + + t.Run("overlapping blocks with only histograms", func(t *testing.T) { + testBlockQuerying(t, + genHistogramSeries(10, 5, minute(0), minute(120), minute(3)), + genHistogramSeries(10, 5, minute(1), minute(120), minute(3)), + genHistogramSeries(10, 5, minute(2), minute(120), minute(3)), + ) + }) + + t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) { + testBlockQuerying(t, + genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1)), + genHistogramSeries(10, 5, minute(61), minute(120), minute(1)), + genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1)), + ) + }) + + t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) { + testBlockQuerying(t, + genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3)), + genHistogramSeries(10, 5, minute(46), minute(100), minute(3)), + genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3)), + ) + }) +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 6f8cc218a..706f2cd42 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -553,7 +553,7 @@ func ValidateHistogram(h *histogram.Histogram) error { if c := negativeCount + positiveCount; c > h.Count { return errors.Wrap( storage.ErrHistogramCountNotBigEnough, - fmt.Sprintf("%d observations found in buckets, but overall count is %d", c, h.Count), + fmt.Sprintf("%d observations found in buckets, but the Count field is %d", c, h.Count), ) } @@ -712,7 +712,6 @@ func (a *headAppender) Commit() (err error) { defer a.head.putHistogramBuffer(a.histograms) defer a.head.putMetadataBuffer(a.metadata) defer a.head.iso.closeAppend(a.appendID) - total := len(a.samples) var series *memSeries for i, s := range a.samples { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7ab99b220..a709a7ca9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3886,7 +3886,7 @@ func TestHistogramValidation(t *testing.T) { NegativeBuckets: []int64{1}, PositiveBuckets: []int64{1}, }, - errMsg: `2 observations found in buckets, but overall count is 0`, + errMsg: `2 observations found in buckets, but the Count field is 0`, }, } diff --git a/tsdb/querier.go b/tsdb/querier.go index d279e5080..82b752cbb 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -696,7 +696,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { if !p.next() { return false } - p.curr = p.currChkMeta if p.currDelIter == nil { return true diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 858388f5b..777db5e90 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -54,14 +54,34 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l ref := storage.SeriesRef(0) it := s.Iterator() lset := s.Labels() - for it.Next() == chunkenc.ValFloat { - // TODO(beorn7): Add histogram support. - t, v := it.At() - ref, err = app.Append(ref, lset, t, v) + typ := it.Next() + lastTyp := typ + for ; typ != chunkenc.ValNone; typ = it.Next() { + if lastTyp != typ { + // The behaviour of appender is undefined if samples of different types + // are appended to the same series in a single Commit(). + if err = app.Commit(); err != nil { + return "", err + } + app = w.Appender(ctx) + sampleCount = 0 + } + + switch typ { + case chunkenc.ValFloat: + t, v := it.At() + ref, err = app.Append(ref, lset, t, v) + case chunkenc.ValHistogram: + t, h := it.AtHistogram() + ref, err = app.AppendHistogram(ref, lset, t, h) + default: + return "", fmt.Errorf("unknown sample type %s", typ.String()) + } if err != nil { return "", err } sampleCount++ + lastTyp = typ } if it.Err() != nil { return "", it.Err()