tsdb: fix races around head chunks (#6985)
* tsdb: fix races around head chunks Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
ef138a11c8
commit
0f9e78bd88
|
@ -1116,7 +1116,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
series := h.series.getByID(p.At())
|
series := h.series.getByID(p.At())
|
||||||
|
|
||||||
|
series.RLock()
|
||||||
t0, t1 := series.minTime(), series.maxTime()
|
t0, t1 := series.minTime(), series.maxTime()
|
||||||
|
series.RUnlock()
|
||||||
if t0 == math.MinInt64 || t1 == math.MinInt64 {
|
if t0 == math.MinInt64 || t1 == math.MinInt64 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1424,9 +1426,11 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting
|
||||||
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
s.RLock()
|
||||||
if s.minTime() <= h.maxt && s.maxTime() >= h.mint {
|
if s.minTime() <= h.maxt && s.maxTime() >= h.mint {
|
||||||
filtered = append(filtered, p.At())
|
filtered = append(filtered, p.At())
|
||||||
}
|
}
|
||||||
|
s.RUnlock()
|
||||||
}
|
}
|
||||||
if p.Err() != nil {
|
if p.Err() != nil {
|
||||||
return nil, p.Err()
|
return nil, p.Err()
|
||||||
|
@ -1733,7 +1737,7 @@ func (s sample) V() float64 {
|
||||||
// memSeries is the in-memory representation of a series. None of its methods
|
// memSeries is the in-memory representation of a series. None of its methods
|
||||||
// are goroutine safe and it is the caller's responsibility to lock it.
|
// are goroutine safe and it is the caller's responsibility to lock it.
|
||||||
type memSeries struct {
|
type memSeries struct {
|
||||||
sync.Mutex
|
sync.RWMutex
|
||||||
|
|
||||||
ref uint64
|
ref uint64
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -1336,6 +1337,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) {
|
||||||
h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize)
|
h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
testutil.Ok(t, h.Init(0))
|
||||||
app := h.Appender()
|
app := h.Appender()
|
||||||
|
|
||||||
s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0)
|
s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0)
|
||||||
|
@ -1594,3 +1596,42 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||||
testutil.Assert(t, ok, "Series append failed.")
|
testutil.Assert(t, ok, "Series append failed.")
|
||||||
testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
|
testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadSeriesChunkRace(t *testing.T) {
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
testHeadSeriesChunkRace(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHeadSeriesChunkRace(t *testing.T) {
|
||||||
|
h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
testutil.Ok(t, h.Init(0))
|
||||||
|
app := h.Appender()
|
||||||
|
|
||||||
|
s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
for ts := int64(6); ts < 11; ts++ {
|
||||||
|
err = app.AddFast(s2, ts, 0)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
}
|
||||||
|
testutil.Ok(t, app.Commit())
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
|
||||||
|
q, err := NewBlockQuerier(h, 18, 22)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
h.updateMinMaxTime(20, 25)
|
||||||
|
h.gc()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
ss, _, err := q.Select(nil, matcher)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, ss.Err())
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue