diff --git a/head.go b/head.go
index 740eff07f..087e64d1b 100644
--- a/head.go
+++ b/head.go
@@ -313,7 +313,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
 	}
 }
 
-func (h *Head) loadWAL(r *wal.Reader) error {
+func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
 	// Track number of samples that referenced a series we don't know about
 	// for error reporting.
 	var unknownRefs uint64
@@ -322,10 +322,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 	// They are connected through a ring of channels which ensures that all sample batches
 	// read from the WAL are processed in order.
 	var (
-		wg      sync.WaitGroup
-		n       = runtime.GOMAXPROCS(0)
-		inputs  = make([]chan []RefSample, n)
-		outputs = make([]chan []RefSample, n)
+		wg           sync.WaitGroup
+		multiRefLock sync.Mutex
+		n            = runtime.GOMAXPROCS(0)
+		inputs       = make([]chan []RefSample, n)
+		outputs      = make([]chan []RefSample, n)
 	)
 	wg.Add(n)
 
@@ -364,7 +365,14 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 				}
 			}
 			for _, s := range series {
-				h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+				series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+				if !created {
+					// There's already a different ref for this series.
+					multiRefLock.Lock()
+					multiRef[s.Ref] = series.ref
+					multiRefLock.Unlock()
+				}
 
 				if h.lastSeriesID < s.Ref {
 					h.lastSeriesID = s.Ref
@@ -399,6 +407,9 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 					shards[i] = buf[:0]
 				}
 				for _, sam := range samples[:m] {
+					if r, ok := multiRef[sam.Ref]; ok {
+						sam.Ref = r
+					}
 					mod := sam.Ref % uint64(n)
 					shards[mod] = append(shards[mod], sam)
 				}
@@ -478,6 +489,7 @@ func (h *Head) Init(minValidTime int64) error {
 	if err != nil && err != ErrNotFound {
 		return errors.Wrap(err, "find last checkpoint")
 	}
+	multiRef := map[uint64]uint64{}
 	if err == nil {
 		sr, err := wal.NewSegmentsReader(dir)
 		if err != nil {
@@ -487,7 +499,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 		// A corrupted checkpoint is a hard error for now and requires user
 		// intervention. There's likely little data that can be recovered anyway.
-		if err := h.loadWAL(wal.NewReader(sr)); err != nil {
+		if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
 			return errors.Wrap(err, "backfill checkpoint")
 		}
 		startFrom++
@@ -507,7 +519,7 @@ func (h *Head) Init(minValidTime int64) error {
 		}
 
 		sr := wal.NewSegmentBufReader(s)
-		err = h.loadWAL(wal.NewReader(sr))
+		err = h.loadWAL(wal.NewReader(sr), multiRef)
 		sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
 		if err == nil {
 			continue
diff --git a/head_test.go b/head_test.go
index 79092dd5c..8f5774bfc 100644
--- a/head_test.go
+++ b/head_test.go
@@ -109,10 +109,13 @@ func TestHead_ReadWAL(t *testing.T) {
 		},
 		[]RefSeries{
 			{Ref: 50, Labels: labels.FromStrings("a", "4")},
+			// This series has two refs pointing to it.
+			{Ref: 101, Labels: labels.FromStrings("a", "3")},
 		},
 		[]RefSample{
 			{Ref: 10, T: 101, V: 5},
 			{Ref: 50, T: 101, V: 6},
+			{Ref: 101, T: 101, V: 7},
 		},
 		[]Stone{
 			{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
@@ -133,7 +136,7 @@ func TestHead_ReadWAL(t *testing.T) {
 	testutil.Ok(t, err)
 
 	testutil.Ok(t, head.Init(math.MinInt64))
-	testutil.Equals(t, uint64(100), head.lastSeriesID)
+	testutil.Equals(t, uint64(101), head.lastSeriesID)
 
 	s10 := head.series.getByID(10)
 	s11 := head.series.getByID(11)
@@ -156,7 +159,52 @@ func TestHead_ReadWAL(t *testing.T) {
 
 	testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
 	testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
-	testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
+	testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
+}
+
+func TestHead_WALMultiRef(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_wal_multi_ref")
+	testutil.Ok(t, err)
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(dir))
+	}()
+
+	w, err := wal.New(nil, nil, dir)
+	testutil.Ok(t, err)
+
+	head, err := NewHead(nil, nil, w, 1000)
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, head.Init(0))
+	app := head.Appender()
+	ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
+	testutil.Ok(t, err)
+	testutil.Ok(t, app.Commit())
+
+	testutil.Ok(t, head.Truncate(200))
+
+	app = head.Appender()
+	ref2, err := app.Add(labels.FromStrings("foo", "bar"), 300, 2)
+	testutil.Ok(t, err)
+	testutil.Ok(t, app.Commit())
+
+	if ref1 == ref2 {
+		t.Fatal("Refs are the same")
+	}
+	testutil.Ok(t, head.Close())
+
+	w, err = wal.New(nil, nil, dir)
+	testutil.Ok(t, err)
+
+	head, err = NewHead(nil, nil, w, 1000)
+	testutil.Ok(t, err)
+	testutil.Ok(t, head.Init(0))
+	defer head.Close()
+
+	q, err := NewBlockQuerier(head, 0, 300)
+	testutil.Ok(t, err)
+	series := query(t, q, labels.NewEqualMatcher("foo", "bar"))
+	testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}, series)
 }
 
 func TestHead_Truncate(t *testing.T) {