Handle multiple refs for the same series when WAL reading. (#623)

This can happen if a given series is created/truncated/recreated.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-06-06 14:28:54 +01:00 committed by GitHub
parent 731ed22ac8
commit 149c5dc73a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 10 deletions

28
head.go
View File

@ -313,7 +313,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
}
}
func (h *Head) loadWAL(r *wal.Reader) error {
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
@ -322,10 +322,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []RefSample, n)
outputs = make([]chan []RefSample, n)
wg sync.WaitGroup
multiRefLock sync.Mutex
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []RefSample, n)
outputs = make([]chan []RefSample, n)
)
wg.Add(n)
@ -364,7 +365,14 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if !created {
// There's already a different ref for this series.
multiRefLock.Lock()
multiRef[s.Ref] = series.ref
multiRefLock.Unlock()
}
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
@ -399,6 +407,9 @@ func (h *Head) loadWAL(r *wal.Reader) error {
shards[i] = buf[:0]
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := sam.Ref % uint64(n)
shards[mod] = append(shards[mod], sam)
}
@ -478,6 +489,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
multiRef := map[uint64]uint64{}
if err == nil {
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
@ -487,7 +499,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
startFrom++
@ -507,7 +519,7 @@ func (h *Head) Init(minValidTime int64) error {
}
sr := wal.NewSegmentBufReader(s)
err = h.loadWAL(wal.NewReader(sr))
err = h.loadWAL(wal.NewReader(sr), multiRef)
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
if err == nil {
continue

View File

@ -109,10 +109,13 @@ func TestHead_ReadWAL(t *testing.T) {
},
[]RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "4")},
// This series has two refs pointing to it.
{Ref: 101, Labels: labels.FromStrings("a", "3")},
},
[]RefSample{
{Ref: 10, T: 101, V: 5},
{Ref: 50, T: 101, V: 6},
{Ref: 101, T: 101, V: 7},
},
[]Stone{
{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
@ -133,7 +136,7 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, head.Init(math.MinInt64))
testutil.Equals(t, uint64(100), head.lastSeriesID)
testutil.Equals(t, uint64(101), head.lastSeriesID)
s10 := head.series.getByID(10)
s11 := head.series.getByID(11)
@ -156,7 +159,52 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
}
func TestHead_WALMultiRef(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_multi_ref")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(0))
app := head.Appender()
ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, head.Truncate(200))
app = head.Appender()
ref2, err := app.Add(labels.FromStrings("foo", "bar"), 300, 2)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
if ref1 == ref2 {
t.Fatal("Refs are the same")
}
testutil.Ok(t, head.Close())
w, err = wal.New(nil, nil, dir)
testutil.Ok(t, err)
head, err = NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
testutil.Ok(t, head.Init(0))
defer head.Close()
q, err := NewBlockQuerier(head, 0, 300)
testutil.Ok(t, err)
series := query(t, q, labels.NewEqualMatcher("foo", "bar"))
testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}, series)
}
func TestHead_Truncate(t *testing.T) {