diff --git a/head.go b/head.go index 3ead78fc4..fcc18755f 100644 --- a/head.go +++ b/head.go @@ -190,6 +190,10 @@ func (h *Head) ReadWAL() error { r := h.wal.Reader() mint := h.MinTime() + // Track number of samples that referenced a series we don't know about + // for error reporting. + var unknownRefs int + seriesFunc := func(series []RefSeries) error { for _, s := range series { h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) @@ -207,7 +211,7 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint) + unknownRefs++ continue } _, chunkCreated := ms.append(s.T, s.V) @@ -230,6 +234,8 @@ func (h *Head) ReadWAL() error { return nil } + h.logger.Log("msg", "unknown series references in WAL samples", "count", unknownRefs) + if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { return errors.Wrap(err, "consume WAL") } @@ -267,12 +273,10 @@ func (h *Head) Truncate(mint int64) error { start = time.Now() - p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value) - if err != nil { - return err + keep := func(id uint64) bool { + return h.series.getByID(id) != nil } - - if err := h.wal.Truncate(mint, p); err == nil { + if err := h.wal.Truncate(mint, keep); err == nil { h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start)) } else { h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) diff --git a/wal.go b/wal.go index 27984ea0c..c52bca86d 100644 --- a/wal.go +++ b/wal.go @@ -71,7 +71,7 @@ type WAL interface { LogSeries([]RefSeries) error LogSamples([]RefSample) error LogDeletes([]Stone) error - Truncate(int64, Postings) error + Truncate(mint int64, keep func(uint64) bool) error Close() error } @@ -87,7 +87,7 @@ func (w nopWAL) Reader() WALReader { return w } func (nopWAL) LogSeries([]RefSeries) error { return nil } func (nopWAL) LogSamples([]RefSample) error { return nil } func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, Postings) error { return nil } +func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } func (nopWAL) Close() error { return nil } // WALReader reads entries from a WAL. @@ -272,8 +272,9 @@ func (w *SegmentWAL) putBuffer(b *encbuf) { w.buffers.Put(b) } -// Truncate deletes the values prior to mint and the series entries not in p. -func (w *SegmentWAL) Truncate(mint int64, p Postings) error { +// Truncate deletes the values prior to mint and the series which the keep function +// does not indiciate to preserve. +func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { // The last segment is always active. if len(w.files) < 2 { return nil @@ -314,7 +315,6 @@ func (w *SegmentWAL) Truncate(mint int64, p Postings) error { activeSeries = []RefSeries{} ) -Loop: for r.next() { rt, flag, byt := r.at() @@ -328,10 +328,7 @@ Loop: activeSeries = activeSeries[:0] for _, s := range series { - if !p.Seek(s.Ref) { - break Loop - } - if p.At() == s.Ref { + if keep(s.Ref) { activeSeries = append(activeSeries, s) } } diff --git a/wal_test.go b/wal_test.go index f36b22e90..b5a69ee0f 100644 --- a/wal_test.go +++ b/wal_test.go @@ -154,12 +154,16 @@ func TestSegmentWAL_Truncate(t *testing.T) { boundarySeries := w.files[len(w.files)/2].minSeries // We truncate while keeping every 2nd series. - keep := []uint64{} + keep := map[uint64]struct{}{} for i := 1; i <= numMetrics; i += 2 { - keep = append(keep, uint64(i)) + keep[uint64(i)] = struct{}{} + } + keepf := func(id uint64) bool { + _, ok := keep[id] + return ok } - err = w.Truncate(1000, newListPostings(keep)) + err = w.Truncate(1000, keepf) require.NoError(t, err) var expected []RefSeries @@ -172,7 +176,7 @@ func TestSegmentWAL_Truncate(t *testing.T) { // Call Truncate once again to see whether we can read the written file without // creating a new WAL. - err = w.Truncate(1000, newListPostings(keep)) + err = w.Truncate(1000, keepf) require.NoError(t, err) require.NoError(t, w.Close())