diff --git a/tsdb/head.go b/tsdb/head.go index 3b1d4ad05..a21f5ad31 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1692,8 +1692,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.metrics.seriesCreated.Inc() atomic.AddUint64(&h.numSeries, 1) - h.postings.Add(id, lset) - h.symMtx.Lock() defer h.symMtx.Unlock() @@ -1709,6 +1707,10 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.symbols[l.Value] = struct{}{} } + // Postings should be set after setting the symbols (or after holding + // the symbol mtx) to avoid race during compaction of seeing partial symbols. + h.postings.Add(id, lset) + return s, true, nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6c5e46658..c2523c396 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1875,3 +1875,70 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { }) } } + +func TestHeadCompactionRace(t *testing.T) { + // There are still some races to be fixed. Hence skipping this test + // for now to not cause flaky CI failures. + t.Skip() + + for i := 0; i < 10; i++ { + t.Run(fmt.Sprintf("run %d", i), func(t *testing.T) { + tsdbCfg := &Options{ + RetentionDuration: 100000000, + NoLockfile: true, + MinBlockDuration: 1000000, + MaxBlockDuration: 1000000, + } + + db, closer := openTestDB(t, tsdbCfg, []int64{1000000}) + t.Cleanup(closer) + t.Cleanup(func() { + testutil.Ok(t, db.Close()) + }) + + head := db.Head() + + // Get past the init appender phase here. + app := head.Appender() + _, err := app.Add(labels.Labels{labels.Label{Name: "n", Value: "v"}}, 10, 10) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + wait := make(chan struct{}) + var wg sync.WaitGroup + + // Prepare to execute concurrent appends. + wg.Add(100) + for i := 0; i < 100; i++ { + go func(idx int) { + defer wg.Done() + app := head.Appender() + <-wait + + for j := 0; j < 100; j++ { + // After compaction this will return out of bound, so this is a best effort append. + app.Add(labels.Labels{labels.Label{ + Name: fmt.Sprintf("n%d", idx*100+j), + Value: fmt.Sprintf("v%d", idx*100+j), + }}, 1000, 10) + } + + testutil.Ok(t, app.Commit()) + }(i) + } + + // Prepare for head compaction. + wg.Add(1) + go func() { + defer wg.Done() + <-wait + testutil.Ok(t, db.CompactHead(NewRangeHead(head, 0, 10000000))) + }() + + // Run concurrent appends and compaction. + close(wait) + + wg.Wait() + }) + } +}