Merge pull request #14983 from machine424/dopp

fix(storage/mergeQuerier): fix a data race
This commit is contained in:
Ayoub Mrini 2024-10-25 18:34:51 +02:00 committed by GitHub
commit 93db81dd3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 40 additions and 3 deletions

View File

@ -153,13 +153,18 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
// copy the matchers as some queriers may alter the slice.
// See https://github.com/prometheus/prometheus/issues/14723
matchersCopy := make([]*labels.Matcher, len(matchers))
copy(matchersCopy, matchers)
wg.Add(1)
go func(qr genericQuerier) {
go func(qr genericQuerier, m []*labels.Matcher) {
defer wg.Done()
// We need to sort for NewMergeSeriesSet to work.
seriesSetChan <- qr.Select(ctx, true, hints, matchers...)
}(querier)
seriesSetChan <- qr.Select(ctx, true, hints, m...)
}(querier, matchersCopy)
}
go func() {
wg.Wait()

View File

@ -3787,3 +3787,35 @@ func (m mockReaderOfLabels) Series(storage.SeriesRef, *labels.ScratchBuilder, *[
func (m mockReaderOfLabels) Symbols() index.StringIter {
panic("Series called")
}
// TestMergeQuerierConcurrentSelectMatchers reproduces the data race bug from
// https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case)
// alters the passed matchers.
func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) {
block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil)
require.NoError(t, err)
defer func() {
require.NoError(t, block.Close())
}()
p, err := NewBlockQuerier(block, 0, 1)
require.NoError(t, err)
// A secondary querier is required to enable concurrent select; a blockQuerier is used for simplicity.
s, err := NewBlockQuerier(block, 0, 1)
require.NoError(t, err)
originalMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, "baz", ".*"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}
matchers := append([]*labels.Matcher{}, originalMatchers...)
mergedQuerier := storage.NewMergeQuerier([]storage.Querier{p}, []storage.Querier{s}, storage.ChainedSeriesMerge)
defer func() {
require.NoError(t, mergedQuerier.Close())
}()
mergedQuerier.Select(context.Background(), false, nil, matchers...)
require.Equal(t, originalMatchers, matchers)
}