From 0f9e78bd88660129f030037c0ea59aae31e016da Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 16 Mar 2020 13:59:22 +0100 Subject: [PATCH] tsdb: fix races around head chunks (#6985) * tsdb: fix races around head chunks Signed-off-by: Julien Pivotto --- tsdb/head.go | 6 +++++- tsdb/head_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/tsdb/head.go b/tsdb/head.go index 197fbe287..d4b8a7997 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1116,7 +1116,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { for p.Next() { series := h.series.getByID(p.At()) + series.RLock() t0, t1 := series.minTime(), series.maxTime() + series.RUnlock() if t0 == math.MinInt64 || t1 == math.MinInt64 { continue } @@ -1424,9 +1426,11 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting level.Debug(h.head.logger).Log("msg", "looked up series not found") continue } + s.RLock() if s.minTime() <= h.maxt && s.maxTime() >= h.mint { filtered = append(filtered, p.At()) } + s.RUnlock() } if p.Err() != nil { return nil, p.Err() @@ -1733,7 +1737,7 @@ func (s sample) V() float64 { // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. type memSeries struct { - sync.Mutex + sync.RWMutex ref uint64 lset labels.Labels diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 967f86665..def09fe2c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "github.com/pkg/errors" @@ -1336,6 +1337,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() + testutil.Ok(t, h.Init(0)) app := h.Appender() s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0) @@ -1594,3 +1596,42 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { testutil.Assert(t, ok, "Series append failed.") testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } + +func TestHeadSeriesChunkRace(t *testing.T) { + for i := 0; i < 1000; i++ { + testHeadSeriesChunkRace(t) + } +} + +func testHeadSeriesChunkRace(t *testing.T) { + h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) + testutil.Ok(t, err) + defer h.Close() + testutil.Ok(t, h.Init(0)) + app := h.Appender() + + s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) + testutil.Ok(t, err) + for ts := int64(6); ts < 11; ts++ { + err = app.AddFast(s2, ts, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + var wg sync.WaitGroup + matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") + q, err := NewBlockQuerier(h, 18, 22) + testutil.Ok(t, err) + defer q.Close() + + wg.Add(1) + go func() { + h.updateMinMaxTime(20, 25) + h.gc() + wg.Done() + }() + ss, _, err := q.Select(nil, matcher) + testutil.Ok(t, err) + testutil.Ok(t, ss.Err()) + wg.Wait() +}