From b40865833d2f17289ee8bcdadebb4af29c9d352a Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 29 Aug 2023 11:03:27 +0200 Subject: [PATCH] PostingsForMatchers race with creating new series (#12558) Signed-off-by: Dimitar Dimitrov --- tsdb/querier.go | 46 +++++++++++++++++++++++------- tsdb/querier_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 10 deletions(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 965707547..ae09f4772 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -21,6 +21,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "golang.org/x/exp/slices" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -244,6 +245,41 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, labelMustBeSet[m.Name] = true } } + isSubtractingMatcher := func(m *labels.Matcher) bool { + if !labelMustBeSet[m.Name] { + return true + } + return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("") + } + hasSubtractingMatchers, hasIntersectingMatchers := false, false + for _, m := range ms { + if isSubtractingMatcher(m) { + hasSubtractingMatchers = true + } else { + hasIntersectingMatchers = true + } + } + + if hasSubtractingMatchers && !hasIntersectingMatchers { + // If there's nothing to subtract from, add in everything and remove the notIts later. + // We prefer to get AllPostings so that the base of subtraction (i.e. allPostings) + // doesn't include series that may be added to the index reader during this function call. + k, v := index.AllPostingsKey() + allPostings, err := ix.Postings(k, v) + if err != nil { + return nil, err + } + its = append(its, allPostings) + } + + // Sort matchers to have the intersecting matchers first. + // This way the base for subtraction is smaller and + // there is no chance that the set we subtract from + // contains postings of series that didn't exist when + // we constructed the set we subtract by. + slices.SortStableFunc(ms, func(i, j *labels.Matcher) bool { + return !isSubtractingMatcher(i) && isSubtractingMatcher(j) + }) for _, m := range ms { switch { @@ -312,16 +348,6 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, } } - // If there's nothing to subtract from, add in everything and remove the notIts later. - if len(its) == 0 && len(notIts) != 0 { - k, v := index.AllPostingsKey() - allPostings, err := ix.Postings(k, v) - if err != nil { - return nil, err - } - its = append(its, allPostings) - } - it := index.Intersect(its...) for _, n := range notIts { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 24005ab5f..2af0fd934 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -22,6 +22,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "time" @@ -2194,6 +2195,71 @@ func TestPostingsForMatchers(t *testing.T) { } } +// TestQuerierIndexQueriesRace tests the index queries with racing appends. +func TestQuerierIndexQueriesRace(t *testing.T) { + const testRepeats = 1000 + + testCases := []struct { + matchers []*labels.Matcher + }{ + { + matchers: []*labels.Matcher{ + // This matcher should involve the AllPostings posting list in calculating the posting lists. + labels.MustNewMatcher(labels.MatchNotEqual, labels.MetricName, "metric"), + }, + }, + { + matchers: []*labels.Matcher{ + // The first matcher should be effectively the same as AllPostings, because all series have always_0=0 + // If it is evaluated first, then __name__=metric will contain more series than always_0=0. + labels.MustNewMatcher(labels.MatchNotEqual, "always_0", "0"), + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric"), + }, + }, + } + + for _, c := range testCases { + c := c + t.Run(fmt.Sprintf("%v", c.matchers), func(t *testing.T) { + db := openTestDB(t, DefaultOptions(), nil) + h := db.Head() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go appendSeries(t, ctx, wg, h) + t.Cleanup(wg.Wait) + t.Cleanup(cancel) + + for i := 0; i < testRepeats; i++ { + q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + values, _, err := q.LabelValues("seq", c.matchers...) + require.NoError(t, err) + require.Emptyf(t, values, `label values for label "seq" should be empty`) + } + }) + } +} + +func appendSeries(t *testing.T, ctx context.Context, wg *sync.WaitGroup, h *Head) { + defer wg.Done() + + for i := 0; ctx.Err() != nil; i++ { + app := h.Appender(context.Background()) + _, err := app.Append(0, labels.FromStrings(labels.MetricName, "metric", "seq", strconv.Itoa(i), "always_0", "0"), 0, 0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + + // Throttle down the appends to keep the test somewhat nimble. + time.Sleep(time.Millisecond) + } +} + // TestClose ensures that calling Close more than once doesn't block and doesn't panic. func TestClose(t *testing.T) { dir := t.TempDir()