Include deletion processor for long-tail values.

This commit extracts the model.Values truncation behavior into the actual
tiered storage, which uses it and behaves in a peculiar way—notably the
retention of previous elements if the chunk were to ever go empty.  This is
done to enable interpolation between sparse sample values in the evaluation
cycle.  Nothing necessarily new here—just an extraction.

Now, the model.Values TruncateBefore functionality would do what a user
would expect without any surprises, which is required for the
DeletionProcessor, which may decide to split a large chunk in two if it
determines that the chunk contains the cut-off time.
This commit is contained in:
Matt T. Proud 2013-05-08 20:39:59 +02:00
parent 8d062ebb27
commit 161c8fbf9b
13 changed files with 878 additions and 48 deletions

View File

@ -37,7 +37,9 @@ FULL_GOPATH_BASE := $(FIRST_GOPATH)/src/github.com/prometheus
export PREFIX=$(PWD)/build/root
export PATH := $(PREFIX)/bin:$(GOPATH)/bin:$(PATH)
export LOCAL_BINARIES=$(PREFIX)/bin
export PATH := $(LOCAL_BINARIES):$(GOPATH)/bin:$(PATH)
export LD_LIBRARY_PATH := $(PREFIX)/lib:$(LD_LIBRARY_PATH)
export CFLAGS := $(CFLAGS) -I$(PREFIX)/include
@ -68,3 +70,5 @@ BUILDFLAGS := -ldflags \
-X main.leveldbVersion $(LEVELDB_VERSION)\
-X main.protobufVersion $(PROTOCOL_BUFFERS_VERSION)\
-X main.snappyVersion $(SNAPPY_VERSION)"
PROTOC := $(LOCAL_BINARIES)/protoc

View File

@ -24,4 +24,4 @@ include ../Makefile.INCLUDE
generated/config.pb.go: config.proto
protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto
$(PROTOC) --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto

View File

@ -22,8 +22,8 @@ include ../Makefile.INCLUDE
#
# make -C build goprotobuf-protoc-gen-go-stamp
generated/data.pb.go: data.proto
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
generated/data.pb.go: data.proto
$(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
generated/descriptor.blob: data.proto
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
$(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto

View File

@ -122,3 +122,8 @@ message CurationValue {
// fingerprint.
optional int64 last_completion_timestamp = 1;
}
// DeletionProcessorDefinition models a curation process across the sample
// corpus that deletes old values.
message DeletionProcessorDefinition {
}

View File

@ -295,5 +295,13 @@ func (m *CurationValue) GetLastCompletionTimestamp() int64 {
return 0
}
type DeletionProcessorDefinition struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *DeletionProcessorDefinition) Reset() { *m = DeletionProcessorDefinition{} }
func (m *DeletionProcessorDefinition) String() string { return proto.CompactTextString(m) }
func (*DeletionProcessorDefinition) ProtoMessage() {}
func init() {
}

Binary file not shown.

View File

@ -186,24 +186,15 @@ func (v Values) InsideInterval(t time.Time) (s bool) {
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated.
func (v Values) TruncateBefore(t time.Time) (values Values) {
// dropped. The original slice is not mutated
func (v Values) TruncateBefore(t time.Time) Values {
index := sort.Search(len(v), func(i int) bool {
timestamp := v[i].Timestamp
return !timestamp.Before(t)
})
switch index {
case 0:
values = v
case len(v):
values = v[len(v)-1:]
default:
values = v[index-1:]
}
return
return v[index:]
}
func (v Values) ToDTO() (out *dto.SampleValueSeries) {

View File

@ -79,7 +79,7 @@ func BenchmarkMetric(b *testing.B) {
}
}
func testValues(t test.Tester) {
func testTruncateBefore(t test.Tester) {
type in struct {
values Values
time time.Time
@ -165,10 +165,6 @@ func testValues(t test.Tester) {
},
},
out: Values{
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
@ -209,13 +205,7 @@ func testValues(t test.Tester) {
},
},
},
out: Values{
// Preserve the last value in case it needs to be used for the next set.
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
out: Values{},
},
}
@ -234,6 +224,6 @@ func testValues(t test.Tester) {
}
}
func TestValues(t *testing.T) {
testValues(t)
func TestTruncateBefore(t *testing.T) {
testTruncateBefore(t)
}

View File

@ -143,7 +143,7 @@ var testMatrix = ast.Matrix{
{
Metric: model.Metric{
model.MetricNameLabel: "x",
"y": "testvalue",
"y": "testvalue",
},
Values: getTestValueStream(0, 100, 10),
},

View File

@ -41,7 +41,7 @@ type Processor interface {
//
// Upon completion or error, the last time at which the processor finished
// shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
}
// CompactionProcessor combines sparse values in the database together such
@ -80,10 +80,10 @@ func (p *CompactionProcessor) Signature() (out []byte, err error) {
}
func (p CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize)
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
}
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil
defer func() {
@ -137,7 +137,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
err = samples.Commit(pendingBatch)
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}
@ -223,7 +223,136 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.
// This is not deferred due to the off-chance that a pre-existing commit
// failed.
if pendingBatch != nil && pendingMutations > 0 {
err = samples.Commit(pendingBatch)
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}
}
return
}
// DeletionProcessor deletes sample blocks older than a defined value.
type DeletionProcessor struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
MaximumMutationPoolBatch int
// signature is the byte representation of the DeletionProcessor's settings,
// used for purely memoization purposes across an instance.
signature []byte
}
func (p DeletionProcessor) Name() string {
return "io.prometheus.DeletionProcessorDefinition"
}
func (p *DeletionProcessor) Signature() (out []byte, err error) {
if len(p.signature) == 0 {
out, err = proto.Marshal(&dto.DeletionProcessorDefinition{})
p.signature = out
}
out = p.signature
return
}
func (p DeletionProcessor) String() string {
return "deletionProcessor"
}
func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil
defer func() {
if pendingBatch != nil {
pendingBatch.Close()
}
}()
sampleKey, err := extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err := extractSampleValues(sampleIterator)
if err != nil {
return
}
pendingMutations := 0
for lastCurated.Before(stopAt) {
switch {
// Furnish a new pending batch operation if none is available.
case pendingBatch == nil:
pendingBatch = leveldb.NewBatch()
// If there are no sample values to extract from the datastore, let's
// continue extracting more values to use. We know that the time.Before()
// block would prevent us from going into unsafe territory.
case len(sampleValues) == 0:
if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
}
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err = extractSampleValues(sampleIterator)
if err != nil {
return
}
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}
pendingMutations = 0
pendingBatch.Close()
pendingBatch = nil
case !sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
lastCurated = sampleKey.LastTimestamp
sampleValues = model.Values{}
pendingMutations++
case sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
pendingMutations++
sampleValues = sampleValues.TruncateBefore(stopAt)
if len(sampleValues) > 0 {
sampleKey = sampleValues.ToSampleKey(fingerprint)
lastCurated = sampleKey.FirstTimestamp
newKey := coding.NewProtocolBuffer(sampleKey.ToDTO())
newValue := coding.NewProtocolBuffer(sampleValues.ToDTO())
pendingBatch.Put(newKey, newValue)
pendingMutations++
} else {
lastCurated = sampleKey.LastTimestamp
}
default:
err = fmt.Errorf("Unhandled processing case.")
}
}
// This is not deferred due to the off-chance that a pre-existing commit
// failed.
if pendingBatch != nil && pendingMutations > 0 {
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}

View File

@ -880,11 +880,529 @@ func TestCuratorCompactionProcessor(t *testing.T) {
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)
actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto)
signature, err := expected.processor.Signature()
if err != nil {
t.Fatal(err)
}
actualKey := curationKey
expectedKey := model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint),
IgnoreYoungerThan: expected.ignoreYoungerThan,
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: expected.processor.Name(),
}
if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
}
expectedCurationRemark := model.CurationRemark{
LastCompletionTimestamp: expected.lastCurated,
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
}
}
iterator = samples.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.sampleGroups {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected)
}
}
sampleKey, err := extractSampleKey(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
sampleValues, err := extractSampleValues(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) {
t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint)
}
if int(sampleKey.SampleCount) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount)
}
if len(sampleValues) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues))
}
if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp)
}
for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
}
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
}
}
if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) {
fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value)
t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp)
}
}
}
}
func TestCuratorDeletionProcessor(t *testing.T) {
scenarios := []struct {
in in
out out
}{
{
in: in{
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
ignoreYoungerThan: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
curationState{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
curationState{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-2-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 90,
},
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 30,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
Value: 14,
},
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 15,
},
{
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
Value: 16,
},
{
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
Value: 17,
},
{
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
Value: 18,
},
{
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
Value: 19,
},
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 20,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
out: out{
curationStates: []curationState{
{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
},
sampleGroups: []sampleGroup{
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 30,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
},
}
for i, scenario := range scenarios {
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
defer curatorDirectory.Close()
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
defer watermarkDirectory.Close()
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer samples.Close()
updates := make(chan CurationState, 100)
defer close(updates)
stop := make(chan bool)
defer close(stop)
c := Curator{
Stop: stop,
}
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
if err != nil {
t.Fatal(err)
}
iterator := curatorStates.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next.", i, j)
}
}
curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)

View File

@ -27,6 +27,32 @@ import (
"time"
)
type chunk model.Values
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated. It works with the assumption
// that consumers of these values could want preceding values if none would
// exist prior to the defined time.
func (c chunk) TruncateBefore(t time.Time) chunk {
index := sort.Search(len(c), func(i int) bool {
timestamp := c[i].Timestamp
return !timestamp.Before(t)
})
switch index {
case 0:
return c
case len(c):
return c[len(c)-1:]
default:
return c[index-1:]
}
panic("unreachable")
}
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
@ -383,7 +409,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Load data value chunk(s) around the first standing op's current time.
targetTime := *standingOps[0].CurrentTime()
chunk := model.Values{}
currentChunk := chunk{}
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
@ -397,22 +423,22 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
chunk = append(latestDiskValue, memValues...)
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
chunk = diskValues
currentChunk = chunk(diskValues)
}
} else {
chunk = memValues
currentChunk = chunk(memValues)
}
// There's no data at all for this fingerprint, so stop processing ops for it.
if len(chunk) == 0 {
if len(currentChunk) == 0 {
break
}
chunk = chunk.TruncateBefore(targetTime)
currentChunk = currentChunk.TruncateBefore(targetTime)
lastChunkTime := chunk[len(chunk)-1].Timestamp
lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime
}
@ -424,10 +450,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
break
}
chunk = chunk.TruncateBefore(*(op.CurrentTime()))
currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(chunk)
out = op.ExtractSamples(model.Values(currentChunk))
}
}

View File

@ -568,3 +568,162 @@ func TestGetFingerprintsForLabelSet(t *testing.T) {
}
}
}
func testTruncateBefore(t test.Tester) {
type in struct {
values model.Values
time time.Time
}
instant := time.Now()
var scenarios = []struct {
in in
out model.Values
}{
{
in: in{
time: instant,
values: model.Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: model.Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
{
in: in{
time: instant.Add(2 * time.Second),
values: model.Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: model.Values{
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
{
in: in{
time: instant.Add(5 * time.Second),
values: model.Values{
{
Value: 0,
Timestamp: instant,
},
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Value: 2,
Timestamp: instant.Add(2 * time.Second),
},
{
Value: 3,
Timestamp: instant.Add(3 * time.Second),
},
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
out: model.Values{
// Preserve the last value in case it needs to be used for the next set.
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
},
}
for i, scenario := range scenarios {
actual := chunk(scenario.in.values).TruncateBefore(scenario.in.time)
if len(actual) != len(scenario.out) {
t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual))
}
for j, actualValue := range actual {
if !actualValue.Equal(scenario.out[j]) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue)
}
}
}
}
func TestTruncateBefore(t *testing.T) {
testTruncateBefore(t)
}