Use boolean function instead of postings to drop WAL series
There is no guarantee or requirement for WAL writers to only add series entries in increasing order of IDs. A postings list cannot look back, so with unordered WAL entries it would skip over IDs that must not be truncated from the WAL. We replace it with a simple boolean check function that does not require order.
This commit is contained in:
parent
5fa1c993b9
commit
1e88ba06b4
16
head.go
16
head.go
|
@ -190,6 +190,10 @@ func (h *Head) ReadWAL() error {
|
||||||
r := h.wal.Reader()
|
r := h.wal.Reader()
|
||||||
mint := h.MinTime()
|
mint := h.MinTime()
|
||||||
|
|
||||||
|
// Track number of samples that referenced a series we don't know about
|
||||||
|
// for error reporting.
|
||||||
|
var unknownRefs int
|
||||||
|
|
||||||
seriesFunc := func(series []RefSeries) error {
|
seriesFunc := func(series []RefSeries) error {
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||||
|
@ -207,7 +211,7 @@ func (h *Head) ReadWAL() error {
|
||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint)
|
unknownRefs++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, chunkCreated := ms.append(s.T, s.V)
|
_, chunkCreated := ms.append(s.T, s.V)
|
||||||
|
@ -230,6 +234,8 @@ func (h *Head) ReadWAL() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
|
||||||
|
|
||||||
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
|
||||||
return errors.Wrap(err, "consume WAL")
|
return errors.Wrap(err, "consume WAL")
|
||||||
}
|
}
|
||||||
|
@ -267,12 +273,10 @@ func (h *Head) Truncate(mint int64) error {
|
||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
|
||||||
p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value)
|
keep := func(id uint64) bool {
|
||||||
if err != nil {
|
return h.series.getByID(id) != nil
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
if err := h.wal.Truncate(mint, keep); err == nil {
|
||||||
if err := h.wal.Truncate(mint, p); err == nil {
|
|
||||||
h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start))
|
h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start))
|
||||||
} else {
|
} else {
|
||||||
h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
|
||||||
|
|
15
wal.go
15
wal.go
|
@ -71,7 +71,7 @@ type WAL interface {
|
||||||
LogSeries([]RefSeries) error
|
LogSeries([]RefSeries) error
|
||||||
LogSamples([]RefSample) error
|
LogSamples([]RefSample) error
|
||||||
LogDeletes([]Stone) error
|
LogDeletes([]Stone) error
|
||||||
Truncate(int64, Postings) error
|
Truncate(mint int64, keep func(uint64) bool) error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (w nopWAL) Reader() WALReader { return w }
|
||||||
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
func (nopWAL) LogSeries([]RefSeries) error { return nil }
|
||||||
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
func (nopWAL) LogSamples([]RefSample) error { return nil }
|
||||||
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
func (nopWAL) LogDeletes([]Stone) error { return nil }
|
||||||
func (nopWAL) Truncate(int64, Postings) error { return nil }
|
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
|
||||||
func (nopWAL) Close() error { return nil }
|
func (nopWAL) Close() error { return nil }
|
||||||
|
|
||||||
// WALReader reads entries from a WAL.
|
// WALReader reads entries from a WAL.
|
||||||
|
@ -272,8 +272,9 @@ func (w *SegmentWAL) putBuffer(b *encbuf) {
|
||||||
w.buffers.Put(b)
|
w.buffers.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Truncate deletes the values prior to mint and the series entries not in p.
|
// Truncate deletes the values prior to mint and the series which the keep function
|
||||||
func (w *SegmentWAL) Truncate(mint int64, p Postings) error {
|
// does not indicate to preserve.
|
||||||
|
func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
||||||
// The last segment is always active.
|
// The last segment is always active.
|
||||||
if len(w.files) < 2 {
|
if len(w.files) < 2 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -314,7 +315,6 @@ func (w *SegmentWAL) Truncate(mint int64, p Postings) error {
|
||||||
activeSeries = []RefSeries{}
|
activeSeries = []RefSeries{}
|
||||||
)
|
)
|
||||||
|
|
||||||
Loop:
|
|
||||||
for r.next() {
|
for r.next() {
|
||||||
rt, flag, byt := r.at()
|
rt, flag, byt := r.at()
|
||||||
|
|
||||||
|
@ -328,10 +328,7 @@ Loop:
|
||||||
activeSeries = activeSeries[:0]
|
activeSeries = activeSeries[:0]
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
if !p.Seek(s.Ref) {
|
if keep(s.Ref) {
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
if p.At() == s.Ref {
|
|
||||||
activeSeries = append(activeSeries, s)
|
activeSeries = append(activeSeries, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
12
wal_test.go
12
wal_test.go
|
@ -154,12 +154,16 @@ func TestSegmentWAL_Truncate(t *testing.T) {
|
||||||
boundarySeries := w.files[len(w.files)/2].minSeries
|
boundarySeries := w.files[len(w.files)/2].minSeries
|
||||||
|
|
||||||
// We truncate while keeping every 2nd series.
|
// We truncate while keeping every 2nd series.
|
||||||
keep := []uint64{}
|
keep := map[uint64]struct{}{}
|
||||||
for i := 1; i <= numMetrics; i += 2 {
|
for i := 1; i <= numMetrics; i += 2 {
|
||||||
keep = append(keep, uint64(i))
|
keep[uint64(i)] = struct{}{}
|
||||||
|
}
|
||||||
|
keepf := func(id uint64) bool {
|
||||||
|
_, ok := keep[id]
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
err = w.Truncate(1000, newListPostings(keep))
|
err = w.Truncate(1000, keepf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var expected []RefSeries
|
var expected []RefSeries
|
||||||
|
@ -172,7 +176,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
|
||||||
|
|
||||||
// Call Truncate once again to see whether we can read the written file without
|
// Call Truncate once again to see whether we can read the written file without
|
||||||
// creating a new WAL.
|
// creating a new WAL.
|
||||||
err = w.Truncate(1000, newListPostings(keep))
|
err = w.Truncate(1000, keepf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, w.Close())
|
require.NoError(t, w.Close())
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue