diff --git a/Makefile.INCLUDE b/Makefile.INCLUDE index ecb419549..cb83b97a0 100644 --- a/Makefile.INCLUDE +++ b/Makefile.INCLUDE @@ -37,7 +37,9 @@ FULL_GOPATH_BASE := $(FIRST_GOPATH)/src/github.com/prometheus export PREFIX=$(PWD)/build/root -export PATH := $(PREFIX)/bin:$(GOPATH)/bin:$(PATH) +export LOCAL_BINARIES=$(PREFIX)/bin + +export PATH := $(LOCAL_BINARIES):$(GOPATH)/bin:$(PATH) export LD_LIBRARY_PATH := $(PREFIX)/lib:$(LD_LIBRARY_PATH) export CFLAGS := $(CFLAGS) -I$(PREFIX)/include @@ -68,3 +70,5 @@ BUILDFLAGS := -ldflags \ -X main.leveldbVersion $(LEVELDB_VERSION)\ -X main.protobufVersion $(PROTOCOL_BUFFERS_VERSION)\ -X main.snappyVersion $(SNAPPY_VERSION)" + +PROTOC := $(LOCAL_BINARIES)/protoc diff --git a/config/Makefile b/config/Makefile index 741844423..7eca91f58 100644 --- a/config/Makefile +++ b/config/Makefile @@ -24,4 +24,4 @@ include ../Makefile.INCLUDE generated/config.pb.go: config.proto - protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto diff --git a/model/Makefile b/model/Makefile index 0c1f25fd1..c0a8c4426 100644 --- a/model/Makefile +++ b/model/Makefile @@ -22,8 +22,8 @@ include ../Makefile.INCLUDE # # make -C build goprotobuf-protoc-gen-go-stamp -generated/data.pb.go: data.proto - protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto +generated/data.pb.go: data.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto generated/descriptor.blob: data.proto - protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto diff --git a/model/data.proto b/model/data.proto index e92fdec3b..6be584836 100644 --- a/model/data.proto +++ b/model/data.proto @@ -122,3 +122,8 @@ message CurationValue { // fingerprint. optional int64 last_completion_timestamp = 1; } + +// DeletionProcessorDefinition models a curation process across the sample +// corpus that deletes old values. +message DeletionProcessorDefinition { +} diff --git a/model/generated/data.pb.go b/model/generated/data.pb.go index d1ca868f5..a2ae871bd 100644 --- a/model/generated/data.pb.go +++ b/model/generated/data.pb.go @@ -295,5 +295,13 @@ func (m *CurationValue) GetLastCompletionTimestamp() int64 { return 0 } +type DeletionProcessorDefinition struct { + XXX_unrecognized []byte `json:"-"` +} + +func (m *DeletionProcessorDefinition) Reset() { *m = DeletionProcessorDefinition{} } +func (m *DeletionProcessorDefinition) String() string { return proto.CompactTextString(m) } +func (*DeletionProcessorDefinition) ProtoMessage() {} + func init() { } diff --git a/model/generated/descriptor.blob b/model/generated/descriptor.blob index 4c98e3802..044e1c72b 100644 Binary files a/model/generated/descriptor.blob and b/model/generated/descriptor.blob differ diff --git a/model/metric.go b/model/metric.go index 9105f05a4..12c5094da 100644 --- a/model/metric.go +++ b/model/metric.go @@ -186,24 +186,15 @@ func (v Values) InsideInterval(t time.Time) (s bool) { // TruncateBefore returns a subslice of the original such that extraneous // samples in the collection that occur before the provided time are -// dropped. The original slice is not mutated. 
-func (v Values) TruncateBefore(t time.Time) (values Values) { +// dropped. The original slice is not mutated +func (v Values) TruncateBefore(t time.Time) Values { index := sort.Search(len(v), func(i int) bool { timestamp := v[i].Timestamp return !timestamp.Before(t) }) - switch index { - case 0: - values = v - case len(v): - values = v[len(v)-1:] - default: - values = v[index-1:] - } - - return + return v[index:] } func (v Values) ToDTO() (out *dto.SampleValueSeries) { diff --git a/model/metric_test.go b/model/metric_test.go index 15484eecf..41f9f5af7 100644 --- a/model/metric_test.go +++ b/model/metric_test.go @@ -79,7 +79,7 @@ func BenchmarkMetric(b *testing.B) { } } -func testValues(t test.Tester) { +func testTruncateBefore(t test.Tester) { type in struct { values Values time time.Time @@ -165,10 +165,6 @@ func testValues(t test.Tester) { }, }, out: Values{ - { - Value: 1, - Timestamp: instant.Add(time.Second), - }, { Value: 2, Timestamp: instant.Add(2 * time.Second), @@ -209,13 +205,7 @@ func testValues(t test.Tester) { }, }, }, - out: Values{ - // Preserve the last value in case it needs to be used for the next set. - { - Value: 4, - Timestamp: instant.Add(4 * time.Second), - }, - }, + out: Values{}, }, } @@ -234,6 +224,6 @@ func testValues(t test.Tester) { } } -func TestValues(t *testing.T) { - testValues(t) +func TestTruncateBefore(t *testing.T) { + testTruncateBefore(t) } diff --git a/rules/testdata.go b/rules/testdata.go index c1c43719b..6a4f4780b 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -143,7 +143,7 @@ var testMatrix = ast.Matrix{ { Metric: model.Metric{ model.MetricNameLabel: "x", - "y": "testvalue", + "y": "testvalue", }, Values: getTestValueStream(0, 100, 10), }, diff --git a/storage/metric/processor.go b/storage/metric/processor.go index dc7881ec0..64a18bdb8 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -41,7 +41,7 @@ type Processor interface { // // Upon completion or error, the last time at which the processor finished // shall be emitted in addition to any errors. - Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) + Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) } // CompactionProcessor combines sparse values in the database together such @@ -80,10 +80,10 @@ func (p *CompactionProcessor) Signature() (out []byte, err error) { } func (p CompactionProcessor) String() string { - return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize) + return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize) } -func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { +func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { var pendingBatch raw.Batch = nil defer func() { @@ -137,7 +137,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw. // commit to disk and delete the batch. A new one will be recreated if // necessary. 
case pendingMutations >= p.MaximumMutationPoolBatch: - err = samples.Commit(pendingBatch) + err = samplesPersistence.Commit(pendingBatch) if err != nil { return } @@ -223,7 +223,136 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw. // This is not deferred due to the off-chance that a pre-existing commit // failed. if pendingBatch != nil && pendingMutations > 0 { - err = samples.Commit(pendingBatch) + err = samplesPersistence.Commit(pendingBatch) + if err != nil { + return + } + } + + return +} + +// DeletionProcessor deletes sample blocks older than a defined value. +type DeletionProcessor struct { + // MaximumMutationPoolBatch represents approximately the largest pending + // batch of mutation operations for the database before pausing to + // commit before resumption. + MaximumMutationPoolBatch int + // signature is the byte representation of the DeletionProcessor's settings, + // used for purely memoization purposes across an instance. + signature []byte +} + +func (p DeletionProcessor) Name() string { + return "io.prometheus.DeletionProcessorDefinition" +} + +func (p *DeletionProcessor) Signature() (out []byte, err error) { + if len(p.signature) == 0 { + out, err = proto.Marshal(&dto.DeletionProcessorDefinition{}) + + p.signature = out + } + + out = p.signature + + return +} + +func (p DeletionProcessor) String() string { + return "deletionProcessor" +} + +func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { + var pendingBatch raw.Batch = nil + + defer func() { + if pendingBatch != nil { + pendingBatch.Close() + } + }() + + sampleKey, err := extractSampleKey(sampleIterator) + if err != nil { + return + } + sampleValues, err := extractSampleValues(sampleIterator) + if err != nil { + return + } + + pendingMutations := 0 + + for lastCurated.Before(stopAt) { + switch { + // Furnish a new pending batch operation if none is available. + case pendingBatch == nil: + pendingBatch = leveldb.NewBatch() + + // If there are no sample values to extract from the datastore, let's + // continue extracting more values to use. We know that the time.Before() + // block would prevent us from going into unsafe territory. + case len(sampleValues) == 0: + if !sampleIterator.Next() { + return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") + } + + sampleKey, err = extractSampleKey(sampleIterator) + if err != nil { + return + } + sampleValues, err = extractSampleValues(sampleIterator) + if err != nil { + return + } + + // If the number of pending mutations exceeds the allowed batch amount, + // commit to disk and delete the batch. A new one will be recreated if + // necessary. 
+ case pendingMutations >= p.MaximumMutationPoolBatch: + err = samplesPersistence.Commit(pendingBatch) + if err != nil { + return + } + + pendingMutations = 0 + + pendingBatch.Close() + pendingBatch = nil + + case !sampleKey.MayContain(stopAt): + key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + pendingBatch.Drop(key) + lastCurated = sampleKey.LastTimestamp + sampleValues = model.Values{} + pendingMutations++ + + case sampleKey.MayContain(stopAt): + key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + pendingBatch.Drop(key) + pendingMutations++ + + sampleValues = sampleValues.TruncateBefore(stopAt) + if len(sampleValues) > 0 { + sampleKey = sampleValues.ToSampleKey(fingerprint) + lastCurated = sampleKey.FirstTimestamp + newKey := coding.NewProtocolBuffer(sampleKey.ToDTO()) + newValue := coding.NewProtocolBuffer(sampleValues.ToDTO()) + pendingBatch.Put(newKey, newValue) + pendingMutations++ + } else { + lastCurated = sampleKey.LastTimestamp + } + + default: + err = fmt.Errorf("Unhandled processing case.") + } + } + + // This is not deferred due to the off-chance that a pre-existing commit + // failed. + if pendingBatch != nil && pendingMutations > 0 { + err = samplesPersistence.Commit(pendingBatch) if err != nil { return } diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index f4e33ae4c..e534f0ee3 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -880,11 +880,529 @@ func TestCuratorCompactionProcessor(t *testing.T) { err = proto.Unmarshal(iterator.Key(), curationKeyDto) if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } err = proto.Unmarshal(iterator.Value(), curationValueDto) if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) + } + + curationKey := model.NewCurationKeyFromDTO(curationKeyDto) + actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto) + signature, err := expected.processor.Signature() + if err != nil { + t.Fatal(err) + } + + actualKey := curationKey + expectedKey := model.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint), + IgnoreYoungerThan: expected.ignoreYoungerThan, + ProcessorMessageRaw: signature, + ProcessorMessageTypeName: expected.processor.Name(), + } + if !actualKey.Equal(expectedKey) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) + } + expectedCurationRemark := model.CurationRemark{ + LastCompletionTimestamp: expected.lastCurated, + } + if !actualCurationRemark.Equal(expectedCurationRemark) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark) + } + } + + iterator = samples.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.sampleGroups { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected) + } + } + + sampleKey, err := extractSampleKey(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + sampleValues, err := extractSampleValues(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + + if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) { + t.Fatalf("%d.%d. 
expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint) + } + + if int(sampleKey.SampleCount) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount) + } + + if len(sampleValues) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues)) + } + + if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp) + } + + for k, actualValue := range sampleValues { + if expected.values[k].Value != actualValue.Value { + t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) + } + if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { + t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) + } + } + + if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) { + fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value) + t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp) + } + } + } +} + +func TestCuratorDeletionProcessor(t *testing.T) { + scenarios := []struct { + in in + out out + }{ + { + in: in{ + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + ignoreYoungerThan: 1 * time.Hour, + groupSize: 5, + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + curationState{ + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-2-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 90, + }, + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 30, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: 15, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 89 * time.Minute), + Value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 88 * time.Minute), + Value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 87 * time.Minute), + Value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 86 * time.Minute), + Value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 85 * 
time.Minute), + Value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 84 * time.Minute), + Value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 83 * time.Minute), + Value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 82 * time.Minute), + Value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 81 * time.Minute), + Value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 79 * time.Minute), + Value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 78 * time.Minute), + Value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 77 * time.Minute), + Value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 76 * time.Minute), + Value: 14, + }, + { + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 15, + }, + { + Timestamp: testInstant.Add(-1 * 74 * time.Minute), + Value: 16, + }, + { + Timestamp: testInstant.Add(-1 * 73 * time.Minute), + Value: 17, + }, + { + Timestamp: testInstant.Add(-1 * 72 * time.Minute), + Value: 18, + }, + { + Timestamp: testInstant.Add(-1 * 71 * time.Minute), + Value: 19, + }, + { + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 69 * time.Minute), + Value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 68 * time.Minute), + Value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 67 * time.Minute), + Value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 66 * time.Minute), + Value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 64 * time.Minute), + Value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 63 * time.Minute), + Value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 62 * time.Minute), + Value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 61 * time.Minute), + Value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + out: out{ + curationStates: []curationState{ + { + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 
* 30 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + { + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 60 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + }, + sampleGroups: []sampleGroup{ + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 30, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: 15, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) + defer curatorDirectory.Close() + + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) + defer watermarkDirectory.Close() + + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) + defer sampleDirectory.Close() + + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer curatorStates.Close() + + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer watermarkStates.Close() + + samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer samples.Close() + + updates := make(chan CurationState, 100) + defer close(updates) + + stop := make(chan bool) + defer close(stop) + + c := Curator{ + Stop: stop, + } + + err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) + if err != nil { + t.Fatal(err) + } + + iterator := curatorStates.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.curationStates { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. could not seek to next.", i, j) + } + } + + curationKeyDto := &dto.CurationKey{} + curationValueDto := &dto.CurationValue{} + + err = proto.Unmarshal(iterator.Key(), curationKeyDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) + } + err = proto.Unmarshal(iterator.Value(), curationValueDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } curationKey := model.NewCurationKeyFromDTO(curationKeyDto) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 909a3d6ce..5e848e9b7 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -27,6 +27,32 @@ import ( "time" ) +type chunk model.Values + +// TruncateBefore returns a subslice of the original such that extraneous +// samples in the collection that occur before the provided time are +// dropped. The original slice is not mutated. It works with the assumption +// that consumers of these values could want preceding values if none would +// exist prior to the defined time. 
+func (c chunk) TruncateBefore(t time.Time) chunk { + index := sort.Search(len(c), func(i int) bool { + timestamp := c[i].Timestamp + + return !timestamp.Before(t) + }) + + switch index { + case 0: + return c + case len(c): + return c[len(c)-1:] + default: + return c[index-1:] + } + + panic("unreachable") +} + // TieredStorage both persists samples and generates materialized views for // queries. type TieredStorage struct { @@ -383,7 +409,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // Load data value chunk(s) around the first standing op's current time. targetTime := *standingOps[0].CurrentTime() - chunk := model.Values{} + currentChunk := chunk{} memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime) // If we aimed before the oldest value in memory, load more data from disk. if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { @@ -397,22 +423,22 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // If we aimed past the newest value on disk, combine it with the next value from memory. if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { latestDiskValue := diskValues[len(diskValues)-1:] - chunk = append(latestDiskValue, memValues...) + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) } else { - chunk = diskValues + currentChunk = chunk(diskValues) } } else { - chunk = memValues + currentChunk = chunk(memValues) } // There's no data at all for this fingerprint, so stop processing ops for it. - if len(chunk) == 0 { + if len(currentChunk) == 0 { break } - chunk = chunk.TruncateBefore(targetTime) + currentChunk = currentChunk.TruncateBefore(targetTime) - lastChunkTime := chunk[len(chunk)-1].Timestamp + lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp if lastChunkTime.After(targetTime) { targetTime = lastChunkTime } @@ -424,10 +450,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) { break } - chunk = chunk.TruncateBefore(*(op.CurrentTime())) + currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime())) for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { - out = op.ExtractSamples(chunk) + out = op.ExtractSamples(model.Values(currentChunk)) } } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 721b9400a..b8cf2ee2b 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -568,3 +568,162 @@ func TestGetFingerprintsForLabelSet(t *testing.T) { } } } + +func testTruncateBefore(t test.Tester) { + type in struct { + values model.Values + time time.Time + } + instant := time.Now() + var scenarios = []struct { + in in + out model.Values + }{ + { + in: in{ + time: instant, + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(2 * time.Second), + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 
2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(5 * time.Second), + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + // Preserve the last value in case it needs to be used for the next set. + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + } + + for i, scenario := range scenarios { + actual := chunk(scenario.in.values).TruncateBefore(scenario.in.time) + + if len(actual) != len(scenario.out) { + t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual)) + } + + for j, actualValue := range actual { + if !actualValue.Equal(scenario.out[j]) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue) + } + } + } +} + +func TestTruncateBefore(t *testing.T) { + testTruncateBefore(t) +}
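
Note on the behavioural core of this patch: model.Values.TruncateBefore now drops every sample that lies strictly before the cut-off time (the behaviour the new DeletionProcessor relies on when purging old blocks), while the previous "keep the immediately preceding sample" behaviour moves to the unexported chunk type in storage/metric/tiered.go, where view rendering still needs a predecessor value to fall back on. The sketch below is illustrative only and is not part of the patch; samplePair and values are local stand-ins for the real model types, re-implementing the two variants side by side so the semantic difference is easy to see.

package main

import (
	"fmt"
	"sort"
	"time"
)

// samplePair and values are hypothetical stand-ins for model.SamplePair and
// model.Values; only the fields exercised by TruncateBefore are modelled.
type samplePair struct {
	Timestamp time.Time
	Value     float64
}

type values []samplePair

// truncateBefore mirrors the new model.Values.TruncateBefore in this patch:
// every sample whose timestamp lies strictly before t is dropped.
func (v values) truncateBefore(t time.Time) values {
	index := sort.Search(len(v), func(i int) bool {
		return !v[i].Timestamp.Before(t)
	})

	return v[index:]
}

// truncateBeforeKeepingPredecessor mirrors chunk.TruncateBefore in
// storage/metric/tiered.go: the sample immediately preceding t is retained
// so that a consumer looking just before t still sees a value.
func (v values) truncateBeforeKeepingPredecessor(t time.Time) values {
	index := sort.Search(len(v), func(i int) bool {
		return !v[i].Timestamp.Before(t)
	})

	switch {
	case index == 0:
		return v
	case index == len(v):
		return v[len(v)-1:]
	default:
		return v[index-1:]
	}
}

func main() {
	instant := time.Now()
	series := values{
		{Timestamp: instant, Value: 0},
		{Timestamp: instant.Add(1 * time.Second), Value: 1},
		{Timestamp: instant.Add(2 * time.Second), Value: 2},
	}
	cutOff := instant.Add(2 * time.Second)

	// New Values semantics: only the sample at or after the cut-off survives.
	fmt.Println(len(series.truncateBefore(cutOff))) // 1

	// chunk semantics: the immediately preceding sample is kept as well.
	fmt.Println(len(series.truncateBeforeKeepingPredecessor(cutOff))) // 2
}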