`MemPostings.PostingsForLabelMatching()`: don't hold the mutex while matching (#14286)
* MemPostings.PostingsForLabelMatching: let mutex go This changes the `MemPostings.PostingsForLabelMatching` implementation to stop holding the read mutex while matching the label values. We've seen that this method can be slow when the matcher is expensive, that's why we even added a context expiration check. However, there are critical process that might be waiting on this mutex: writes (adding new series) and compaction (deleting the garbage-collected ones), so we should avoid holding it for a long period of time. Given that we've copied the values to a slice anyway, there's no need to hold the lock while matching. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
2dc177d8af
commit
10a3c7220b
|
@ -425,16 +425,62 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
|
|||
}
|
||||
|
||||
func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
|
||||
p.mtx.RLock()
|
||||
// We'll copy the values into a slice and then match over that,
|
||||
// this way we don't need to hold the mutex while we're matching,
|
||||
// which can be slow (seconds) if the match function is a huge regex.
|
||||
// Holding this lock prevents new series from being added (slows down the write path)
|
||||
// and blocks the compaction process.
|
||||
vals := p.labelValues(name)
|
||||
for i, count := 0, 1; i < len(vals); count++ {
|
||||
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||
return ErrPostings(ctx.Err())
|
||||
}
|
||||
|
||||
e := p.m[name]
|
||||
if len(e) == 0 {
|
||||
p.mtx.RUnlock()
|
||||
if match(vals[i]) {
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
// Didn't match, bring the last value to this position, make the slice shorter and check again.
|
||||
// The order of the slice doesn't matter as it comes from a map iteration.
|
||||
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
|
||||
}
|
||||
|
||||
// If none matched (or this label had no values), no need to grab the lock again.
|
||||
if len(vals) == 0 {
|
||||
return EmptyPostings()
|
||||
}
|
||||
|
||||
// Benchmarking shows that first copying the values into a slice and then matching over that is
|
||||
// faster than matching over the map keys directly, at least on AMD64.
|
||||
// Now `vals` only contains the values that matched, get their postings.
|
||||
its := make([]Postings, 0, len(vals))
|
||||
p.mtx.RLock()
|
||||
e := p.m[name]
|
||||
for _, v := range vals {
|
||||
if refs, ok := e[v]; ok {
|
||||
// Some of the values may have been garbage-collected in the meantime this is fine, we'll just skip them.
|
||||
// If we didn't let the mutex go, we'd have these postings here, but they would be pointing nowhere
|
||||
// because there would be a `MemPostings.Delete()` call waiting for the lock to delete these labels,
|
||||
// because the series were deleted already.
|
||||
its = append(its, NewListPostings(refs))
|
||||
}
|
||||
}
|
||||
// Let the mutex go before merging.
|
||||
p.mtx.RUnlock()
|
||||
|
||||
return Merge(ctx, its...)
|
||||
}
|
||||
|
||||
// labelValues returns a slice of label values for the given label name.
|
||||
// It will take the read lock.
|
||||
func (p *MemPostings) labelValues(name string) []string {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
|
||||
e := p.m[name]
|
||||
if len(e) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
vals := make([]string, 0, len(e))
|
||||
for v, srs := range e {
|
||||
if len(srs) > 0 {
|
||||
|
@ -442,21 +488,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
}
|
||||
}
|
||||
|
||||
var its []Postings
|
||||
count := 1
|
||||
for _, v := range vals {
|
||||
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
|
||||
p.mtx.RUnlock()
|
||||
return ErrPostings(ctx.Err())
|
||||
}
|
||||
count++
|
||||
if match(v) {
|
||||
its = append(its, NewListPostings(e[v]))
|
||||
}
|
||||
}
|
||||
p.mtx.RUnlock()
|
||||
|
||||
return Merge(ctx, its...)
|
||||
return vals
|
||||
}
|
||||
|
||||
// ExpandPostings returns the postings expanded as a slice.
|
||||
|
|
|
@ -1435,6 +1435,28 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
|
||||
mp := NewMemPostings()
|
||||
mp.Add(1, labels.FromStrings("foo", "1"))
|
||||
mp.Add(2, labels.FromStrings("foo", "2"))
|
||||
mp.Add(3, labels.FromStrings("foo", "3"))
|
||||
mp.Add(4, labels.FromStrings("foo", "4"))
|
||||
|
||||
isEven := func(v string) bool {
|
||||
iv, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return iv%2 == 0
|
||||
}
|
||||
|
||||
p := mp.PostingsForLabelMatching(context.Background(), "foo", isEven)
|
||||
require.NoError(t, p.Err())
|
||||
refs, err := ExpandPostings(p)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
|
||||
}
|
||||
|
||||
func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
|
||||
memP := NewMemPostings()
|
||||
seriesCount := 10 * checkContextEveryNIterations
|
||||
|
|
Loading…
Reference in New Issue