Integrate memory and disk layers in view rendering.
This commit is contained in:
parent
30c7acfaa4
commit
99dcbe0f94
|
@ -102,6 +102,18 @@ func (v Values) Swap(i, j int) {
|
|||
v[i], v[j] = v[j], v[i]
|
||||
}
|
||||
|
||||
// FirstTimeAfter indicates whether the first sample of a set is after a given
|
||||
// timestamp.
|
||||
func (v Values) FirstTimeAfter(t time.Time) bool {
|
||||
return v[0].Timestamp.After(t)
|
||||
}
|
||||
|
||||
// LastTimeBefore indicates whether the last sample of a set is before a given
|
||||
// timestamp.
|
||||
func (v Values) LastTimeBefore(t time.Time) bool {
|
||||
return v[len(v)-1].Timestamp.Before(t)
|
||||
}
|
||||
|
||||
// InsideInterval indicates whether a given range of sorted values could contain
|
||||
// a value for a given time.
|
||||
func (v Values) InsideInterval(t time.Time) (s bool) {
|
||||
|
|
|
@ -289,7 +289,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t
|
|||
if sampleSets[groupingKey] == nil {
|
||||
sampleSets[groupingKey] = &model.SampleSet{
|
||||
Metric: sample.Metric,
|
||||
Values: []model.SamplePair{samplePair},
|
||||
Values: model.Values{samplePair},
|
||||
}
|
||||
} else {
|
||||
sampleSets[groupingKey].Values = append(sampleSets[groupingKey].Values, samplePair)
|
||||
|
|
|
@ -57,7 +57,7 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m
|
|||
// surrounding a given target time. If samples are found both before and after
|
||||
// the target time, the sample value is interpolated between these. Otherwise,
|
||||
// the single closest sample is returned verbatim.
|
||||
func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp time.Time) (sample *model.SamplePair) {
|
||||
func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) (sample *model.SamplePair) {
|
||||
var closestBefore *model.SamplePair
|
||||
var closestAfter *model.SamplePair
|
||||
for _, candidate := range samples {
|
||||
|
|
|
@ -25,7 +25,7 @@ var testStartTime = time.Time{}
|
|||
|
||||
func getTestValueStream(startVal model.SampleValue,
|
||||
endVal model.SampleValue,
|
||||
stepVal model.SampleValue) (resultValues []model.SamplePair) {
|
||||
stepVal model.SampleValue) (resultValues model.Values) {
|
||||
currentTime := testStartTime
|
||||
for currentVal := startVal; currentVal <= endVal; currentVal += stepVal {
|
||||
sample := model.SamplePair{
|
||||
|
|
|
@ -47,9 +47,9 @@ type MetricPersistence interface {
|
|||
// Get the metric associated with the provided fingerprint.
|
||||
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
|
||||
|
||||
GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair
|
||||
GetValueAtTime(model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) model.Values
|
||||
|
||||
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
||||
|
||||
|
@ -64,9 +64,9 @@ type MetricPersistence interface {
|
|||
// View provides view of the values in the datastore subject to the request of a
|
||||
// preloading operation.
|
||||
type View interface {
|
||||
GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair
|
||||
GetValueAtTime(model.Fingerprint, time.Time) model.Values
|
||||
GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values)
|
||||
GetRangeValues(model.Fingerprint, model.Interval) model.Values
|
||||
|
||||
// Destroy this view.
|
||||
Close()
|
||||
|
|
|
@ -862,15 +862,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
|
|||
return
|
||||
}
|
||||
|
||||
func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) {
|
||||
func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) {
|
||||
func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) {
|
||||
func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
|
|
|
@ -210,7 +210,7 @@ func (s memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metri
|
|||
return
|
||||
}
|
||||
|
||||
func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) {
|
||||
func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
if !ok {
|
||||
return
|
||||
|
@ -251,13 +251,13 @@ func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (s
|
|||
return
|
||||
}
|
||||
|
||||
func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) {
|
||||
func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) {
|
||||
first = s.GetValueAtTime(f, i.OldestInclusive)
|
||||
second = s.GetValueAtTime(f, i.NewestInclusive)
|
||||
return
|
||||
}
|
||||
|
||||
func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) {
|
||||
func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
if !ok {
|
||||
return
|
||||
|
|
|
@ -26,7 +26,7 @@ type op interface {
|
|||
// The time at which this operation starts.
|
||||
StartsAt() time.Time
|
||||
// Extract samples from stream of values and advance operation time.
|
||||
ExtractSamples(in []model.SamplePair) (out []model.SamplePair)
|
||||
ExtractSamples(in model.Values) (out model.Values)
|
||||
// Get current operation time or nil if no subsequent work associated with
|
||||
// this operator remains.
|
||||
CurrentTime() *time.Time
|
||||
|
@ -73,7 +73,7 @@ func (g getValuesAtTimeOp) StartsAt() time.Time {
|
|||
return g.time
|
||||
}
|
||||
|
||||
func (g *getValuesAtTimeOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) {
|
||||
func (g *getValuesAtTimeOp) ExtractSamples(in model.Values) (out model.Values) {
|
||||
if len(in) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ func (g getValuesAtTimeOp) GreedierThan(op op) (superior bool) {
|
|||
// are adjacent to it.
|
||||
//
|
||||
// An assumption of this is that the provided samples are already sorted!
|
||||
func extractValuesAroundTime(t time.Time, in []model.SamplePair) (out []model.SamplePair) {
|
||||
func extractValuesAroundTime(t time.Time, in model.Values) (out model.Values) {
|
||||
i := sort.Search(len(in), func(i int) bool {
|
||||
return !in[i].Timestamp.Before(t)
|
||||
})
|
||||
|
@ -151,7 +151,7 @@ func (g getValuesAtIntervalOp) Through() time.Time {
|
|||
return g.through
|
||||
}
|
||||
|
||||
func (g *getValuesAtIntervalOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) {
|
||||
func (g *getValuesAtIntervalOp) ExtractSamples(in model.Values) (out model.Values) {
|
||||
if len(in) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func (g getValuesAlongRangeOp) Through() time.Time {
|
|||
return g.through
|
||||
}
|
||||
|
||||
func (g *getValuesAlongRangeOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) {
|
||||
func (g *getValuesAlongRangeOp) ExtractSamples(in model.Values) (out model.Values) {
|
||||
if len(in) == 0 {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1177,8 +1177,8 @@ func BenchmarkOptimize(b *testing.B) {
|
|||
func TestGetValuesAtTimeOp(t *testing.T) {
|
||||
var scenarios = []struct {
|
||||
op getValuesAtTimeOp
|
||||
in []model.SamplePair
|
||||
out []model.SamplePair
|
||||
in model.Values
|
||||
out model.Values
|
||||
}{
|
||||
// No values.
|
||||
{
|
||||
|
@ -1191,13 +1191,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1209,13 +1209,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(1 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1227,13 +1227,13 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1245,7 +1245,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1255,7 +1255,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1267,7 +1267,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(1 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1277,7 +1277,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1289,7 +1289,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(90 * time.Second),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1299,7 +1299,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1315,7 +1315,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1325,7 +1325,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1341,7 +1341,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
op: getValuesAtTimeOp{
|
||||
time: testInstant.Add(3 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1351,7 +1351,7 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1376,8 +1376,8 @@ func TestGetValuesAtTimeOp(t *testing.T) {
|
|||
func TestGetValuesAtIntervalOp(t *testing.T) {
|
||||
var scenarios = []struct {
|
||||
op getValuesAtIntervalOp
|
||||
in []model.SamplePair
|
||||
out []model.SamplePair
|
||||
in model.Values
|
||||
out model.Values
|
||||
}{
|
||||
// No values.
|
||||
{
|
||||
|
@ -1394,7 +1394,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(1 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1404,7 +1404,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1418,7 +1418,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1428,7 +1428,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1446,7 +1446,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(2 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1460,7 +1460,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1478,7 +1478,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1488,7 +1488,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1506,7 +1506,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(4 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1524,7 +1524,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1542,7 +1542,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
through: testInstant.Add(3 * time.Minute),
|
||||
interval: 30 * time.Second,
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1552,7 +1552,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1577,8 +1577,8 @@ func TestGetValuesAtIntervalOp(t *testing.T) {
|
|||
func TestGetValuesAlongRangeOp(t *testing.T) {
|
||||
var scenarios = []struct {
|
||||
op getValuesAlongRangeOp
|
||||
in []model.SamplePair
|
||||
out []model.SamplePair
|
||||
in model.Values
|
||||
out model.Values
|
||||
}{
|
||||
// No values.
|
||||
{
|
||||
|
@ -1593,7 +1593,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant,
|
||||
through: testInstant.Add(1 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1603,7 +1603,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{},
|
||||
out: model.Values{},
|
||||
},
|
||||
// Operator range starts before first value, ends within available values.
|
||||
{
|
||||
|
@ -1611,7 +1611,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant,
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1621,7 +1621,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1634,7 +1634,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant.Add(1 * time.Minute),
|
||||
through: testInstant.Add(2 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1648,7 +1648,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1661,7 +1661,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant,
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1671,7 +1671,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(1 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1688,7 +1688,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(4 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1706,7 +1706,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{
|
||||
out: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(2 * time.Minute),
|
||||
Value: 1,
|
||||
|
@ -1723,7 +1723,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
from: testInstant.Add(2 * time.Minute),
|
||||
through: testInstant.Add(3 * time.Minute),
|
||||
},
|
||||
in: []model.SamplePair{
|
||||
in: model.Values{
|
||||
{
|
||||
Timestamp: testInstant,
|
||||
Value: 1,
|
||||
|
@ -1733,7 +1733,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
|
|||
Value: 1,
|
||||
},
|
||||
},
|
||||
out: []model.SamplePair{},
|
||||
out: model.Values{},
|
||||
},
|
||||
}
|
||||
for i, scenario := range scenarios {
|
||||
|
|
|
@ -188,7 +188,7 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
|
|||
}
|
||||
}
|
||||
|
||||
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples []model.SamplePair, err error) {
|
||||
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples model.Values, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
|
@ -465,7 +465,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
|
|||
NewestInclusive: time.Unix(end, 0),
|
||||
}
|
||||
|
||||
samples := []model.SamplePair{}
|
||||
samples := model.Values{}
|
||||
fp := model.NewFingerprintFromMetric(metric)
|
||||
switch persistence := p.(type) {
|
||||
case *LevelDBMetricPersistence:
|
||||
|
|
|
@ -370,19 +370,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if t.diskFrontier == nil {
|
||||
// Storage still empty, return an empty view.
|
||||
viewJob.output <- view
|
||||
return
|
||||
}
|
||||
|
||||
for _, scanJob := range scans {
|
||||
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if seriesFrontier == nil {
|
||||
continue
|
||||
var seriesFrontier *seriesFrontier = nil
|
||||
if t.diskFrontier != nil {
|
||||
seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
standingOps := scanJob.operations
|
||||
|
@ -393,25 +388,47 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
}
|
||||
|
||||
// Load data value chunk(s) around the first standing op's current time.
|
||||
highWatermark := *standingOps[0].CurrentTime()
|
||||
// XXX: For earnest performance gains analagous to the benchmarking we
|
||||
// performed, chunk should only be reloaded if it no longer contains
|
||||
// the values we're looking for.
|
||||
//
|
||||
// To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue.
|
||||
chunk := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, highWatermark)
|
||||
targetTime := *standingOps[0].CurrentTime()
|
||||
|
||||
chunk := model.Values{}
|
||||
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
|
||||
// If we aimed before the oldest value in memory, load more data from disk.
|
||||
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
|
||||
// XXX: For earnest performance gains analagous to the benchmarking we
|
||||
// performed, chunk should only be reloaded if it no longer contains
|
||||
// the values we're looking for.
|
||||
//
|
||||
// To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue.
|
||||
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
|
||||
|
||||
// If we aimed past the newest value on disk, combine it with the next value from memory.
|
||||
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
|
||||
latestDiskValue := diskValues[len(diskValues)-1 : len(diskValues)]
|
||||
chunk = append(latestDiskValue, memValues...)
|
||||
} else {
|
||||
chunk = diskValues
|
||||
}
|
||||
} else {
|
||||
chunk = memValues
|
||||
}
|
||||
|
||||
// There's no data at all for this fingerprint, so stop processing ops for it.
|
||||
if len(chunk) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
lastChunkTime := chunk[len(chunk)-1].Timestamp
|
||||
if lastChunkTime.After(highWatermark) {
|
||||
highWatermark = lastChunkTime
|
||||
if lastChunkTime.After(targetTime) {
|
||||
targetTime = lastChunkTime
|
||||
}
|
||||
|
||||
// For each op, extract all needed data from the current chunk.
|
||||
out := []model.SamplePair{}
|
||||
out := model.Values{}
|
||||
for _, op := range standingOps {
|
||||
if op.CurrentTime().After(highWatermark) {
|
||||
if op.CurrentTime().After(targetTime) {
|
||||
break
|
||||
}
|
||||
for op.CurrentTime() != nil && !op.CurrentTime().After(highWatermark) {
|
||||
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
|
||||
out = op.ExtractSamples(chunk)
|
||||
}
|
||||
}
|
||||
|
@ -450,7 +467,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
return
|
||||
}
|
||||
|
||||
func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) {
|
||||
func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) {
|
||||
var (
|
||||
targetKey = &dto.SampleKey{
|
||||
Fingerprint: fingerprint.ToDTO(),
|
||||
|
|
|
@ -69,7 +69,7 @@ func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v
|
|||
return
|
||||
}
|
||||
|
||||
func testMakeView(t test.Tester) {
|
||||
func testMakeView(t test.Tester, flushToDisk bool) {
|
||||
type in struct {
|
||||
atTime []getValuesAtTimeOp
|
||||
atInterval []getValuesAtIntervalOp
|
||||
|
@ -77,9 +77,9 @@ func testMakeView(t test.Tester) {
|
|||
}
|
||||
|
||||
type out struct {
|
||||
atTime [][]model.SamplePair
|
||||
atInterval [][]model.SamplePair
|
||||
alongRange [][]model.SamplePair
|
||||
atTime []model.Values
|
||||
atInterval []model.Values
|
||||
alongRange []model.Values
|
||||
}
|
||||
var (
|
||||
instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local)
|
||||
|
@ -100,7 +100,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{{}},
|
||||
atTime: []model.Values{{}},
|
||||
},
|
||||
},
|
||||
// Single sample, query asks for exact sample time.
|
||||
|
@ -120,7 +120,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
|
@ -152,7 +152,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant.Add(time.Second),
|
||||
|
@ -179,7 +179,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
|
@ -211,7 +211,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
|
@ -248,7 +248,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant.Add(time.Second),
|
||||
|
@ -285,7 +285,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant,
|
||||
|
@ -326,7 +326,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant.Add(time.Second * 2),
|
||||
|
@ -351,7 +351,7 @@ func testMakeView(t test.Tester) {
|
|||
},
|
||||
},
|
||||
out: out{
|
||||
atTime: [][]model.SamplePair{
|
||||
atTime: []model.Values{
|
||||
{
|
||||
{
|
||||
Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize/2)),
|
||||
|
@ -378,7 +378,11 @@ func testMakeView(t test.Tester) {
|
|||
}
|
||||
}
|
||||
|
||||
tiered.Flush()
|
||||
if flushToDisk {
|
||||
tiered.Flush()
|
||||
} else {
|
||||
tiered.(*tieredStorage).writeMemory()
|
||||
}
|
||||
|
||||
requestBuilder := NewViewRequestBuilder()
|
||||
|
||||
|
@ -421,13 +425,23 @@ func testMakeView(t test.Tester) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMakeView(t *testing.T) {
|
||||
testMakeView(t)
|
||||
func TestMakeViewFlush(t *testing.T) {
|
||||
testMakeView(t, true)
|
||||
}
|
||||
|
||||
func BenchmarkMakeView(b *testing.B) {
|
||||
func BenchmarkMakeViewFlush(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testMakeView(b)
|
||||
testMakeView(b, true)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeViewNoFlush(t *testing.T) {
|
||||
testMakeView(t, false)
|
||||
}
|
||||
|
||||
func BenchmarkMakeViewNoFlush(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testMakeView(b, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue