From 6ea22f2bf909796e69a77f2ae6ff97e52540ab99 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 23 Oct 2013 00:28:38 +0200 Subject: [PATCH 1/2] Add compaction regression tests. This adds regression tests that catch the two error cases reported in https://github.com/prometheus/prometheus/issues/367 It also adds a commented-out test case for the crash in https://github.com/prometheus/prometheus/issues/368 but there's no fix for the latter crash yet. Change-Id: Idffefea4ed7cc281caae660bcad2e3c13ec3bd17 --- storage/metric/compaction_regression_test.go | 244 +++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 storage/metric/compaction_regression_test.go diff --git a/storage/metric/compaction_regression_test.go b/storage/metric/compaction_regression_test.go new file mode 100644 index 000000000..4a2fe93ae --- /dev/null +++ b/storage/metric/compaction_regression_test.go @@ -0,0 +1,244 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metric + +import ( + "fmt" + "testing" + "flag" + "time" + + "github.com/prometheus/prometheus/storage" + + clientmodel "github.com/prometheus/client_golang/model" +) + +type nopCurationStateUpdater struct{} +func (n *nopCurationStateUpdater) UpdateCurationState(*CurationState) {} + +func generateTestSamples(endTime time.Time, numTs int, samplesPerTs int, interval time.Duration) clientmodel.Samples { + samples := clientmodel.Samples{} + + startTime := endTime.Add(-interval * time.Duration(samplesPerTs-1)) + for ts := 0; ts < numTs; ts++ { + metric := clientmodel.Metric{} + metric["name"] = clientmodel.LabelValue(fmt.Sprintf("metric_%d", ts)) + for i := 0; i < samplesPerTs; i++ { + sample := &clientmodel.Sample{ + Metric: metric, + Value: clientmodel.SampleValue(ts + 1000 * i), + Timestamp: startTime.Add(interval * time.Duration(i)), + } + samples = append(samples, sample) + } + } + return samples +} + +type compactionChecker struct { + t *testing.T + sampleIdx int + numChunks int + expectedSamples clientmodel.Samples +} + +func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorError { + c.numChunks++ + sampleKey := key.(*SampleKey) + if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { + c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) + } + for _, sample := range value.(Values) { + if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) { + c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) + } + + expected := c.expectedSamples[c.sampleIdx] + + fp := &clientmodel.Fingerprint{} + fp.LoadFromMetric(expected.Metric) + if !sampleKey.Fingerprint.Equal(fp) { + c.t.Fatalf("%d. 
Expected fingerprint %s, got %s", c.sampleIdx, fp, sampleKey.Fingerprint) + } + + sp := &SamplePair{ + Value: expected.Value, + Timestamp: expected.Timestamp, + } + if !sample.Equal(sp) { + c.t.Fatalf("%d. Expected sample %s, got %s", c.sampleIdx, sp, sample) + } + c.sampleIdx++ + } + return nil +} + + +func checkStorageSaneAndEquivalent(t *testing.T, name string, ts *TieredStorage, samples clientmodel.Samples, expectedNumChunks int) { + cc := &compactionChecker{ + expectedSamples: samples, + t: t, + } + entire, err := ts.DiskStorage.MetricSamples.ForEach(&MetricSamplesDecoder{}, &AcceptAllFilter{}, cc) + if err != nil { + t.Fatalf("%s: Error dumping samples: %s", name, err) + } + if !entire { + t.Fatalf("%s: Didn't scan entire corpus", name) + } + if cc.numChunks != expectedNumChunks { + t.Fatalf("%s: Expected %d chunks, got %d", name, expectedNumChunks, cc.numChunks) + } +} + +type compactionTestScenario struct { + leveldbChunkSize int + numTimeseries int + samplesPerTs int + + ignoreYoungerThan time.Duration + maximumMutationPoolBatch int + minimumGroupSize int + + uncompactedChunks int + compactedChunks int +} + +func (s compactionTestScenario) run(t *testing.T) { + flag.Set("leveldbChunkSize", fmt.Sprintf("%d", s.leveldbChunkSize)) + defer flag.Set("leveldbChunkSize", "200") + + ts, closer := NewTestTieredStorage(t) + defer closer.Close() + + // 1. Store test values. + samples := generateTestSamples(testInstant, s.numTimeseries, s.samplesPerTs, time.Minute) + ts.AppendSamples(samples) + ts.Flush() + + // 2. Check sanity of uncompacted values. + checkStorageSaneAndEquivalent(t, "Before compaction", ts, samples, s.uncompactedChunks) + + // 3. Compact test storage. 
+ processor := NewCompactionProcessor(&CompactionProcessorOptions{ + MaximumMutationPoolBatch: s.maximumMutationPoolBatch, + MinimumGroupSize: s.minimumGroupSize, + }) + defer processor.Close() + + curator := NewCurator(&CuratorOptions{ + Stop: make(chan bool), + ViewQueue: ts.ViewQueue, + }) + defer curator.Close() + + fmt.Println("test instant:", testInstant) + err := curator.Run(s.ignoreYoungerThan, testInstant, processor, ts.DiskStorage.CurationRemarks, ts.DiskStorage.MetricSamples, ts.DiskStorage.MetricHighWatermarks, &nopCurationStateUpdater{}) + if err != nil { + t.Fatalf("Failed to run curator: %s", err) + } + + // 4. Check sanity of compacted values. + checkStorageSaneAndEquivalent(t, "After compaction", ts, samples, s.compactedChunks) +} + +func TestCompaction(t *testing.T) { + scenarios := []compactionTestScenario{ + // BEFORE COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 5 | A | 1 .. 5 + // 5 | A | 6 .. 10 + // 5 | A | 11 .. 15 + // 5 | B | 1 .. 5 + // 5 | B | 6 .. 10 + // 5 | B | 11 .. 15 + // 5 | C | 1 .. 5 + // 5 | C | 6 .. 10 + // 5 | C | 11 .. 15 + // + // AFTER COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 10 | A | 1 .. 10 + // 5 | A | 11 .. 15 + // 10 | B | 1 .. 10 + // 5 | B | 11 .. 15 + // 10 | C | 1 .. 10 + // 5 | C | 11 .. 15 + { + leveldbChunkSize: 5, + numTimeseries: 3, + samplesPerTs: 15, + + ignoreYoungerThan: time.Minute, + maximumMutationPoolBatch: 30, + minimumGroupSize: 10, + + uncompactedChunks: 9, + compactedChunks: 6, + }, + // BEFORE COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 5 | A | 1 .. 5 + // 5 | A | 6 .. 10 + // 5 | A | 11 .. 15 + // 5 | B | 1 .. 5 + // 5 | B | 6 .. 10 + // 5 | B | 11 .. 15 + // 5 | C | 1 .. 5 + // 5 | C | 6 .. 10 + // 5 | C | 11 .. 15 + // + // AFTER COMPACTION: + // + // Chunk size | Fingerprint | Samples + // 15 | A | 1 .. 15 + // 15 | B | 1 .. 15 + // 15 | C | 1 .. 
15 + { + leveldbChunkSize: 5, + numTimeseries: 3, + samplesPerTs: 15, + + ignoreYoungerThan: time.Minute, + maximumMutationPoolBatch: 30, + minimumGroupSize: 30, + + uncompactedChunks: 9, + compactedChunks: 3, + }, + // BUG: This case crashes! See: + // https://github.com/prometheus/prometheus/issues/368 + // Fix this! + // + //{ + // leveldbChunkSize: 5, + // numTimeseries: 3, + // samplesPerTs: 20, + + // ignoreYoungerThan: time.Minute, + // maximumMutationPoolBatch: 30, + // minimumGroupSize: 10, + + // uncompactedChunks: 12, + // compactedChunks: 9, + //}, + } + + for _, s := range scenarios { + s.run(t) + } +} From eb461a707df5a625cf0988d6fb2f1f11eb6234a9 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 23 Oct 2013 01:06:49 +0200 Subject: [PATCH 2/2] Add chunk sanity checking to dumper tool. Also, move codecs/filters to common location so they can be used in subsequent test. Change-Id: I3ffeb09188b8f4552e42683cbc9279645f45b32e --- storage/metric/leveldb.go | 31 +++++++++++++++++++++++++++++++ tools/dumper/main.go | 39 ++++++++------------------------------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index f74ca8f70..603778cef 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -662,3 +662,34 @@ func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { l.MetricSamples.State(), } } + +type MetricSamplesDecoder struct {} + +func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { + key := &dto.SampleKey{} + err := proto.Unmarshal(in.([]byte), key) + if err != nil { + return nil, err + } + + sampleKey := &SampleKey{} + sampleKey.Load(key) + + return sampleKey, nil +} + +func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { + values := &dto.SampleValueSeries{} + err := proto.Unmarshal(in.([]byte), values) + if err != nil { + return nil, err + } + + return NewValuesFromDTO(values), nil +} + +type 
AcceptAllFilter struct {} + +func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult { + return storage.ACCEPT +} diff --git a/tools/dumper/main.go b/tools/dumper/main.go index 0768a3d89..9fde15f48 100644 --- a/tools/dumper/main.go +++ b/tools/dumper/main.go @@ -25,53 +25,30 @@ import ( "os" "strconv" - "code.google.com/p/goprotobuf/proto" "github.com/golang/glog" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" - - dto "github.com/prometheus/prometheus/model/generated" ) var ( storageRoot = flag.String("storage.root", "", "The path to the storage root for Prometheus.") + dieOnBadChunk = flag.Bool("dieOnBadChunk", false, "Whether to die upon encountering a bad chunk.") ) type SamplesDumper struct { *csv.Writer } -func (d *SamplesDumper) DecodeKey(in interface{}) (interface{}, error) { - key := &dto.SampleKey{} - err := proto.Unmarshal(in.([]byte), key) - if err != nil { - return nil, err - } - - sampleKey := &metric.SampleKey{} - sampleKey.Load(key) - - return sampleKey, nil -} - -func (d *SamplesDumper) DecodeValue(in interface{}) (interface{}, error) { - values := &dto.SampleValueSeries{} - err := proto.Unmarshal(in.([]byte), values) - if err != nil { - return nil, err - } - - return metric.NewValuesFromDTO(values), nil -} - -func (d *SamplesDumper) Filter(_, _ interface{}) storage.FilterResult { - return storage.ACCEPT -} - func (d *SamplesDumper) Operate(key, value interface{}) *storage.OperatorError { sampleKey := key.(*metric.SampleKey) + if *dieOnBadChunk && sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { + glog.Fatalf("Chunk: First time (%v) after last time (%v): %v\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) + } for i, sample := range value.(metric.Values) { + if *dieOnBadChunk && (sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp)) { + glog.Fatalf("Sample not within chunk boundaries: chunk 
FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) + } d.Write([]string{ sampleKey.Fingerprint.String(), strconv.FormatInt(sampleKey.FirstTimestamp.Unix(), 10), @@ -108,7 +85,7 @@ func main() { csv.NewWriter(os.Stdout), } - entire, err := persistence.MetricSamples.ForEach(dumper, dumper, dumper) + entire, err := persistence.MetricSamples.ForEach(&metric.MetricSamplesDecoder{}, &metric.AcceptAllFilter{}, dumper) if err != nil { glog.Fatal("Error dumping samples: ", err) }