Merge pull request #339 from prometheus/inittime

Properly initialize head time
This commit is contained in:
Fabian Reinartz 2018-08-02 16:54:38 -05:00 committed by GitHub
commit a9a8fabd2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 155 additions and 34 deletions

View File

@ -19,6 +19,7 @@ import (
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"testing"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
@ -1193,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}
func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
// Should be set to init values if no WAL or blocks exist so far.
testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
// First added sample initializes the writable range.
app := db.Appender()
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
testutil.Ok(t, err)
testutil.Equals(t, int64(1000), db.head.MinTime())
testutil.Equals(t, int64(1000), db.head.MaxTime())
})
t.Run("wal-only", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(5000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
t.Run("existing-block", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 2000,
})
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(2000), db.head.MinTime())
testutil.Equals(t, int64(2000), db.head.MaxTime())
})
t.Run("existing-block-and-wal", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 6000,
})
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(6000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
}

81
head.go
View File

@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MinInt64,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
series: newStripeSeries(),
values: map[string]stringset{},
@ -200,17 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
mint int64,
minValidTime int64,
partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
defer close(output)
maxt := h.MaxTime()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input {
for _, s := range samples {
if s.T < mint || s.Ref%total != partition {
if s.T < minValidTime || s.Ref%total != partition {
continue
}
ms := h.series.getByID(s.Ref)
@ -226,10 +226,27 @@ func (h *Head) processWALSamples(
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
}
output <- samples
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
func (h *Head) updateMinMaxTime(mint, maxt int64) {
for {
lt := h.MinTime()
if mint >= lt {
break
}
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
break
}
}
for {
ht := h.MaxTime()
if maxt <= ht {
@ -239,12 +256,15 @@ func (h *Head) processWALSamples(
break
}
}
return unknownRefs
}
func (h *Head) loadWAL(r *wal.Reader) error {
mint := h.MinTime()
minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}
// Track number of samples that referenced a series we don't know about
// for error reporting.
@ -265,7 +285,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(i, input, output)
@ -327,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
for _, s := range tstones {
for _, itv := range s.intervals {
if itv.Maxt < mint {
if itv.Maxt < minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
@ -400,9 +420,9 @@ func (h *Head) Init() error {
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MinInt64
initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint {
if h.MinTime() >= mint && !initialize {
return nil
}
atomic.StoreInt64(&h.minTime, mint)
@ -462,10 +482,7 @@ func (h *Head) Truncate(mint int64) error {
// for a compltely fresh head with an empty WAL.
// Returns true if the initialization took an effect.
func (h *Head) initTime(t int64) (initialized bool) {
// In the init state, the head has a high timestamp of math.MinInt64.
mint, _ := rangeForTimestamp(t, h.chunkRange)
if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) {
if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
return false
}
// Ensure that max time is initialized to at least the min time we just set.
@ -536,7 +553,7 @@ func (h *Head) Appender() Appender {
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MinInt64 {
if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h}
}
return h.appender()
@ -544,10 +561,11 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}
@ -576,15 +594,16 @@ func (h *Head) putBytesBuffer(b []byte) {
}
type headAppender struct {
head *Head
mint, maxt int64
head *Head
minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
series []RefSeries
samples []RefSample
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.mint {
if t < a.minValidTime {
return 0, ErrOutOfBounds
}
@ -611,9 +630,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil {
return err
}
if t < a.mint {
if t < a.minValidTime {
return ErrOutOfBounds
}
if t < a.mint {
a.mint = t
}
if t > a.maxt {
a.maxt = t
}
@ -682,16 +704,7 @@ func (a *headAppender) Commit() error {
}
a.head.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) {
break
}
}
a.head.updateMinMaxTime(a.mint, a.maxt)
return nil
}