diff --git a/model/metric.go b/model/metric.go index 223b2ab8e..8db9d7802 100644 --- a/model/metric.go +++ b/model/metric.go @@ -75,6 +75,10 @@ type Metric map[LabelName]LabelValue // time. type SampleValue float64 +func (s SampleValue) Equal(o SampleValue) bool { + return s == o +} + func (s SampleValue) ToDTO() *float64 { return proto.Float64(float64(s)) } @@ -92,6 +96,10 @@ type SamplePair struct { Timestamp time.Time } +func (s SamplePair) Equal(o SamplePair) bool { + return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp) +} + type Values []SamplePair func (v Values) Len() int { @@ -136,6 +144,28 @@ func (v Values) InsideInterval(t time.Time) (s bool) { return true } +// TruncateBefore returns a subslice of the original such that extraneous +// samples in the collection that occur before the provided time are +// dropped. The original slice is not mutated. +func (v Values) TruncateBefore(t time.Time) (values Values) { + index := sort.Search(len(v), func(i int) bool { + timestamp := v[i].Timestamp + + return !timestamp.Before(t) + }) + + switch index { + case 0: + values = v + case len(v): + values = v[len(v)-1:] + default: + values = v[index-1:] + } + + return +} + func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) { for _, value := range dto.Value { v = append(v, SamplePair{ diff --git a/model/metric_test.go b/model/metric_test.go index 4aff61170..15484eecf 100644 --- a/model/metric_test.go +++ b/model/metric_test.go @@ -16,6 +16,7 @@ package model import ( "github.com/prometheus/prometheus/utility/test" "testing" + "time" ) func testMetric(t test.Tester) { @@ -77,3 +78,162 @@ func BenchmarkMetric(b *testing.B) { testMetric(b) } } + +func testValues(t test.Tester) { + type in struct { + values Values + time time.Time + } + instant := time.Now() + var scenarios = []struct { + in in + out Values + }{ + { + in: in{ + time: instant, + values: Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(2 * time.Second), + values: Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: Values{ + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(5 * time.Second), + values: Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: Values{ + // Preserve the last value in case it needs to be used for the next set. + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + } + + for i, scenario := range scenarios { + actual := scenario.in.values.TruncateBefore(scenario.in.time) + + if len(actual) != len(scenario.out) { + t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual)) + } + + for j, actualValue := range actual { + if !actualValue.Equal(scenario.out[j]) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue) + } + } + } +} + +func TestValues(t *testing.T) { + testValues(t) +} diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 5cf1b5bd0..c4364f731 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -417,6 +417,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) { break } + chunk = chunk.TruncateBefore(targetTime) + lastChunkTime := chunk[len(chunk)-1].Timestamp if lastChunkTime.After(targetTime) { targetTime = lastChunkTime @@ -428,6 +430,9 @@ func (t *tieredStorage) renderView(viewJob viewJob) { if op.CurrentTime().After(targetTime) { break } + + chunk = chunk.TruncateBefore(*(op.CurrentTime())) + for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { out = op.ExtractSamples(chunk) }