WIP - Truncate irrelevant chunk values.

This does not work with the view tests.
This commit is contained in:
Matt T. Proud 2013-04-24 11:02:51 +02:00
parent 955708e8db
commit 05504d3642
3 changed files with 202 additions and 0 deletions

View File

@ -75,6 +75,10 @@ type Metric map[LabelName]LabelValue
// time. // time.
type SampleValue float64 type SampleValue float64
func (s SampleValue) Equal(o SampleValue) bool {
return s == o
}
func (s SampleValue) ToDTO() *float64 { func (s SampleValue) ToDTO() *float64 {
return proto.Float64(float64(s)) return proto.Float64(float64(s))
} }
@ -92,6 +96,17 @@ type SamplePair struct {
Timestamp time.Time Timestamp time.Time
} }
func (s SamplePair) Equal(o SamplePair) (equal bool) {
switch {
case !s.Value.Equal(o.Value):
return
case !s.Timestamp.Equal(o.Timestamp):
return
}
return true
}
type Values []SamplePair type Values []SamplePair
func (v Values) Len() int { func (v Values) Len() int {
@ -136,6 +151,28 @@ func (v Values) InsideInterval(t time.Time) (s bool) {
return true return true
} }
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated.
func (v Values) TruncateBefore(t time.Time) (values Values) {
index := sort.Search(len(v), func(i int) bool {
timestamp := v[i].Timestamp
return timestamp.After(t) || timestamp.Equal(t)
})
switch index {
case 0:
values = v
case len(v):
values = v[len(v)-1:]
default:
values = v[index-1:]
}
return
}
func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) { func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
for _, value := range dto.Value { for _, value := range dto.Value {
v = append(v, SamplePair{ v = append(v, SamplePair{

View File

@ -16,6 +16,7 @@ package model
import ( import (
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"testing" "testing"
"time"
) )
func testMetric(t test.Tester) { func testMetric(t test.Tester) {
@ -77,3 +78,162 @@ func BenchmarkMetric(b *testing.B) {
testMetric(b) testMetric(b)
} }
} }
func testValues(t test.Tester) {
type in struct {
values Values
time time.Time
}
instant := time.Now()
var scenarios = []struct {
in in
out Values
}{
{
in: in{
time: instant,
values: Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
{
in: in{
time: instant.Add(2 * time.Second),
values: Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: Values{
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
{
in: in{
time: instant.Add(5 * time.Second),
values: Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: Values{
// Preserve the last value in case it needs to be used for the next set.
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
}
for i, scenario := range scenarios {
actual := scenario.in.values.TruncateBefore(scenario.in.time)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual))
}
for j, actualValue := range actual {
if !actualValue.Equal(scenario.out[j]) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue)
}
}
}
}
func TestValues(t *testing.T) {
testValues(t)
}

View File

@ -422,12 +422,17 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
targetTime = lastChunkTime targetTime = lastChunkTime
} }
chunk = chunk.TruncateBefore(targetTime)
// For each op, extract all needed data from the current chunk. // For each op, extract all needed data from the current chunk.
out := model.Values{} out := model.Values{}
for _, op := range standingOps { for _, op := range standingOps {
if op.CurrentTime().After(targetTime) { if op.CurrentTime().After(targetTime) {
break break
} }
chunk = chunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(chunk) out = op.ExtractSamples(chunk)
} }