mirror of
https://github.com/prometheus/prometheus
synced 2025-03-05 13:08:30 +00:00
Merge pull request #147 from prometheus/deadlock
Simplify series create logc in head
This commit is contained in:
commit
f39388c9af
@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/pkg/errors"
|
||||
promlabels "github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/textparse"
|
||||
@ -88,7 +89,10 @@ func (b *writeBenchmark) run() {
|
||||
|
||||
dir := filepath.Join(b.outPath, "storage")
|
||||
|
||||
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
|
||||
l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
|
||||
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||
|
||||
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
|
||||
WALFlushInterval: 200 * time.Millisecond,
|
||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
|
||||
|
45
head.go
45
head.go
@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// ReadWAL initializes the head by consuming the write ahead log.
|
||||
func (h *Head) ReadWAL() error {
|
||||
r := h.wal.Reader()
|
||||
mint := h.MinTime()
|
||||
|
||||
seriesFunc := func(series []RefSeries) error {
|
||||
for _, s := range series {
|
||||
h.create(s.Labels.Hash(), s.Labels)
|
||||
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||
|
||||
if h.lastSeriesID < s.Ref {
|
||||
h.lastSeriesID = s.Ref
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error {
|
||||
}
|
||||
ms := h.series.getByID(s.Ref)
|
||||
if ms == nil {
|
||||
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
|
||||
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
|
||||
continue
|
||||
}
|
||||
_, chunkCreated := ms.append(s.T, s.V)
|
||||
if chunkCreated {
|
||||
@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error {
|
||||
h.metrics.chunks.Inc()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
deletesFunc := func(stones []Stone) error {
|
||||
@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error {
|
||||
h.tombstones.add(s.ref, itv)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
||||
if t < a.mint {
|
||||
return 0, ErrOutOfBounds
|
||||
}
|
||||
hash := lset.Hash()
|
||||
|
||||
s := a.head.series.getByHash(hash, lset)
|
||||
|
||||
if s == nil {
|
||||
s = a.head.create(hash, lset)
|
||||
|
||||
s, created := a.head.getOrCreate(lset.Hash(), lset)
|
||||
if created {
|
||||
a.series = append(a.series, RefSeries{
|
||||
Ref: s.ref,
|
||||
Labels: lset,
|
||||
hash: hash,
|
||||
})
|
||||
}
|
||||
return s.ref, a.AddFast(s.ref, t, v)
|
||||
@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
||||
h.metrics.series.Inc()
|
||||
h.metrics.seriesCreated.Inc()
|
||||
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||||
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
||||
// a new series on every sample inserted via Add(), which causes allocations
|
||||
// and makes our series IDs rather random and harder to compress in postings.
|
||||
s := h.series.getByHash(hash, lset)
|
||||
if s != nil {
|
||||
return s, false
|
||||
}
|
||||
|
||||
// Optimistically assume that we are the first one to create the series.
|
||||
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
||||
|
||||
return h.getOrCreateWithID(id, hash, lset)
|
||||
}
|
||||
|
||||
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||||
s := newMemSeries(lset, id, h.chunkRange)
|
||||
|
||||
s, created := h.series.getOrSet(hash, s)
|
||||
// Skip indexing if we didn't actually create the series.
|
||||
if !created {
|
||||
return s
|
||||
return s, false
|
||||
}
|
||||
|
||||
h.metrics.series.Inc()
|
||||
h.metrics.seriesCreated.Inc()
|
||||
|
||||
h.postings.add(id, lset)
|
||||
|
||||
h.symMtx.Lock()
|
||||
@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
||||
h.symbols[l.Value] = struct{}{}
|
||||
}
|
||||
|
||||
return s
|
||||
return s, true
|
||||
}
|
||||
|
||||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||||
|
87
head_test.go
87
head_test.go
@ -21,6 +21,7 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
|
||||
promlabels "github.com/prometheus/prometheus/pkg/labels"
|
||||
@ -41,7 +42,7 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for _, l := range lbls {
|
||||
h.create(l.Hash(), l)
|
||||
h.getOrCreate(l.Hash(), l)
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,16 +84,92 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
|
||||
return mets, nil
|
||||
}
|
||||
|
||||
type memoryWAL struct {
|
||||
nopWAL
|
||||
entries []interface{}
|
||||
}
|
||||
|
||||
func (w *memoryWAL) Reader() WALReader {
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
|
||||
for _, e := range w.entries {
|
||||
switch v := e.(type) {
|
||||
case []RefSeries:
|
||||
series(v)
|
||||
case []RefSample:
|
||||
samples(v)
|
||||
case []Stone:
|
||||
deletes(v)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestHead_ReadWAL(t *testing.T) {
|
||||
entries := []interface{}{
|
||||
[]RefSeries{
|
||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||
{Ref: 11, Labels: labels.FromStrings("a", "2")},
|
||||
{Ref: 100, Labels: labels.FromStrings("a", "3")},
|
||||
},
|
||||
[]RefSample{
|
||||
{Ref: 0, T: 99, V: 1},
|
||||
{Ref: 10, T: 100, V: 2},
|
||||
{Ref: 100, T: 100, V: 3},
|
||||
},
|
||||
[]RefSeries{
|
||||
{Ref: 50, Labels: labels.FromStrings("a", "4")},
|
||||
},
|
||||
[]RefSample{
|
||||
{Ref: 10, T: 101, V: 5},
|
||||
{Ref: 50, T: 101, V: 6},
|
||||
},
|
||||
}
|
||||
wal := &memoryWAL{entries: entries}
|
||||
|
||||
head, err := NewHead(nil, nil, wal, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, head.ReadWAL())
|
||||
require.Equal(t, uint64(100), head.lastSeriesID)
|
||||
|
||||
s10 := head.series.getByID(10)
|
||||
s11 := head.series.getByID(11)
|
||||
s50 := head.series.getByID(50)
|
||||
s100 := head.series.getByID(100)
|
||||
|
||||
require.Equal(t, labels.FromStrings("a", "1"), s10.lset)
|
||||
require.Equal(t, labels.FromStrings("a", "2"), s11.lset)
|
||||
require.Equal(t, labels.FromStrings("a", "4"), s50.lset)
|
||||
require.Equal(t, labels.FromStrings("a", "3"), s100.lset)
|
||||
|
||||
expandChunk := func(c chunks.Iterator) (x []sample) {
|
||||
for c.Next() {
|
||||
t, v := c.At()
|
||||
x = append(x, sample{t: t, v: v})
|
||||
}
|
||||
require.NoError(t, c.Err())
|
||||
return x
|
||||
}
|
||||
|
||||
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
|
||||
require.Equal(t, 0, len(s11.chunks))
|
||||
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
|
||||
require.Equal(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
|
||||
}
|
||||
|
||||
func TestHead_Truncate(t *testing.T) {
|
||||
h, err := NewHead(nil, nil, nil, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s1 := h.create(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
s2 := h.create(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
s3 := h.create(3, labels.FromStrings("a", "1", "b", "2"))
|
||||
s4 := h.create(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
||||
s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
|
||||
s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
|
||||
|
||||
s1.chunks = []*memChunk{
|
||||
{minTime: 0, maxTime: 999},
|
||||
|
15
wal.go
15
wal.go
@ -99,9 +99,6 @@ type WALReader interface {
|
||||
type RefSeries struct {
|
||||
Ref uint64
|
||||
Labels labels.Labels
|
||||
|
||||
// hash for the label set. This field is not generally populated.
|
||||
hash uint64
|
||||
}
|
||||
|
||||
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||
@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "decode series entry")
|
||||
}
|
||||
seriesf(series)
|
||||
if err := seriesf(series); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cf := r.current()
|
||||
|
||||
@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "decode samples entry")
|
||||
}
|
||||
samplesf(samples)
|
||||
if err := samplesf(samples); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the times for the WAL segment file.
|
||||
cf := r.current()
|
||||
@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "decode delete entry")
|
||||
}
|
||||
deletesf(stones)
|
||||
if err := deletesf(stones); err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the times for the WAL segment file.
|
||||
|
||||
cf := r.current()
|
||||
|
Loading…
Reference in New Issue
Block a user