diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go
index 0a708d40a..c5b4efac9 100644
--- a/cmd/promtool/main.go
+++ b/cmd/promtool/main.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/storage/local"
 	"github.com/prometheus/prometheus/util/cli"
 	"github.com/prometheus/prometheus/version"
 )
@@ -184,6 +185,19 @@ func checkRules(t cli.Term, filename string) (int, error) {
 	return len(rules), nil
 }
 
+// DumpHeadsCmd dumps metadata of a heads.db file.
+func DumpHeadsCmd(t cli.Term, args ...string) int {
+	if len(args) != 1 {
+		t.Infof("usage: promtool dump-heads <heads-file>")
+		return 2
+	}
+	if err := local.DumpHeads(args[0], t.Out()); err != nil {
+		t.Errorf("  FAILED: %s", err)
+		return 1
+	}
+	return 0
+}
+
 var versionInfoTmpl = `
 prometheus, version {{.version}} (branch: {{.branch}}, revision: {{.revision}})
   build user:       {{.buildUser}}
@@ -199,7 +213,7 @@ func VersionCmd(t cli.Term, _ ...string) int {
 	if err := tmpl.ExecuteTemplate(&buf, "version", version.Map); err != nil {
 		panic(err)
 	}
-	t.Out(strings.TrimSpace(buf.String()))
+	fmt.Fprintln(t.Out(), strings.TrimSpace(buf.String()))
 	return 0
 }
 
@@ -216,6 +230,11 @@ func main() {
 		Run:  CheckRulesCmd,
 	})
 
+	app.Register("dump-heads", &cli.Command{
+		Desc: "dump metadata of a heads.db checkpoint file",
+		Run:  DumpHeadsCmd,
+	})
+
 	app.Register("version", &cli.Command{
 		Desc: "print the version of this binary",
 		Run:  VersionCmd,
diff --git a/storage/local/heads.go b/storage/local/heads.go
new file mode 100644
index 000000000..49a142fbd
--- /dev/null
+++ b/storage/local/heads.go
@@ -0,0 +1,268 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package local
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"os"
+	"time"
+
+	"github.com/prometheus/common/model"
+
+	"github.com/prometheus/prometheus/storage/local/codable"
+)
+
+const (
+	headsFileName            = "heads.db"
+	headsTempFileName        = "heads.db.tmp"
+	headsFormatVersion       = 2
+	headsFormatLegacyVersion = 1 // Can read, but will never write.
+	headsMagicString         = "PrometheusHeads"
+)
+
+// headsScanner is a scanner to read time series with their heads from a
+// heads.db file. It follows semantics similar to those of bufio.Scanner.
+// It is not safe to use a headsScanner concurrently.
+type headsScanner struct {
+	f                    *os.File
+	r                    *bufio.Reader
+	fp                   model.Fingerprint // Read after each scan() call that has returned true.
+	series               *memorySeries     // Read after each scan() call that has returned true.
+	version              int64             // Read after newHeadsScanner has returned.
+	seriesTotal          uint64            // Read after newHeadsScanner has returned.
+	seriesCurrent        uint64
+	chunksToPersistTotal int64 // Read after scan() has returned false.
+	err                  error // Read after scan() has returned false.
+}
+
+// newHeadsScanner opens the heads file with the given name and reads its
+// header (magic string, format version, number of series). It always returns
+// a non-nil headsScanner; callers must check its err field and should call
+// close() when done. If reading the header fails, the file is closed here.
+func newHeadsScanner(filename string) *headsScanner {
+	hs := &headsScanner{}
+	defer func() {
+		if hs.f != nil && hs.err != nil {
+			hs.f.Close()
+			// Avoid closing the file a second time in close().
+			hs.f = nil
+		}
+	}()
+
+	if hs.f, hs.err = os.Open(filename); hs.err != nil {
+		return hs
+	}
+	hs.r = bufio.NewReaderSize(hs.f, fileBufSize)
+
+	buf := make([]byte, len(headsMagicString))
+	if _, hs.err = io.ReadFull(hs.r, buf); hs.err != nil {
+		return hs
+	}
+	magic := string(buf)
+	if magic != headsMagicString {
+		hs.err = fmt.Errorf(
+			"unexpected magic string, want %q, got %q",
+			headsMagicString, magic,
+		)
+		return hs
+	}
+	hs.version, hs.err = binary.ReadVarint(hs.r)
+	if hs.err != nil {
+		// Report read errors separately so that a nil error is never
+		// formatted into the message below.
+		hs.err = fmt.Errorf("unreadable heads format version: %s", hs.err)
+		return hs
+	}
+	if hs.version != headsFormatVersion && hs.version != headsFormatLegacyVersion {
+		hs.err = fmt.Errorf(
+			"unknown heads format version, want %d or %d, got %d",
+			headsFormatVersion, headsFormatLegacyVersion, hs.version,
+		)
+		return hs
+	}
+	if hs.seriesTotal, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
+		return hs
+	}
+	return hs
+}
+
+// scan works like bufio.Scanner.Scan: it reads the next series from the
+// heads file and returns true if fp and series have been populated. It
+// returns false once all series have been read or an error has occurred; in
+// the latter case, the error is available in the err field.
+func (hs *headsScanner) scan() bool {
+	if hs.seriesCurrent == hs.seriesTotal || hs.err != nil {
+		return false
+	}
+
+	var (
+		seriesFlags      byte
+		fpAsInt          uint64
+		metric           codable.Metric
+		persistWatermark int64
+		modTimeNano      int64
+		modTime          time.Time
+		chunkDescsOffset int64
+		savedFirstTime   int64
+		numChunkDescs    int64
+		firstTime        int64
+		lastTime         int64
+		encoding         byte
+	)
+	if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
+		return false
+	}
+	headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
+	if fpAsInt, hs.err = codable.DecodeUint64(hs.r); hs.err != nil {
+		return false
+	}
+	hs.fp = model.Fingerprint(fpAsInt)
+
+	if hs.err = metric.UnmarshalFromReader(hs.r); hs.err != nil {
+		return false
+	}
+	if hs.version != headsFormatLegacyVersion {
+		// persistWatermark only present in v2.
+		persistWatermark, hs.err = binary.ReadVarint(hs.r)
+		if hs.err != nil {
+			return false
+		}
+		modTimeNano, hs.err = binary.ReadVarint(hs.r)
+		if hs.err != nil {
+			return false
+		}
+		if modTimeNano != -1 {
+			modTime = time.Unix(0, modTimeNano)
+		}
+	}
+	if chunkDescsOffset, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
+		return false
+	}
+	if savedFirstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
+		return false
+	}
+
+	if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
+		return false
+	}
+	if numChunkDescs <= 0 {
+		// A negative count would make the allocation below panic, and
+		// with no chunk descriptors at all, lastTime could not be taken
+		// from the last one. Report a corrupt file instead of crashing.
+		hs.err = fmt.Errorf(
+			"invalid number of chunk descriptors for fingerprint %v: %d",
+			hs.fp, numChunkDescs,
+		)
+		return false
+	}
+	chunkDescs := make([]*chunkDesc, numChunkDescs)
+	if hs.version == headsFormatLegacyVersion {
+		if headChunkPersisted {
+			persistWatermark = numChunkDescs
+		} else {
+			persistWatermark = numChunkDescs - 1
+		}
+	}
+	headChunkClosed := true // Initial assumption.
+	for i := int64(0); i < numChunkDescs; i++ {
+		if i < persistWatermark {
+			if firstTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
+				return false
+			}
+			if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
+				return false
+			}
+			chunkDescs[i] = &chunkDesc{
+				chunkFirstTime: model.Time(firstTime),
+				chunkLastTime:  model.Time(lastTime),
+			}
+			numMemChunkDescs.Inc()
+		} else {
+			// Non-persisted chunk.
+			// If there are non-persisted chunks at all, we consider
+			// the head chunk not to be closed yet.
+			headChunkClosed = false
+			if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
+				return false
+			}
+			chunk := newChunkForEncoding(chunkEncoding(encoding))
+			if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
+				return false
+			}
+			cd := newChunkDesc(chunk, chunk.firstTime())
+			if i < numChunkDescs-1 {
+				// This is NOT the head chunk. So it's a chunk
+				// to be persisted, and we need to populate lastTime.
+				hs.chunksToPersistTotal++
+				cd.maybePopulateLastTime()
+			}
+			chunkDescs[i] = cd
+		}
+	}
+
+	hs.series = &memorySeries{
+		metric:           model.Metric(metric),
+		chunkDescs:       chunkDescs,
+		persistWatermark: int(persistWatermark),
+		modTime:          modTime,
+		chunkDescsOffset: int(chunkDescsOffset),
+		savedFirstTime:   model.Time(savedFirstTime),
+		lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
+		headChunkClosed:  headChunkClosed,
+	}
+	hs.seriesCurrent++
+	return true
+}
+
+// close closes the underlying file if required.
+func (hs *headsScanner) close() {
+	if hs.f != nil {
+		hs.f.Close()
+	}
+}
+
+// DumpHeads writes the metadata of the provided heads file in a human-readable
+// form to out. It returns the first error encountered while opening or
+// reading the file, or nil if the whole file has been dumped.
+func DumpHeads(filename string, out io.Writer) error {
+	hs := newHeadsScanner(filename)
+	defer hs.close()
+
+	if hs.err == nil {
+		fmt.Fprintf(
+			out,
+			">>> Dumping %d series from heads file %q with format version %d. <<<\n",
+			hs.seriesTotal, filename, hs.version,
+		)
+	}
+	for hs.scan() {
+		s := hs.series
+		fmt.Fprintf(
+			out,
+			"FP=%v\tMETRIC=%s\tlen(chunkDescs)=%d\tpersistWatermark=%d\tchunkDescOffset=%d\tsavedFirstTime=%v\tlastTime=%v\theadChunkClosed=%t\n",
+			hs.fp, s.metric, len(s.chunkDescs), s.persistWatermark, s.chunkDescsOffset, s.savedFirstTime, s.lastTime, s.headChunkClosed,
+		)
+	}
+	if hs.err == nil {
+		fmt.Fprintf(
+			out,
+			">>> Dump complete. %d chunks to persist. <<<\n",
+			hs.chunksToPersistTotal,
+		)
+	}
+	return hs.err
+}
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 22c16eccd..725673cb1 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -47,12 +47,6 @@ const (
 	seriesTempFileSuffix = ".db.tmp"
 	seriesDirNameLen     = 2 // How many bytes of the fingerprint in dir name.
 
-	headsFileName            = "heads.db"
-	headsTempFileName        = "heads.db.tmp"
-	headsFormatVersion       = 2
-	headsFormatLegacyVersion = 1 // Can read, but will never write.
-	headsMagicString         = "PrometheusHeads"
-
 	mappingsFileName      = "mappings.db"
 	mappingsTempFileName  = "mappings.db.tmp"
 	mappingsFormatVersion = 1
@@ -699,190 +693,36 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 // start-up while nothing else is running in storage land. This method is
 // utterly goroutine-unsafe.
 func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
-	var chunkDescsTotal int64
 	fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
 	sm = &seriesMap{m: fingerprintToSeries}
 
 	defer func() {
-		if sm != nil && p.dirty {
+		if p.dirty {
 			log.Warn("Persistence layer appears dirty.")
 			err = p.recoverFromCrash(fingerprintToSeries)
 			if err != nil {
 				sm = nil
 			}
 		}
-		if err == nil {
-			numMemChunkDescs.Add(float64(chunkDescsTotal))
-		}
 	}()
 
-	f, err := os.Open(p.headsFileName())
-	if os.IsNotExist(err) {
+	hs := newHeadsScanner(p.headsFileName()) // Opens the heads file and reads its header; any failure is kept in hs.err.
+	defer hs.close()
+	for hs.scan() { // One iteration per series read successfully.
+		fingerprintToSeries[hs.fp] = hs.series
+	}
+	if os.IsNotExist(hs.err) { // A missing heads file yields an empty series map and no error.
 		return sm, 0, nil
 	}
-	if err != nil {
-		log.Warn("Could not open heads file:", err)
+	if hs.err != nil { // Any other error: series scanned so far stay in the map.
 		p.dirty = true
-		return
+		log.
+			With("file", p.headsFileName()).
+			With("error", hs.err).
+			Error("Error reading heads file.")
+		return sm, 0, hs.err // NOTE(review): reports 0 chunks to persist although hs.chunksToPersistTotal may already be > 0; err is then replaced by the deferred recoverFromCrash result. Confirm this is intended.
 	}
-	defer f.Close()
-	r := bufio.NewReaderSize(f, fileBufSize)
-
-	buf := make([]byte, len(headsMagicString))
-	if _, err := io.ReadFull(r, buf); err != nil {
-		log.Warn("Could not read from heads file:", err)
-		p.dirty = true
-		return sm, 0, nil
-	}
-	magic := string(buf)
-	if magic != headsMagicString {
-		log.Warnf(
-			"unexpected magic string, want %q, got %q",
-			headsMagicString, magic,
-		)
-		p.dirty = true
-		return
-	}
-	version, err := binary.ReadVarint(r)
-	if (version != headsFormatVersion && version != headsFormatLegacyVersion) || err != nil {
-		log.Warnf("unknown heads format version, want %d", headsFormatVersion)
-		p.dirty = true
-		return sm, 0, nil
-	}
-	numSeries, err := codable.DecodeUint64(r)
-	if err != nil {
-		log.Warn("Could not decode number of series:", err)
-		p.dirty = true
-		return sm, 0, nil
-	}
-
-	for ; numSeries > 0; numSeries-- {
-		seriesFlags, err := r.ReadByte()
-		if err != nil {
-			log.Warn("Could not read series flags:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
-		fp, err := codable.DecodeUint64(r)
-		if err != nil {
-			log.Warn("Could not decode fingerprint:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		var metric codable.Metric
-		if err := metric.UnmarshalFromReader(r); err != nil {
-			log.Warn("Could not decode metric:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		var persistWatermark int64
-		var modTime time.Time
-		if version != headsFormatLegacyVersion {
-			// persistWatermark only present in v2.
-			persistWatermark, err = binary.ReadVarint(r)
-			if err != nil {
-				log.Warn("Could not decode persist watermark:", err)
-				p.dirty = true
-				return sm, chunksToPersist, nil
-			}
-			modTimeNano, err := binary.ReadVarint(r)
-			if err != nil {
-				log.Warn("Could not decode modification time:", err)
-				p.dirty = true
-				return sm, chunksToPersist, nil
-			}
-			if modTimeNano != -1 {
-				modTime = time.Unix(0, modTimeNano)
-			}
-		}
-		chunkDescsOffset, err := binary.ReadVarint(r)
-		if err != nil {
-			log.Warn("Could not decode chunk descriptor offset:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		savedFirstTime, err := binary.ReadVarint(r)
-		if err != nil {
-			log.Warn("Could not decode saved first time:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		numChunkDescs, err := binary.ReadVarint(r)
-		if err != nil {
-			log.Warn("Could not decode number of chunk descriptors:", err)
-			p.dirty = true
-			return sm, chunksToPersist, nil
-		}
-		chunkDescs := make([]*chunkDesc, numChunkDescs)
-		if version == headsFormatLegacyVersion {
-			if headChunkPersisted {
-				persistWatermark = numChunkDescs
-			} else {
-				persistWatermark = numChunkDescs - 1
-			}
-		}
-
-		headChunkClosed := true // Initial assumption.
-		for i := int64(0); i < numChunkDescs; i++ {
-			if i < persistWatermark {
-				firstTime, err := binary.ReadVarint(r)
-				if err != nil {
-					log.Warn("Could not decode first time:", err)
-					p.dirty = true
-					return sm, chunksToPersist, nil
-				}
-				lastTime, err := binary.ReadVarint(r)
-				if err != nil {
-					log.Warn("Could not decode last time:", err)
-					p.dirty = true
-					return sm, chunksToPersist, nil
-				}
-				chunkDescs[i] = &chunkDesc{
-					chunkFirstTime: model.Time(firstTime),
-					chunkLastTime:  model.Time(lastTime),
-				}
-				chunkDescsTotal++
-			} else {
-				// Non-persisted chunk.
-				// If there are non-persisted chunks at all, we consider
-				// the head chunk not to be closed yet.
-				headChunkClosed = false
-				encoding, err := r.ReadByte()
-				if err != nil {
-					log.Warn("Could not decode chunk type:", err)
-					p.dirty = true
-					return sm, chunksToPersist, nil
-				}
-				chunk := newChunkForEncoding(chunkEncoding(encoding))
-				if err := chunk.unmarshal(r); err != nil {
-					log.Warn("Could not decode chunk:", err)
-					p.dirty = true
-					return sm, chunksToPersist, nil
-				}
-				cd := newChunkDesc(chunk, chunk.firstTime())
-				if i < numChunkDescs-1 {
-					// This is NOT the head chunk. So it's a chunk
-					// to be persisted, and we need to populate lastTime.
-					chunksToPersist++
-					cd.maybePopulateLastTime()
-				}
-				chunkDescs[i] = cd
-			}
-		}
-
-		fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{
-			metric:           model.Metric(metric),
-			chunkDescs:       chunkDescs,
-			persistWatermark: int(persistWatermark),
-			modTime:          modTime,
-			chunkDescsOffset: int(chunkDescsOffset),
-			savedFirstTime:   model.Time(savedFirstTime),
-			lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
-			headChunkClosed:  headChunkClosed,
-		}
-	}
-	return sm, chunksToPersist, nil
+	return sm, hs.chunksToPersistTotal, nil // Count of non-persisted, non-head chunks gathered by the scanner.
 }
 
 // dropAndPersistChunks deletes all chunks from a series file whose last sample
diff --git a/util/cli/cli.go b/util/cli/cli.go
index 065dfafe3..5df6dce63 100644
--- a/util/cli/cli.go
+++ b/util/cli/cli.go
@@ -32,7 +32,8 @@ type Command struct {
 type Term interface {
 	Infof(format string, v ...interface{})
 	Errorf(format string, v ...interface{})
-	Out(format string)
+	Out() io.Writer // Writer for regular output; callers write to it directly.
+	Err() io.Writer // Writer that basicTerm exposes from its err field.
 }
 
 type basicTerm struct {
@@ -52,9 +53,13 @@ func (t *basicTerm) Errorf(format string, v ...interface{}) {
 }
 
 // Out implements Term.
-func (t *basicTerm) Out(msg string) {
-	fmt.Fprint(t.out, msg)
-	fmt.Fprint(t.out, "\n")
+func (t *basicTerm) Out() io.Writer {
+	return t.out // No newline is appended any more; callers add their own.
+}
+
+// Err implements Term.
+func (t *basicTerm) Err() io.Writer {
+	return t.err
 }
 
 // BasicTerm returns a Term writing Infof and Errorf to err and Out to out.