Fix Stale. Pol. support in GetValueAtTime.

This commit is contained in:
Matt T. Proud 2013-01-06 18:33:22 +01:00
parent f19ca0e8a5
commit be9b7942c1
2 changed files with 250 additions and 161 deletions

View File

@ -14,7 +14,6 @@
package leveldb
import (
"bytes"
"code.google.com/p/goprotobuf/proto"
registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
@ -305,111 +304,6 @@ type iterator interface {
Value() []byte
}
func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err error) {
byteKey, err := coding.NewProtocolBufferEncoder(k).Encode()
if err != nil {
return
}
i.Seek(byteKey)
if !i.Valid() {
return
}
var (
retrievedKey *dto.SampleKey
)
retrievedKey, err = extractSampleKey(i)
if err != nil {
return
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
return
}
if bytes.Equal(retrievedKey.Timestamp, k.Timestamp) {
return true, nil
}
i.Prev()
if !i.Valid() {
return
}
retrievedKey, err = extractSampleKey(i)
if err != nil {
return
}
b = fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint)
return
}
func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) {
byteKey, err := coding.NewProtocolBufferEncoder(k).Encode()
if err != nil {
return
}
i.Seek(byteKey)
if !i.Valid() {
i.SeekToFirst()
}
var (
retrievedKey *dto.SampleKey
)
retrievedKey, err = extractSampleKey(i)
if err != nil {
return
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
return
}
keyTime := indexable.DecodeTime(k.Timestamp)
retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp)
return retrievedTime.Before(keyTime), nil
}
func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) {
byteKey, err := coding.NewProtocolBufferEncoder(k).Encode()
if err != nil {
return
}
i.Seek(byteKey)
if !i.Valid() {
i.SeekToLast()
}
var (
retrievedKey *dto.SampleKey
)
retrievedKey, err = extractSampleKey(i)
if err != nil {
return
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
return
}
keyTime := indexable.DecodeTime(k.Timestamp)
retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp)
return retrievedTime.After(keyTime), nil
}
func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (sample *model.Sample, err error) {
d := model.MetricToDTO(m)
@ -433,13 +327,33 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
if err != nil {
return
}
defer closer.Close()
iterator.Seek(e)
within, err := isKeyInsideRecordedInterval(k, iterator)
if err != nil || !within {
return
if !iterator.Valid() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek after AND outside known range.
*
* Once a LevelDB iterator goes invalid, it cannot be recovered; thusly,
* we need to create a new in order to check if the last value in the
* database is sufficient for our purposes. This is, in all reality, a
* corner case but one that could bring down the system.
*/
iterator, closer, err = l.metricSamples.GetIterator()
if err != nil {
return
}
defer closer.Close()
iterator.SeekToLast()
if !iterator.Valid() {
/*
* For whatever reason, the LevelDB cannot be recovered.
*/
return
}
}
var (
@ -452,60 +366,126 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
return
}
if fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) {
firstValue, err = extractSampleValue(iterator)
if err != nil {
return nil, err
}
foundTimestamp := indexable.DecodeTime(firstKey.Timestamp)
targetTimestamp := indexable.DecodeTime(k.Timestamp)
if foundTimestamp.Equal(targetTimestamp) {
return model.SampleFromDTO(m, t, firstValue), nil
}
} else {
if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) {
return
}
var (
secondKey *dto.SampleKey
secondValue *dto.SampleValue
)
firstTime := indexable.DecodeTime(firstKey.Timestamp)
if t.Before(firstTime) {
iterator.Prev()
if !iterator.Valid() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek before AND outside known range.
*
* This is an explicit validation to ensure that if no previous values for
* the series are found, the query aborts.
*/
return
}
var (
alternativeKey *dto.SampleKey
alternativeValue *dto.SampleValue
)
alternativeKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) {
/*
* At this point, we found a previous value in the same series in the
* database. LevelDB originally seeked to the subsequent element given
* the key, but we need to consider this adjacency instead.
*/
alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp)
firstKey = alternativeKey
firstValue = alternativeValue
firstTime = alternativeTime
}
}
firstDelta := firstTime.Sub(*t)
if firstDelta < 0 {
firstDelta *= -1
}
if firstDelta > s.DeltaAllowance {
return
}
firstValue, err = extractSampleValue(iterator)
if err != nil {
return
}
sample = model.SampleFromDTO(m, t, firstValue)
if firstDelta == time.Duration(0) {
return
}
iterator.Next()
if !iterator.Valid() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek after AND outside known range.
*
* This means that there are no more values left in the storage; and if this
* point is reached, we know that the one that has been found is within the
* allowed staleness limits.
*/
return
}
var secondKey *dto.SampleKey
secondKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) {
secondValue, err = extractSampleValue(iterator)
if err != nil {
return
}
if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) {
return
} else {
/*
* At this point, current entry in the database has the same key as the
* previous. For this reason, the validation logic will expect that the
* distance between the two points shall not exceed the staleness policy
* allowed limit to reduce interpolation errors.
*
* For this reason, the sample is reset in case of other subsequent
* validation behaviors.
*/
sample = nil
}
secondTime := indexable.DecodeTime(secondKey.Timestamp)
totalDelta := secondTime.Sub(firstTime)
if totalDelta > s.DeltaAllowance {
return
}
firstTime := indexable.DecodeTime(firstKey.Timestamp)
secondTime := indexable.DecodeTime(secondKey.Timestamp)
currentDelta := secondTime.Sub(firstTime)
var secondValue *dto.SampleValue
if currentDelta <= s.DeltaAllowance {
interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t)
emission := &dto.SampleValue{
Value: &interpolated,
}
return model.SampleFromDTO(m, t, emission), nil
secondValue, err = extractSampleValue(iterator)
if err != nil {
return
}
interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t)
sampleValue := &dto.SampleValue{
Value: &interpolated,
}
sample = model.SampleFromDTO(m, t, sampleValue)
return
}

View File

@ -14,6 +14,7 @@
package leveldb
import (
"fmt"
"github.com/matttproud/prometheus/model"
"github.com/matttproud/prometheus/storage/metric"
"github.com/matttproud/prometheus/utility/test"
@ -120,7 +121,7 @@ var testGetValueAtTime = func(t test.Tester) {
},
},
{
name: "before with staleness policy",
name: "before within staleness policy",
input: input{
year: 1984,
month: 3,
@ -129,6 +130,16 @@ var testGetValueAtTime = func(t test.Tester) {
staleness: time.Duration(365*24) * time.Hour,
},
},
{
name: "before outside staleness policy",
input: input{
year: 1984,
month: 3,
day: 29,
hour: 0,
staleness: time.Duration(1) * time.Hour,
},
},
{
name: "after without staleness policy",
input: input{
@ -140,7 +151,7 @@ var testGetValueAtTime = func(t test.Tester) {
},
},
{
name: "after with staleness policy",
name: "after within staleness policy",
input: input{
year: 1984,
month: 3,
@ -148,6 +159,19 @@ var testGetValueAtTime = func(t test.Tester) {
hour: 0,
staleness: time.Duration(365*24) * time.Hour,
},
output: &output{
value: 0,
},
},
{
name: "after outside staleness policy",
input: input{
year: 1984,
month: 4,
day: 7,
hour: 0,
staleness: time.Duration(7*24) * time.Hour,
},
},
},
},
@ -251,6 +275,19 @@ var testGetValueAtTime = func(t test.Tester) {
hour: 12,
staleness: time.Duration(365*24) * time.Hour,
},
output: &output{
value: 1,
},
},
{
name: "after second without staleness policy",
input: input{
year: 1985,
month: 9,
day: 28,
hour: 12,
staleness: time.Duration(0),
},
},
{
name: "middle without staleness policy",
@ -412,7 +449,7 @@ var testGetValueAtTime = func(t test.Tester) {
},
},
{
name: "after third with staleness policy",
name: "after third within staleness policy",
input: input{
year: 1986,
month: 9,
@ -420,6 +457,29 @@ var testGetValueAtTime = func(t test.Tester) {
hour: 12,
staleness: time.Duration(365*24) * time.Hour,
},
output: &output{
value: 2,
},
},
{
name: "after third outside staleness policy",
input: input{
year: 1986,
month: 9,
day: 28,
hour: 12,
staleness: time.Duration(1*24) * time.Hour,
},
},
{
name: "after third without staleness policy",
input: input{
year: 1986,
month: 9,
day: 28,
hour: 12,
staleness: time.Duration(0),
},
},
{
name: "first middle without staleness policy",
@ -494,7 +554,8 @@ var testGetValueAtTime = func(t test.Tester) {
for i, context := range contexts {
// Wrapping in function to enable garbage collection of resources.
func() {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
name := fmt.Sprintf("test_get_value_at_time_%d", i)
temporaryDirectory, _ := ioutil.TempDir("", name)
defer func() {
if err := os.RemoveAll(temporaryDirectory); err != nil {
@ -663,19 +724,61 @@ var testGetBoundaryValues = func(t test.Tester) {
},
},
{
name: "non-existent interval with staleness policy",
name: "non-existent interval after within staleness policy",
input: input{
openYear: 1984,
openMonth: 3,
openDay: 30,
openDay: 31,
openHour: 0,
endYear: 1985,
endMonth: 3,
endDay: 30,
endHour: 0,
staleness: time.Duration(4380) * time.Hour,
},
},
{
name: "non-existent interval after without staleness policy",
input: input{
openYear: 1984,
openMonth: 3,
openDay: 31,
openHour: 0,
endYear: 1985,
endMonth: 3,
endDay: 30,
endHour: 0,
staleness: time.Duration(0),
},
},
{
name: "non-existent interval before with staleness policy",
input: input{
openYear: 1983,
openMonth: 3,
openDay: 30,
openHour: 0,
endYear: 1984,
endMonth: 3,
endDay: 29,
endHour: 0,
staleness: time.Duration(365*24) * time.Hour,
},
},
{
name: "non-existent interval before without staleness policy",
input: input{
openYear: 1983,
openMonth: 3,
openDay: 30,
openHour: 0,
endYear: 1984,
endMonth: 3,
endDay: 29,
endHour: 0,
staleness: time.Duration(0),
},
},
{
name: "on end but not start without staleness policy",
input: input{
@ -856,7 +959,7 @@ var testGetBoundaryValues = func(t test.Tester) {
endMonth: 6,
endDay: 29,
endHour: 6,
staleness: time.Duration(178*24) * time.Hour,
staleness: time.Duration(2190) * time.Hour,
},
},
{
@ -884,7 +987,7 @@ var testGetBoundaryValues = func(t test.Tester) {
endMonth: 6,
endDay: 29,
endHour: 6,
staleness: time.Duration(178*24) * time.Hour,
staleness: time.Duration(1) * time.Hour,
},
},
{
@ -900,6 +1003,10 @@ var testGetBoundaryValues = func(t test.Tester) {
endHour: 6,
staleness: time.Duration(356*24) * time.Hour,
},
output: &output{
open: 0,
end: 1,
},
},
},
},
@ -908,7 +1015,8 @@ var testGetBoundaryValues = func(t test.Tester) {
for i, context := range contexts {
// Wrapping in function to enable garbage collection of resources.
func() {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
name := fmt.Sprintf("test_get_boundary_values_%d", i)
temporaryDirectory, _ := ioutil.TempDir("", name)
defer func() {
if err := os.RemoveAll(temporaryDirectory); err != nil {
@ -942,7 +1050,7 @@ var testGetBoundaryValues = func(t test.Tester) {
input := behavior.input
open := time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC)
end := time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC)
i := model.Interval{
interval := model.Interval{
OldestInclusive: open,
NewestInclusive: end,
}
@ -950,30 +1058,30 @@ var testGetBoundaryValues = func(t test.Tester) {
DeltaAllowance: input.staleness,
}
openValue, endValue, err := persistence.GetBoundaryValues(&m, &i, &p)
openValue, endValue, err := persistence.GetBoundaryValues(&m, &interval, &p)
if err != nil {
t.Errorf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
}
if behavior.output == nil {
if openValue != nil {
t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, openValue)
t.Errorf("%d.%d(%s). Expected open to be nil but got: %q\n", i, j, behavior.name, openValue)
}
if endValue != nil {
t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, endValue)
t.Errorf("%d.%d(%s). Expected end to be nil but got: %q\n", i, j, behavior.name, endValue)
}
} else {
if openValue == nil {
t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output)
t.Errorf("%d.%d(%s). Expected open to be %s but got nil\n", i, j, behavior.name, behavior.output)
}
if endValue == nil {
t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output)
t.Errorf("%d.%d(%s). Expected end to be %s but got nil\n", i, j, behavior.name, behavior.output)
}
if openValue.Value != behavior.output.open {
t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value)
t.Errorf("%d.%d(%s). Expected open to be %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value)
}
if endValue.Value != behavior.output.end {
t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value)
t.Errorf("%d.%d(%s). Expected end to be %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value)
}
}
}
@ -1501,7 +1609,8 @@ var testGetRangeValues = func(t test.Tester) {
for i, context := range contexts {
// Wrapping in function to enable garbage collection of resources.
func() {
temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test")
name := fmt.Sprintf("test_get_range_values_%d", i)
temporaryDirectory, _ := ioutil.TempDir("", name)
defer func() {
if err := os.RemoveAll(temporaryDirectory); err != nil {