From eb461a707df5a625cf0988d6fb2f1f11eb6234a9 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Wed, 23 Oct 2013 01:06:49 +0200 Subject: [PATCH] Add chunk sanity checking to dumper tool. Also, move codecs/filters to common location so they can be used in subsequent test. Change-Id: I3ffeb09188b8f4552e42683cbc9279645f45b32e --- storage/metric/leveldb.go | 31 +++++++++++++++++++++++++++++++ tools/dumper/main.go | 39 ++++++++------------------------------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index f74ca8f70..603778cef 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -662,3 +662,34 @@ func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { l.MetricSamples.State(), } } + +type MetricSamplesDecoder struct {} + +func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { + key := &dto.SampleKey{} + err := proto.Unmarshal(in.([]byte), key) + if err != nil { + return nil, err + } + + sampleKey := &SampleKey{} + sampleKey.Load(key) + + return sampleKey, nil +} + +func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { + values := &dto.SampleValueSeries{} + err := proto.Unmarshal(in.([]byte), values) + if err != nil { + return nil, err + } + + return NewValuesFromDTO(values), nil +} + +type AcceptAllFilter struct {} + +func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult { + return storage.ACCEPT +} diff --git a/tools/dumper/main.go b/tools/dumper/main.go index 0768a3d89..9fde15f48 100644 --- a/tools/dumper/main.go +++ b/tools/dumper/main.go @@ -25,53 +25,30 @@ import ( "os" "strconv" - "code.google.com/p/goprotobuf/proto" "github.com/golang/glog" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" - - dto "github.com/prometheus/prometheus/model/generated" ) var ( storageRoot = flag.String("storage.root", "", "The path to the storage root for Prometheus.") + dieOnBadChunk = flag.Bool("dieOnBadChunk", false, "Whether to die upon encountering a bad chunk.") ) type SamplesDumper struct { *csv.Writer } -func (d *SamplesDumper) DecodeKey(in interface{}) (interface{}, error) { - key := &dto.SampleKey{} - err := proto.Unmarshal(in.([]byte), key) - if err != nil { - return nil, err - } - - sampleKey := &metric.SampleKey{} - sampleKey.Load(key) - - return sampleKey, nil -} - -func (d *SamplesDumper) DecodeValue(in interface{}) (interface{}, error) { - values := &dto.SampleValueSeries{} - err := proto.Unmarshal(in.([]byte), values) - if err != nil { - return nil, err - } - - return metric.NewValuesFromDTO(values), nil -} - -func (d *SamplesDumper) Filter(_, _ interface{}) storage.FilterResult { - return storage.ACCEPT -} - func (d *SamplesDumper) Operate(key, value interface{}) *storage.OperatorError { sampleKey := key.(*metric.SampleKey) + if *dieOnBadChunk && sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { + glog.Fatalf("Chunk: First time (%v) after last time (%v): %v\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) + } for i, sample := range value.(metric.Values) { + if *dieOnBadChunk && (sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp)) { + glog.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)\n", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) + } d.Write([]string{ sampleKey.Fingerprint.String(), strconv.FormatInt(sampleKey.FirstTimestamp.Unix(), 10), @@ -108,7 +85,7 @@ func main() { csv.NewWriter(os.Stdout), } - entire, err := persistence.MetricSamples.ForEach(dumper, dumper, dumper) + entire, err := persistence.MetricSamples.ForEach(&metric.MetricSamplesDecoder{}, &metric.AcceptAllFilter{}, dumper) if err != nil { glog.Fatal("Error dumping samples: ", err) }