From 81222849bcf523fabfcf254f3afb4ef2bcd70488 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 6 Sep 2017 16:20:37 +0200 Subject: [PATCH] Filter WAL data in Head, misc fixes --- db.go | 3 +++ head.go | 29 ++++++++++++++++++++--------- wal.go | 39 +++++++++++++++++++++++++++------------ 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/db.go b/db.go index 724153bff..ad034d8b0 100644 --- a/db.go +++ b/db.go @@ -214,6 +214,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := db.reload(); err != nil { return nil, err } + if err := db.head.ReadWAL(); err != nil { + return nil, errors.Wrap(err, "read WAL") + } go db.run() diff --git a/head.go b/head.go index 807690605..db548b6c7 100644 --- a/head.go +++ b/head.go @@ -164,7 +164,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( l = log.NewNopLogger() } if wal == nil { - wal = NopWAL{} + wal = NopWAL() } if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) @@ -173,7 +173,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( wal: wal, logger: l, chunkRange: chunkRange, - minTime: math.MaxInt64, + minTime: math.MinInt64, maxTime: math.MinInt64, series: newStripeSeries(), values: map[string]stringset{}, @@ -183,11 +183,12 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( } h.metrics = newHeadMetrics(h, r) - return h, h.readWAL() + return h, nil } -func (h *Head) readWAL() error { +func (h *Head) ReadWAL() error { r := h.wal.Reader() + mint := h.MinTime() seriesFunc := func(series []RefSeries) error { for _, s := range series { @@ -197,6 +198,9 @@ func (h *Head) readWAL() error { } samplesFunc := func(samples []RefSample) error { for _, s := range samples { + if s.T < mint { + continue + } ms := h.series.getByID(s.Ref) if ms == nil { return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) @@ -213,6 +217,9 @@ func (h *Head) readWAL() error { deletesFunc := func(stones []Stone) error { for _, s := range stones { for _, itv := range s.intervals { + if itv.Maxt < mint { + continue + } h.tombstones.add(s.ref, itv) } } @@ -226,12 +233,10 @@ func (h *Head) readWAL() error { return nil } -func (h *Head) String() string { - return "" -} - // Truncate removes all data before mint from the head block and truncates its WAL. func (h *Head) Truncate(mint int64) error { + initialize := h.MinTime() == math.MinInt64 + if mint%h.chunkRange != 0 { return errors.Errorf("truncating at %d not aligned", mint) } @@ -240,6 +245,12 @@ func (h *Head) Truncate(mint int64) error { } atomic.StoreInt64(&h.minTime, mint) + // This was an initial call to Truncate after loading blocks on startup. + // We haven't read back the WAL yet, so do not attempt to truncate it. + if initialize { + return nil + } + start := time.Now() h.gc() @@ -934,7 +945,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { rmChunks = 0 ) // Run through all series and truncate old chunks. Mark those with no - // chunks left as deleted and store their ID and hash. + // chunks left as deleted and store their ID. for i := 0; i < stripeSize; i++ { s.locks[i].Lock() diff --git a/wal.go b/wal.go index 073be2d7d..776569398 100644 --- a/wal.go +++ b/wal.go @@ -73,15 +73,19 @@ type WAL interface { } // NopWAL is a WAL that does nothing. -type NopWAL struct{} +func NopWAL() WAL { + return nopWAL{} +} -func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } -func (w NopWAL) Reader() WALReader { return w } -func (NopWAL) LogSeries([]RefSeries) error { return nil } -func (NopWAL) LogSamples([]RefSample) error { return nil } -func (NopWAL) LogDeletes([]Stone) error { return nil } -func (NopWAL) Truncate(int64, Postings) error { return nil } -func (NopWAL) Close() error { return nil } +type nopWAL struct{} + +func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } +func (w nopWAL) Reader() WALReader { return w } +func (nopWAL) LogSeries([]RefSeries) error { return nil } +func (nopWAL) LogSamples([]RefSample) error { return nil } +func (nopWAL) LogDeletes([]Stone) error { return nil } +func (nopWAL) Truncate(int64, Postings) error { return nil } +func (nopWAL) Close() error { return nil } // WALReader reads entries from a WAL. type WALReader interface { @@ -344,6 +348,13 @@ Loop: return errors.Wrap(r.Err(), "read candidate WAL files") } + off, err := csf.Seek(0, os.SEEK_CUR) + if err != nil { + return err + } + if err := csf.Truncate(off); err != nil { + return err + } if err := csf.Close(); err != nil { return errors.Wrap(err, "close tmp file") } @@ -351,15 +362,19 @@ Loop: if err := renameFile(csf.Name(), candidates[len(candidates)-1].Name()); err != nil { return err } - if err := w.dirFile.Sync(); err != nil { - return err - } - for _, f := range candidates[1:] { if err := os.RemoveAll(f.Name()); err != nil { return errors.Wrap(err, "delete WAL segment file") } } + if err := w.dirFile.Sync(); err != nil { + return err + } + + w.mtx.Lock() + w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...) + w.mtx.Unlock() + return nil }