mirror of
https://github.com/prometheus/prometheus
synced 2025-03-05 13:08:30 +00:00
Ensure postings are always sorted
IDs for new series are handed out before the postings are locked. Thus series are not indexed in order of their IDs, which could result in only partially sorted postings list. Iterating over those silently skipped elements as the sort invariant was violated.
This commit is contained in:
parent
f39388c9af
commit
6ee254e353
@ -457,7 +457,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||||||
|
|
||||||
indexr := b.Index()
|
indexr := b.Index()
|
||||||
|
|
||||||
all, err := indexr.Postings("", "")
|
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
6
head.go
6
head.go
@ -207,7 +207,7 @@ func (h *Head) ReadWAL() error {
|
|||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
|
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, chunkCreated := ms.append(s.T, s.V)
|
_, chunkCreated := ms.append(s.T, s.V)
|
||||||
@ -267,7 +267,7 @@ func (h *Head) Truncate(mint int64) error {
|
|||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
|
||||||
p, err := h.indexRange(mint, math.MaxInt64).Postings("", "")
|
p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1038,8 +1038,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
|
|||||||
return prev, false
|
return prev, false
|
||||||
}
|
}
|
||||||
s.hashes[i].set(hash, series)
|
s.hashes[i].set(hash, series)
|
||||||
|
|
||||||
s.hashes[i][hash] = append(s.hashes[i][hash], series)
|
|
||||||
s.locks[i].Unlock()
|
s.locks[i].Unlock()
|
||||||
|
|
||||||
i = series.ref & stripeMask
|
i = series.ref & stripeMask
|
||||||
|
29
postings.go
29
postings.go
@ -45,7 +45,7 @@ func (p *memPostings) get(name, value string) Postings {
|
|||||||
return newListPostings(l)
|
return newListPostings(l)
|
||||||
}
|
}
|
||||||
|
|
||||||
var allLabel = labels.Label{}
|
var allPostingsKey = labels.Label{}
|
||||||
|
|
||||||
// add adds a document to the index. The caller has to ensure that no
|
// add adds a document to the index. The caller has to ensure that no
|
||||||
// term argument appears twice.
|
// term argument appears twice.
|
||||||
@ -53,13 +53,36 @@ func (p *memPostings) add(id uint64, lset labels.Labels) {
|
|||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
p.m[l] = append(p.m[l], id)
|
p.addFor(id, l)
|
||||||
}
|
}
|
||||||
p.m[allLabel] = append(p.m[allLabel], id)
|
p.addFor(id, allPostingsKey)
|
||||||
|
|
||||||
p.mtx.Unlock()
|
p.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *memPostings) addFor(id uint64, l labels.Label) {
|
||||||
|
list := append(p.m[l], id)
|
||||||
|
p.m[l] = list
|
||||||
|
|
||||||
|
// There is no guarantee that no higher ID was inserted before as they may
|
||||||
|
// be generated independently before adding them to postings.
|
||||||
|
// We repair order violations on insert. The invariant is that the first n-1
|
||||||
|
// items in the list are already sorted.
|
||||||
|
for i := len(list) - 1; i >= 1; i-- {
|
||||||
|
if list[i] >= list[i-1] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
list[i], list[i-1] = list[i-1], list[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func expandPostings(p Postings) (res []uint64, err error) {
|
||||||
|
for p.Next() {
|
||||||
|
res = append(res, p.At())
|
||||||
|
}
|
||||||
|
return res, p.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// Postings provides iterative access over a postings list.
|
// Postings provides iterative access over a postings list.
|
||||||
type Postings interface {
|
type Postings interface {
|
||||||
// Next advances the iterator and returns true if another value was found.
|
// Next advances the iterator and returns true if another value was found.
|
||||||
|
@ -21,6 +21,15 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMemPostings_addFor(t *testing.T) {
|
||||||
|
p := newMemPostings()
|
||||||
|
p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8}
|
||||||
|
|
||||||
|
p.addFor(5, allPostingsKey)
|
||||||
|
|
||||||
|
require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey])
|
||||||
|
}
|
||||||
|
|
||||||
type mockPostings struct {
|
type mockPostings struct {
|
||||||
next func() bool
|
next func() bool
|
||||||
seek func(uint64) bool
|
seek func(uint64) bool
|
||||||
@ -33,13 +42,6 @@ func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) }
|
|||||||
func (m *mockPostings) Value() uint64 { return m.value() }
|
func (m *mockPostings) Value() uint64 { return m.value() }
|
||||||
func (m *mockPostings) Err() error { return m.err() }
|
func (m *mockPostings) Err() error { return m.err() }
|
||||||
|
|
||||||
func expandPostings(p Postings) (res []uint64, err error) {
|
|
||||||
for p.Next() {
|
|
||||||
res = append(res, p.At())
|
|
||||||
}
|
|
||||||
return res, p.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntersect(t *testing.T) {
|
func TestIntersect(t *testing.T) {
|
||||||
var cases = []struct {
|
var cases = []struct {
|
||||||
a, b []uint64
|
a, b []uint64
|
||||||
|
Loading…
Reference in New Issue
Block a user