diff --git a/db_test.go b/db_test.go index 698bf78b0..9c175118e 100644 --- a/db_test.go +++ b/db_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "testing" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { @@ -1193,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { count := len(q.(*querier).blocks) testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } + +func TestInitializeHeadTimestamp(t *testing.T) { + t.Run("clean", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + // Should be set to init values if no WAL or blocks exist so far. + testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime()) + testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime()) + + // First added sample initializes the writable range. + app := db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1) + testutil.Ok(t, err) + + testutil.Equals(t, int64(1000), db.head.MinTime()) + testutil.Equals(t, int64(1000), db.head.MaxTime()) + }) + t.Run("wal-only", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(5000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) + t.Run("existing-block", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 2000, + }) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(2000), db.head.MinTime()) + testutil.Equals(t, int64(2000), db.head.MaxTime()) + }) + t.Run("existing-block-and-wal", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 6000, + }) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(6000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) +} diff --git a/head.go b/head.go index 61457911f..172d36e87 100644 --- a/head.go +++ b/head.go @@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int wal: wal, logger: l, chunkRange: chunkRange, - minTime: math.MinInt64, + minTime: math.MaxInt64, maxTime: math.MinInt64, series: newStripeSeries(), values: map[string]stringset{}, @@ -200,17 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // them on to other workers. // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( - mint int64, + minValidTime int64, partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) - maxt := h.MaxTime() + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { for _, s := range samples { - if s.T < mint || s.Ref%total != partition { + if s.T < minValidTime || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -226,10 +226,27 @@ func (h *Head) processWALSamples( if s.T > maxt { maxt = s.T } + if s.T < mint { + mint = s.T + } } output <- samples } + h.updateMinMaxTime(mint, maxt) + return unknownRefs +} + +func (h *Head) updateMinMaxTime(mint, maxt int64) { + for { + lt := h.MinTime() + if mint >= lt { + break + } + if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) { + break + } + } for { ht := h.MaxTime() if maxt <= ht { @@ -239,12 +256,15 @@ func (h *Head) processWALSamples( break } } - - return unknownRefs } func (h *Head) loadWAL(r *wal.Reader) error { - mint := h.MinTime() + minValidTime := h.MinTime() + // If the min time is still uninitialized (no persisted blocks yet), + // we accept all sample timestamps from the WAL. + if minValidTime == math.MaxInt64 { + minValidTime = math.MinInt64 + } // Track number of samples that referenced a series we don't know about // for error reporting. @@ -265,7 +285,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { output := make(chan []RefSample, 300) go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(i, input, output) @@ -327,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { } for _, s := range tstones { for _, itv := range s.intervals { - if itv.Maxt < mint { + if itv.Maxt < minValidTime { continue } h.tombstones.addInterval(s.ref, itv) @@ -400,9 +420,9 @@ func (h *Head) Init() error { // Truncate removes old data before mint from the head. func (h *Head) Truncate(mint int64) error { - initialize := h.MinTime() == math.MinInt64 + initialize := h.MinTime() == math.MaxInt64 - if h.MinTime() >= mint { + if h.MinTime() >= mint && !initialize { return nil } atomic.StoreInt64(&h.minTime, mint) @@ -462,10 +482,7 @@ func (h *Head) Truncate(mint int64) error { // for a compltely fresh head with an empty WAL. // Returns true if the initialization took an effect. func (h *Head) initTime(t int64) (initialized bool) { - // In the init state, the head has a high timestamp of math.MinInt64. - mint, _ := rangeForTimestamp(t, h.chunkRange) - - if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) { + if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) { return false } // Ensure that max time is initialized to at least the min time we just set. @@ -536,7 +553,7 @@ func (h *Head) Appender() Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. - if h.MinTime() == math.MinInt64 { + if h.MinTime() == math.MaxInt64 { return &initAppender{head: h} } return h.appender() @@ -544,10 +561,11 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - mint: h.MaxTime() - h.chunkRange/2, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), + head: h, + minValidTime: h.MaxTime() - h.chunkRange/2, + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), } } @@ -576,15 +594,16 @@ func (h *Head) putBytesBuffer(b []byte) { } type headAppender struct { - head *Head - mint, maxt int64 + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 series []RefSeries samples []RefSample } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - if t < a.mint { + if t < a.minValidTime { return 0, ErrOutOfBounds } @@ -611,9 +630,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - if t < a.mint { + if t < a.minValidTime { return ErrOutOfBounds } + if t < a.mint { + a.mint = t + } if t > a.maxt { a.maxt = t } @@ -682,16 +704,7 @@ func (a *headAppender) Commit() error { } a.head.metrics.samplesAppended.Add(float64(total)) - - for { - ht := a.head.MaxTime() - if a.maxt <= ht { - break - } - if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) { - break - } - } + a.head.updateMinMaxTime(a.mint, a.maxt) return nil }