From 5cf2662074b0afdc946fabcd19b9f29a3870fbae Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 30 Aug 2017 18:34:54 +0200 Subject: [PATCH] Refactor WAL into Head and misc improvements --- block.go | 2 + chunks.go | 4 +- db.go | 629 +++++++-------------------------------------------- db_test.go | 4 +- head.go | 551 ++++++++++++++++++++++++++++++++++++++++++-- head_test.go | 24 +- wal.go | 11 +- wal_test.go | 6 +- 8 files changed, 633 insertions(+), 598 deletions(-) diff --git a/block.go b/block.go index 60a644f4b..a03a79dce 100644 --- a/block.go +++ b/block.go @@ -37,6 +37,8 @@ type DiskBlock interface { Delete(mint, maxt int64, m ...labels.Matcher) error + Snapshot(dir string) error + Close() error } diff --git a/chunks.go b/chunks.go index 1926ab599..5955c5085 100644 --- a/chunks.go +++ b/chunks.go @@ -179,7 +179,7 @@ func (w *chunkWriter) cut() error { return err } - p, _, err := nextSequenceFile(w.dirFile.Name(), "") + p, _, err := nextSequenceFile(w.dirFile.Name()) if err != nil { return err } @@ -302,7 +302,7 @@ type chunkReader struct { // newChunkReader returns a new chunkReader based on mmaped files found in dir. 
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { - files, err := sequenceFiles(dir, "") + files, err := sequenceFiles(dir) if err != nil { return nil, err } diff --git a/db.go b/db.go index 1c6c53c40..8d4c56932 100644 --- a/db.go +++ b/db.go @@ -16,19 +16,15 @@ package tsdb import ( "bytes" - "encoding/binary" "fmt" "io" "io/ioutil" - "math" "os" "path/filepath" "runtime" "sort" "strconv" - "strings" "sync" - "sync/atomic" "time" "unsafe" @@ -99,13 +95,12 @@ type DB struct { dir string lockf *lockfile.Lockfile - logger log.Logger - metrics *dbMetrics - opts *Options - chunkPool chunks.Pool - appendPool sync.Pool - compactor Compactor - wal WAL + logger log.Logger + metrics *dbMetrics + opts *Options + chunkPool chunks.Pool + compactor Compactor + wal WAL // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex @@ -123,33 +118,15 @@ type DB struct { } type dbMetrics struct { - activeAppenders prometheus.Gauge - loadedBlocks prometheus.GaugeFunc - reloads prometheus.Counter - reloadsFailed prometheus.Counter - walTruncateDuration prometheus.Summary - samplesAppended prometheus.Counter - - headSeries prometheus.Gauge - headSeriesCreated prometheus.Counter - headSeriesRemoved prometheus.Counter - headChunks prometheus.Gauge - headChunksCreated prometheus.Gauge - headChunksRemoved prometheus.Gauge - headGCDuration prometheus.Summary - headMinTime prometheus.GaugeFunc - headMaxTime prometheus.GaugeFunc - + loadedBlocks prometheus.GaugeFunc + reloads prometheus.Counter + reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m := &dbMetrics{} - m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_active_appenders", - Help: "Number of currently active appender transactions", - }) m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "tsdb_blocks_loaded", Help: "Number of currently loaded data 
blocks", @@ -166,57 +143,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "tsdb_reloads_failures_total", Help: "Number of times the database failed to reload black data from disk.", }) - - m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_wal_truncate_duration_seconds", - Help: "Duration of WAL truncation.", - }) - - m.headSeries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_series", - Help: "Total number of series in the head block.", - }) - m.headSeriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_series_created_total", - Help: "Total number of series created in the head", - }) - m.headSeriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_series_removed_total", - Help: "Total number of series removed in the head", - }) - m.headChunks = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_chunks", - Help: "Total number of chunks in the head block.", - }) - m.headChunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_chunks_created_total", - Help: "Total number of chunks created in the head", - }) - m.headChunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tsdb_head_chunks_removed_total", - Help: "Total number of chunks removed in the head", - }) - m.headGCDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "tsdb_head_gc_duration_seconds", - Help: "Runtime of garbage collection in the head block.", - }) - m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_head_max_time", - Help: "Maximum timestamp of the head block.", - }, func() float64 { - return float64(db.head.MaxTime()) - }) - m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_head_min_time", - Help: "Minimum time bound of the head block.", - }, func() float64 { - return float64(db.head.MinTime()) - }) - - m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: 
"tsdb_samples_appended_total", - Help: "Total number of appended sampledb.", - }) m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ Name: "tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", @@ -224,23 +150,9 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( - m.activeAppenders, m.loadedBlocks, m.reloads, m.reloadsFailed, - m.walTruncateDuration, - - m.headChunks, - m.headChunksCreated, - m.headChunksRemoved, - m.headSeries, - m.headSeriesCreated, - m.headSeriesRemoved, - m.headMinTime, - m.headMaxTime, - m.headGCDuration, - - m.samplesAppended, m.compactionsTriggered, ) } @@ -260,16 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db opts = DefaultOptions } - wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second) - if err != nil { - return nil, err - } - db = &DB{ dir: dir, logger: l, opts: opts, - wal: wal, compactc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -312,14 +218,15 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.compactor = NewLeveledCompactor(r, l, copts) - db.head, err = NewHead(l, copts.blockRanges[0]) + wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second) if err != nil { return nil, err } - if err := db.readWAL(db.wal.Reader()); err != nil { + db.head, err = NewHead(r, l, wal, copts.blockRanges[0]) + if err != nil { return nil, err } - if err := db.reloadBlocks(); err != nil { + if err := db.reload(); err != nil { return nil, err } @@ -341,6 +248,7 @@ func (db *DB) run() { for { select { case <-db.stopc: + return case <-time.After(backoff): } @@ -364,7 +272,9 @@ func (db *DB) run() { } if err1 != nil || err2 != nil { - exponential(backoff, 1*time.Second, 1*time.Minute) + backoff = exponential(backoff, 1*time.Second, 1*time.Minute) + } else { + backoff = 0 } case <-db.stopc: @@ 
-391,6 +301,30 @@ func (db *DB) retentionCutoff() (bool, error) { return retentionCutoff(db.dir, mint) } +// Appender opens a new appender against the database. +func (db *DB) Appender() Appender { + return dbAppender{db: db, Appender: db.head.Appender()} +} + +// dbAppender wraps the DB's head appender and triggers compactions on commit +// if necessary. +type dbAppender struct { + Appender + db *DB +} + +func (a dbAppender) Commit() error { + err := a.Appender.Commit() + + if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 { + select { + case a.db.compactc <- struct{}{}: + default: + } + } + return err +} + func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -425,7 +359,7 @@ func (db *DB) compact() (changes bool, err error) { } changes = true - if err := db.reloadBlocks(); err != nil { + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -458,7 +392,7 @@ func (db *DB) compact() (changes bool, err error) { } } - if err := db.reloadBlocks(); err != nil { + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -512,50 +446,7 @@ func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) { return nil, false } -func (db *DB) readWAL(r WALReader) error { - - seriesFunc := func(series []labels.Labels) error { - for _, lset := range series { - db.head.create(lset.Hash(), lset) - db.metrics.headSeries.Inc() - db.metrics.headSeriesCreated.Inc() - } - return nil - } - samplesFunc := func(samples []RefSample) error { - for _, s := range samples { - ms, ok := db.head.series[uint32(s.Ref)] - if !ok { - return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) - } - _, chunkCreated := ms.append(s.T, s.V) - if chunkCreated { - db.metrics.headChunksCreated.Inc() - db.metrics.headChunks.Inc() - } - } - - return nil - } - deletesFunc := func(stones []Stone) error { - for _, s := range stones { - for _, itv 
:= range s.intervals { - db.head.tombstones.add(s.ref, itv) - } - } - - return nil - } - - if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { - return errors.Wrap(err, "consume WAL") - } - - return nil - -} - -func (db *DB) reloadBlocks() (err error) { +func (db *DB) reload() (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -613,29 +504,11 @@ func (db *DB) reloadBlocks() (err error) { // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. if len(blocks) == 0 { - return + return nil } + maxt := blocks[len(db.blocks)-1].Meta().MaxTime - if maxt <= db.head.MinTime() { - return - } - start := time.Now() - atomic.StoreInt64(&db.head.minTime, maxt) - - series, chunks := db.head.gc() - db.metrics.headSeriesRemoved.Add(float64(series)) - db.metrics.headSeries.Sub(float64(series)) - db.metrics.headChunksRemoved.Add(float64(chunks)) - db.metrics.headChunks.Sub(float64(chunks)) - - db.logger.Log("msg", "head GC completed", "duration", time.Since(start)) - - start = time.Now() - - if err := db.wal.Truncate(maxt); err != nil { - return errors.Wrapf(err, "truncate WAL at %d", maxt) - } - db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + db.head.Truncate(maxt) return nil } @@ -701,24 +574,28 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { - // if dir == db.dir { - // return errors.Errorf("cannot snapshot into base directory") - // } - // db.cmtx.Lock() - // defer db.cmtx.Unlock() + if dir == db.dir { + return errors.Errorf("cannot snapshot into base directory") + } + if _, err := ulid.Parse(dir); err == nil { + return errors.Errorf("dir must not be a valid ULID") + } - // db.mtx.Lock() // To block any appenders. 
- // defer db.mtx.Unlock() + db.cmtx.Lock() + defer db.cmtx.Unlock() - // blocks := db.blocks[:] - // for _, b := range blocks { - // db.logger.Log("msg", "snapshotting block", "block", b) - // if err := b.Snapshot(dir); err != nil { - // return errors.Wrap(err, "error snapshotting headblock") - // } - // } + db.mtx.RLock() + defer db.mtx.RUnlock() - return nil + for _, b := range db.blocks { + db.logger.Log("msg", "snapshotting block", "block", b) + + if err := b.Snapshot(dir); err != nil { + return errors.Wrap(err, "error snapshotting headblock") + } + } + + return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) } // Querier returns a new querier over the data partition for the given time range. @@ -741,320 +618,9 @@ func (db *DB) Querier(mint, maxt int64) Querier { tombstones: b.Tombstones(), }) } - return sq } -// initAppender is a helper to initialize the time bounds of a the head -// upon the first sample it receives. -type initAppender struct { - app Appender - db *DB -} - -func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - if a.app != nil { - return a.app.Add(lset, t, v) - } - for { - // In the init state, the head has a high timestamp of math.MinInt64. - ht := a.db.head.MaxTime() - if ht != math.MinInt64 { - break - } - cr := a.db.opts.BlockRanges[0] - mint, _ := rangeForTimestamp(t, cr) - - atomic.CompareAndSwapInt64(&a.db.head.maxTime, ht, t) - atomic.StoreInt64(&a.db.head.minTime, mint-cr) - } - a.app = a.db.appender() - - return a.app.Add(lset, t, v) -} - -func (a *initAppender) AddFast(ref string, t int64, v float64) error { - if a.app == nil { - return ErrNotFound - } - return a.app.AddFast(ref, t, v) -} - -func (a *initAppender) Commit() error { - if a.app == nil { - return nil - } - return a.app.Commit() -} - -func (a *initAppender) Rollback() error { - if a.app == nil { - return nil - } - return a.app.Rollback() -} - -// Appender returns a new Appender on the database. 
-func (db *DB) Appender() Appender { - db.metrics.activeAppenders.Inc() - - // The head cache might not have a starting point yet. The init appender - // picks up the first appended timestamp as the base. - if db.head.MaxTime() == math.MinInt64 { - return &initAppender{db: db} - } - return db.appender() -} - -func (db *DB) appender() *dbAppender { - db.head.mtx.RLock() - - return &dbAppender{ - db: db, - head: db.head, - wal: db.wal, - mint: db.head.MaxTime() - db.opts.BlockRanges[0]/2, - samples: db.getAppendBuffer(), - highTimestamp: math.MinInt64, - lowTimestamp: math.MaxInt64, - } -} - -func (db *DB) getAppendBuffer() []RefSample { - b := db.appendPool.Get() - if b == nil { - return make([]RefSample, 0, 512) - } - return b.([]RefSample) -} - -func (db *DB) putAppendBuffer(b []RefSample) { - db.appendPool.Put(b[:0]) -} - -type dbAppender struct { - db *DB - head *Head - wal WAL - mint int64 - - newSeries []*hashedLabels - newLabels []labels.Labels - newHashes map[uint64]uint64 - - samples []RefSample - highTimestamp int64 - lowTimestamp int64 -} - -type hashedLabels struct { - ref uint64 - hash uint64 - labels labels.Labels -} - -func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - if t < a.mint { - return "", ErrOutOfBounds - } - - hash := lset.Hash() - refb := make([]byte, 8) - - // Series exists already in the block. - if ms := a.head.get(hash, lset); ms != nil { - binary.BigEndian.PutUint64(refb, uint64(ms.ref)) - return string(refb), a.AddFast(string(refb), t, v) - } - // Series was added in this transaction previously. - if ref, ok := a.newHashes[hash]; ok { - binary.BigEndian.PutUint64(refb, ref) - // XXX(fabxc): there's no fast path for multiple samples for the same new series - // in the same transaction. We always return the invalid empty ref. It's has not - // been a relevant use case so far and is not worth the trouble. - return "", a.AddFast(string(refb), t, v) - } - - // The series is completely new. 
- if a.newSeries == nil { - a.newHashes = map[uint64]uint64{} - } - // First sample for new series. - ref := uint64(len(a.newSeries)) - - a.newSeries = append(a.newSeries, &hashedLabels{ - ref: ref, - hash: hash, - labels: lset, - }) - // First bit indicates its a series created in this transaction. - ref |= (1 << 63) - - a.newHashes[hash] = ref - binary.BigEndian.PutUint64(refb, ref) - - return "", a.AddFast(string(refb), t, v) -} - -func (a *dbAppender) AddFast(ref string, t int64, v float64) error { - if len(ref) != 8 { - return errors.Wrap(ErrNotFound, "invalid ref length") - } - var ( - refn = binary.BigEndian.Uint64(yoloBytes(ref)) - id = uint32(refn) - inTx = refn&(1<<63) != 0 - ) - // Distinguish between existing series and series created in - // this transaction. - if inTx { - if id > uint32(len(a.newSeries)-1) { - return errors.Wrap(ErrNotFound, "transaction series ID too high") - } - // TODO(fabxc): we also have to validate here that the - // sample sequence is valid. - // We also have to revalidate it as we switch locks and create - // the new series. - } else { - ms, ok := a.head.series[id] - if !ok { - return errors.Wrap(ErrNotFound, "unknown series") - } - if err := ms.appendable(t, v); err != nil { - return err - } - } - if t < a.mint { - return ErrOutOfBounds - } - - if t > a.highTimestamp { - a.highTimestamp = t - } - // if t < a.lowTimestamp { - // a.lowTimestamp = t - // } - - a.samples = append(a.samples, RefSample{ - Ref: refn, - T: t, - V: v, - }) - return nil -} - -func (a *dbAppender) createSeries() error { - if len(a.newSeries) == 0 { - return nil - } - a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) - base0 := len(a.head.series) - - a.head.mtx.RUnlock() - defer a.head.mtx.RLock() - a.head.mtx.Lock() - defer a.head.mtx.Unlock() - - base1 := len(a.head.series) - - for _, l := range a.newSeries { - // We switched locks and have to re-validate that the series were not - // created by another goroutine in the meantime. 
- if base1 > base0 { - if ms := a.head.get(l.hash, l.labels); ms != nil { - l.ref = uint64(ms.ref) - continue - } - } - // Series is still new. - a.newLabels = append(a.newLabels, l.labels) - - s := a.head.create(l.hash, l.labels) - l.ref = uint64(s.ref) - - a.db.metrics.headSeriesCreated.Inc() - a.db.metrics.headSeries.Inc() - } - - // Write all new series to the WAL. - if err := a.wal.LogSeries(a.newLabels); err != nil { - return errors.Wrap(err, "WAL log series") - } - - return nil -} - -func (a *dbAppender) Commit() error { - defer a.head.mtx.RUnlock() - - defer a.db.metrics.activeAppenders.Dec() - defer a.db.putAppendBuffer(a.samples) - - if err := a.createSeries(); err != nil { - return err - } - - // We have to update the refs of samples for series we just created. - for i := range a.samples { - s := &a.samples[i] - if s.Ref&(1<<63) != 0 { - s.Ref = a.newSeries[(s.Ref<<1)>>1].ref - } - } - - // Write all new samples to the WAL and add them to the - // in-mem database on success. - if err := a.wal.LogSamples(a.samples); err != nil { - return errors.Wrap(err, "WAL log samples") - } - - total := uint64(len(a.samples)) - - for _, s := range a.samples { - series, ok := a.head.series[uint32(s.Ref)] - if !ok { - return errors.Errorf("series with ID %d not found", s.Ref) - } - ok, chunkCreated := series.append(s.T, s.V) - if !ok { - total-- - } - if chunkCreated { - a.db.metrics.headChunks.Inc() - a.db.metrics.headChunksCreated.Inc() - } - } - - a.db.metrics.samplesAppended.Add(float64(total)) - - for { - ht := a.head.MaxTime() - if a.highTimestamp <= ht { - break - } - if a.highTimestamp-a.head.MinTime() > a.head.chunkRange/2*3 { - select { - case a.db.compactc <- struct{}{}: - default: - } - } - if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) { - break - } - } - - return nil -} - -func (a *dbAppender) Rollback() error { - a.head.mtx.RUnlock() - - a.db.metrics.activeAppenders.Dec() - a.db.putAppendBuffer(a.samples) - - return nil -} - func 
rangeForTimestamp(t int64, width int64) (mint, maxt int64) { mint = (t / width) * width return mint, mint + width @@ -1078,41 +644,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { }(b)) } } - if err := g.Wait(); err != nil { - return err - } - ir := db.head.Index() - - pr := newPostingsReader(ir) - p, absent := pr.Select(ms...) - - var stones []Stone - -Outer: - for p.Next() { - series := db.head.series[p.At()] - - for _, abs := range absent { - if series.lset.Get(abs) != "" { - continue Outer - } - } - - // Delete only until the current values and not beyond. - t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) - } - - if p.Err() != nil { - return p.Err() - } - if err := db.wal.LogDeletes(stones); err != nil { - return err - } - for _, s := range stones { - db.head.tombstones.add(s.ref, s.intervals[0]) - } + g.Go(func() error { + return db.head.Delete(mint, maxt, ms...) + }) if err := g.Wait(); err != nil { return err @@ -1171,7 +706,7 @@ func blockDirs(dir string) ([]string, error) { return dirs, nil } -func sequenceFiles(dir, prefix string) ([]string, error) { +func sequenceFiles(dir string) ([]string, error) { files, err := ioutil.ReadDir(dir) if err != nil { return nil, err @@ -1179,24 +714,15 @@ func sequenceFiles(dir, prefix string) ([]string, error) { var res []string for _, fi := range files { - if isSequenceFile(fi, prefix) { - res = append(res, filepath.Join(dir, fi.Name())) + if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { + continue } + res = append(res, filepath.Join(dir, fi.Name())) } return res, nil } -func isSequenceFile(fi os.FileInfo, prefix string) bool { - if !strings.HasPrefix(fi.Name(), prefix) { - return false - } - if _, err := strconv.ParseUint(fi.Name()[len(prefix):], 10, 32); err != nil { - return false - } - return true -} - -func nextSequenceFile(dir, prefix string) (string, int, error) { +func 
nextSequenceFile(dir string) (string, int, error) { names, err := fileutil.ReadDir(dir) if err != nil { return "", 0, err @@ -1204,16 +730,13 @@ func nextSequenceFile(dir, prefix string) (string, int, error) { i := uint64(0) for _, n := range names { - if !strings.HasPrefix(n, prefix) { - continue - } - j, err := strconv.ParseUint(n[len(prefix):], 10, 32) + j, err := strconv.ParseUint(n, 10, 64) if err != nil { continue } i = j } - return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil + return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil } // The MultiError type implements the error interface, and contains the diff --git a/db_test.go b/db_test.go index 8a5d946eb..57d885833 100644 --- a/db_test.go +++ b/db_test.go @@ -33,9 +33,7 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { require.NoError(t, err) // Do not close the test database by default as it will deadlock on test failures. - return db, func() { - os.RemoveAll(tmpdir) - } + return db, func() { os.RemoveAll(tmpdir) } } // Convert a SeriesSet into a form useable with reflect.DeepEqual. 
diff --git a/head.go b/head.go index 6d5355c85..d61d14b74 100644 --- a/head.go +++ b/head.go @@ -14,14 +14,16 @@ package tsdb import ( - "fmt" + "encoding/binary" "math" "sort" "sync" "sync/atomic" + "time" "github.com/go-kit/kit/log" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -47,6 +49,10 @@ var ( type Head struct { chunkRange int64 mtx sync.RWMutex + metrics *headMetrics + wal WAL + logger log.Logger + appendPool sync.Pool minTime, maxTime int64 lastSeriesID uint32 @@ -65,9 +71,110 @@ type Head struct { tombstones tombstoneReader } +type headMetrics struct { + activeAppenders prometheus.Gauge + series prometheus.Gauge + seriesCreated prometheus.Counter + seriesRemoved prometheus.Counter + chunks prometheus.Gauge + chunksCreated prometheus.Gauge + chunksRemoved prometheus.Gauge + gcDuration prometheus.Summary + minTime prometheus.GaugeFunc + maxTime prometheus.GaugeFunc + samplesAppended prometheus.Counter + walTruncateDuration prometheus.Summary +} + +func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { + m := &headMetrics{} + + m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_active_appenders", + Help: "Number of currently active appender transactions", + }) + m.series = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_series", + Help: "Total number of series in the head block.", + }) + m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_series_created_total", + Help: "Total number of series created in the head", + }) + m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_series_removed_total", + Help: "Total number of series removed in the head", + }) + m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_chunks", + Help: "Total number of chunks in the head block.", + }) + m.chunksCreated = 
prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_chunks_created_total", + Help: "Total number of chunks created in the head", + }) + m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tsdb_head_chunks_removed_total", + Help: "Total number of chunks removed in the head", + }) + m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tsdb_head_gc_duration_seconds", + Help: "Runtime of garbage collection in the head block.", + }) + m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_head_max_time", + Help: "Maximum timestamp of the head block.", + }, func() float64 { + return float64(h.MaxTime()) + }) + m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_head_min_time", + Help: "Minimum time bound of the head block.", + }, func() float64 { + return float64(h.MinTime()) + }) + m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tsdb_wal_truncate_duration_seconds", + Help: "Duration of WAL truncation.", + }) + m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tsdb_head_samples_appended_total", + Help: "Total number of appended samples.", + }) + + if r != nil { + r.MustRegister( + m.activeAppenders, + m.chunks, + m.chunksCreated, + m.chunksRemoved, + m.series, + m.seriesCreated, + m.seriesRemoved, + m.minTime, + m.maxTime, + m.gcDuration, + m.walTruncateDuration, + m.samplesAppended, + ) + } + return m +} + +// NewHead opens the head block in dir. 
-func NewHead(l log.Logger, chunkRange int64) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) { + if l == nil { + l = log.NewNopLogger() + } + if wal == nil { + wal = NopWAL{} + } + if chunkRange < 1 { + return nil, errors.Errorf("invalid chunk range %d", chunkRange) + } h := &Head{ + wal: wal, + logger: l, chunkRange: chunkRange, minTime: math.MaxInt64, maxTime: math.MinInt64, @@ -78,15 +185,422 @@ func NewHead(l log.Logger, chunkRange int64) (*Head, error) { postings: &memPostings{m: make(map[term][]uint32)}, tombstones: newEmptyTombstoneReader(), } - return h, nil + h.metrics = newHeadMetrics(h, r) + + return h, h.readWAL() +} + +func (h *Head) readWAL() error { + r := h.wal.Reader(h.MinTime()) + + seriesFunc := func(series []labels.Labels) error { + for _, lset := range series { + h.create(lset.Hash(), lset) + } + return nil + } + samplesFunc := func(samples []RefSample) error { + for _, s := range samples { + ms, ok := h.series[uint32(s.Ref)] + if !ok { + return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) + } + _, chunkCreated := ms.append(s.T, s.V) + if chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + } + + return nil + } + deletesFunc := func(stones []Stone) error { + for _, s := range stones { + for _, itv := range s.intervals { + h.tombstones.add(s.ref, itv) + } + } + + return nil + } + + if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { + return errors.Wrap(err, "consume WAL") + } + return nil } func (h *Head) String() string { return "" } +// Truncate removes all data before mint from the head block and truncates its WAL. 
+func (h *Head) Truncate(mint int64) { + if h.minTime >= mint { + return + } + atomic.StoreInt64(&h.minTime, mint) + + start := time.Now() + + h.gc() + h.logger.Log("msg", "head GC completed", "duration", time.Since(start)) + h.metrics.gcDuration.Observe(time.Since(start).Seconds()) + + start = time.Now() + + if err := h.wal.Truncate(mint); err == nil { + h.logger.Log("msg", "WAL truncation completed", "duration", time.Since(start)) + } else { + h.logger.Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) + } + h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) +} + +// initTime initializes a head with the first timestamp. This only needs to be called +// for a completely fresh head with an empty WAL. +// Returns true if the initialization took effect. +func (h *Head) initTime(t int64) (initialized bool) { + // In the init state, the head has a high timestamp of math.MinInt64. + if h.MaxTime() != math.MinInt64 { + return false + } + mint, _ := rangeForTimestamp(t, h.chunkRange) + + if !atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t) { + return false + } + atomic.StoreInt64(&h.minTime, mint-h.chunkRange) + return true +} + +// initAppender is a helper to initialize the time bounds of the head +// upon the first sample it receives. 
+type initAppender struct { + app Appender + head *Head +} + +func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { + if a.app != nil { + return a.app.Add(lset, t, v) + } + if a.head.initTime(t) { + a.app = a.head.appender() + } + return a.app.Add(lset, t, v) +} + +func (a *initAppender) AddFast(ref string, t int64, v float64) error { + if a.app == nil { + return ErrNotFound + } + return a.app.AddFast(ref, t, v) +} + +func (a *initAppender) Commit() error { + if a.app == nil { + return nil + } + return a.app.Commit() +} + +func (a *initAppender) Rollback() error { + if a.app == nil { + return nil + } + return a.app.Rollback() +} + +// Appender returns a new Appender on the database. +func (h *Head) Appender() Appender { + h.metrics.activeAppenders.Inc() + + // The head cache might not have a starting point yet. The init appender + // picks up the first appended timestamp as the base. + if h.MaxTime() == math.MinInt64 { + return &initAppender{head: h} + } + return h.appender() +} + +func (h *Head) appender() *headAppender { + h.mtx.RLock() + + return &headAppender{ + head: h, + mint: h.MaxTime() - h.chunkRange/2, + samples: h.getAppendBuffer(), + highTimestamp: math.MinInt64, + } +} + +func (h *Head) getAppendBuffer() []RefSample { + b := h.appendPool.Get() + if b == nil { + return make([]RefSample, 0, 512) + } + return b.([]RefSample) +} + +func (h *Head) putAppendBuffer(b []RefSample) { + h.appendPool.Put(b[:0]) +} + +type headAppender struct { + head *Head + mint int64 + + newSeries []*hashedLabels + newLabels []labels.Labels + newHashes map[uint64]uint64 + + samples []RefSample + highTimestamp int64 +} + +type hashedLabels struct { + ref uint64 + hash uint64 + labels labels.Labels +} + +func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { + if t < a.mint { + return "", ErrOutOfBounds + } + + hash := lset.Hash() + refb := make([]byte, 8) + + // Series exists already in the block. 
+ if ms := a.head.get(hash, lset); ms != nil { + binary.BigEndian.PutUint64(refb, uint64(ms.ref)) + return string(refb), a.AddFast(string(refb), t, v) + } + // Series was added in this transaction previously. + if ref, ok := a.newHashes[hash]; ok { + binary.BigEndian.PutUint64(refb, ref) + // XXX(fabxc): there's no fast path for multiple samples for the same new series + // in the same transaction. We always return the invalid empty ref. It has not + // been a relevant use case so far and is not worth the trouble. + return "", a.AddFast(string(refb), t, v) + } + + // The series is completely new. + if a.newSeries == nil { + a.newHashes = map[uint64]uint64{} + } + // First sample for new series. + ref := uint64(len(a.newSeries)) + + a.newSeries = append(a.newSeries, &hashedLabels{ + ref: ref, + hash: hash, + labels: lset, + }) + // First bit indicates it's a series created in this transaction. + ref |= (1 << 63) + + a.newHashes[hash] = ref + binary.BigEndian.PutUint64(refb, ref) + + return "", a.AddFast(string(refb), t, v) +} + +func (a *headAppender) AddFast(ref string, t int64, v float64) error { + if len(ref) != 8 { + return errors.Wrap(ErrNotFound, "invalid ref length") + } + var ( + refn = binary.BigEndian.Uint64(yoloBytes(ref)) + id = uint32(refn) + inTx = refn&(1<<63) != 0 + ) + // Distinguish between existing series and series created in + // this transaction. + if inTx { + if id > uint32(len(a.newSeries)-1) { + return errors.Wrap(ErrNotFound, "transaction series ID too high") + } + // TODO(fabxc): we also have to validate here that the + // sample sequence is valid. + // We also have to revalidate it as we switch locks and create + // the new series. 
+ } else { + ms, ok := a.head.series[id] + if !ok { + return errors.Wrap(ErrNotFound, "unknown series") + } + if err := ms.appendable(t, v); err != nil { + return err + } + } + if t < a.mint { + return ErrOutOfBounds + } + + if t > a.highTimestamp { + a.highTimestamp = t + } + + a.samples = append(a.samples, RefSample{ + Ref: refn, + T: t, + V: v, + }) + return nil +} + +func (a *headAppender) createSeries() error { + if len(a.newSeries) == 0 { + return nil + } + a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + base0 := len(a.head.series) + + a.head.mtx.RUnlock() + defer a.head.mtx.RLock() + a.head.mtx.Lock() + defer a.head.mtx.Unlock() + + base1 := len(a.head.series) + + for _, l := range a.newSeries { + // We switched locks and have to re-validate that the series were not + // created by another goroutine in the meantime. + if base1 > base0 { + if ms := a.head.get(l.hash, l.labels); ms != nil { + l.ref = uint64(ms.ref) + continue + } + } + // Series is still new. + a.newLabels = append(a.newLabels, l.labels) + + s := a.head.create(l.hash, l.labels) + l.ref = uint64(s.ref) + } + + // Write all new series to the WAL. + if err := a.head.wal.LogSeries(a.newLabels); err != nil { + return errors.Wrap(err, "WAL log series") + } + + return nil +} + +func (a *headAppender) Commit() error { + defer a.head.mtx.RUnlock() + + defer a.head.metrics.activeAppenders.Dec() + defer a.head.putAppendBuffer(a.samples) + + if err := a.createSeries(); err != nil { + return err + } + + // We have to update the refs of samples for series we just created. + for i := range a.samples { + s := &a.samples[i] + if s.Ref&(1<<63) != 0 { + s.Ref = a.newSeries[(s.Ref<<1)>>1].ref + } + } + + // Write all new samples to the WAL and add them to the + // in-mem database on success. 
+ if err := a.head.wal.LogSamples(a.samples); err != nil { + return errors.Wrap(err, "WAL log samples") + } + + total := uint64(len(a.samples)) + + for _, s := range a.samples { + series, ok := a.head.series[uint32(s.Ref)] + if !ok { + return errors.Errorf("series with ID %d not found", s.Ref) + } + ok, chunkCreated := series.append(s.T, s.V) + if !ok { + total-- + } + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + } + + a.head.metrics.samplesAppended.Add(float64(total)) + + for { + ht := a.head.MaxTime() + if a.highTimestamp <= ht { + break + } + if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) { + break + } + } + + return nil +} + +func (a *headAppender) Rollback() error { + a.head.mtx.RUnlock() + + a.head.metrics.activeAppenders.Dec() + a.head.putAppendBuffer(a.samples) + + return nil +} + +// Delete all samples in the range of [mint, maxt] for series that satisfy the given +// label matchers. +func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { + // Do not delete anything beyond the currently valid range. + mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime()) + + ir := h.indexRange(mint, maxt) + + pr := newPostingsReader(ir) + p, absent := pr.Select(ms...) + + var stones []Stone + +Outer: + for p.Next() { + series := h.series[p.At()] + + for _, abs := range absent { + if series.lset.Get(abs) != "" { + continue Outer + } + } + + // Delete only until the current values and not beyond. + t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) + stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + } + + if p.Err() != nil { + return p.Err() + } + if err := h.wal.LogDeletes(stones); err != nil { + return err + } + for _, s := range stones { + h.tombstones.add(s.ref, s.intervals[0]) + } + return nil +} + // gc removes data before the minimum timestmap from the head. 
-func (h *Head) gc() (seriesRemoved, chunksRemoved int) { +func (h *Head) gc() { + var ( + seriesRemoved int + chunksRemoved int + ) // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() @@ -102,7 +616,6 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) { if len(s.chunks) == 0 { deletedHashes[hash] = append(deletedHashes[hash], s.ref) } - s.mtx.Unlock() } } @@ -142,12 +655,12 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) { continue } delete(h.series, s.ref) + seriesRemoved++ } if len(rem) > 0 { h.hashes[hash] = rem } else { delete(h.hashes, hash) - seriesRemoved++ } } @@ -185,7 +698,10 @@ func (h *Head) gc() (seriesRemoved, chunksRemoved int) { h.symbols = symbols h.values = values - return seriesRemoved, chunksRemoved + h.metrics.seriesRemoved.Add(float64(seriesRemoved)) + h.metrics.series.Sub(float64(seriesRemoved)) + h.metrics.chunksRemoved.Add(float64(chunksRemoved)) + h.metrics.chunks.Sub(float64(chunksRemoved)) } func (h *Head) Tombstones() TombstoneReader { @@ -421,6 +937,9 @@ func (h *Head) get(hash uint64, lset labels.Labels) *memSeries { } func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { + h.metrics.series.Inc() + h.metrics.seriesCreated.Inc() + id := atomic.AddUint32(&h.lastSeriesID, 1) s := newMemSeries(lset, id, h.chunkRange) @@ -524,12 +1043,7 @@ func (s *memSeries) appendable(t int64, v float64) error { } func (s *memSeries) chunk(id int) *memChunk { - ix := id - s.firstChunkID - if ix >= len(s.chunks) || ix < 0 { - fmt.Println("get chunk", id, len(s.chunks), s.firstChunkID) - } - - return s.chunks[ix] + return s.chunks[id-s.firstChunkID] } func (s *memSeries) chunkID(pos int) int { @@ -571,6 +1085,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { } if c.samples > samplesPerChunk/4 && t >= s.nextAt { c = s.cut(t) + chunkCreated = true } s.app.Append(t, v) @@ -603,13 +1118,15 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + 
(max-start)/a } -func (s *memSeries) iterator(i int) chunks.Iterator { - c := s.chunk(i) +func (s *memSeries) iterator(id int) chunks.Iterator { + c := s.chunk(id) - if i < len(s.chunks)-1 { + // TODO(fabxc): !!! Test this and everything around chunk ID != list pos. + if id-s.firstChunkID < len(s.chunks)-1 { return c.chunk.Iterator() } - + // Serve the last 4 samples for the last chunk from the series buffer + // as their compressed bytes may be mutated by added samples. it := &memSafeIterator{ Iterator: c.chunk.Iterator(), i: -1, diff --git a/head_test.go b/head_test.go index a19d09c4e..5b9574ebd 100644 --- a/head_test.go +++ b/head_test.go @@ -28,26 +28,20 @@ import ( ) func BenchmarkCreateSeries(b *testing.B) { - lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) + lbls, err := readPrometheusLabels("testdata/all.series", b.N) require.NoError(b, err) - b.Run("", func(b *testing.B) { - dir, err := ioutil.TempDir("", "create_series_bench") + h, err := NewHead(nil, nil, nil, 10000) + if err != nil { require.NoError(b, err) - defer os.RemoveAll(dir) + } - h, err := NewHead(nil, nil, 10000) - if err != nil { - require.NoError(b, err) - } + b.ReportAllocs() + b.ResetTimer() - b.ReportAllocs() - b.ResetTimer() - - for _, l := range lbls[:b.N] { - h.create(l.Hash(), l) - } - }) + for _, l := range lbls { + h.create(l.Hash(), l) + } } func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { diff --git a/wal.go b/wal.go index 2ee889050..fc0109aef 100644 --- a/wal.go +++ b/wal.go @@ -80,7 +80,7 @@ type SegmentWAL struct { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. 
type WAL interface { - Reader() WALReader + Reader(mint int64) WALReader LogSeries([]labels.Labels) error LogSamples([]RefSample) error LogDeletes([]Stone) error @@ -91,10 +91,11 @@ type WAL interface { type NopWAL struct{} func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } -func (w NopWAL) Reader() WALReader { return w } +func (w NopWAL) Reader(int64) WALReader { return w } func (NopWAL) LogSeries([]labels.Labels) error { return nil } func (NopWAL) LogSamples([]RefSample) error { return nil } func (NopWAL) LogDeletes([]Stone) error { return nil } +func (NopWAL) Truncate(int64) error { return nil } func (NopWAL) Close() error { return nil } // WALReader reads entries from a WAL. @@ -162,7 +163,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *SegmentWAL) Reader() WALReader { +func (w *SegmentWAL) Reader(int64) WALReader { return newWALReader(w, w.logger) } @@ -210,7 +211,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { // initSegments finds all existing segment files and opens them in the // appropriate file modes. func (w *SegmentWAL) initSegments() error { - fns, err := sequenceFiles(w.dirFile.Name(), "") + fns, err := sequenceFiles(w.dirFile.Name()) if err != nil { return err } @@ -268,7 +269,7 @@ func (w *SegmentWAL) cut() error { } } - p, _, err := nextSequenceFile(w.dirFile.Name(), "") + p, _, err := nextSequenceFile(w.dirFile.Name()) if err != nil { return err } diff --git a/wal_test.go b/wal_test.go index 8b0ecda84..4f1d3f046 100644 --- a/wal_test.go +++ b/wal_test.go @@ -162,7 +162,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { // Set smaller segment size so we can actually write several files. 
w.segmentSize = 1000 * 1000 - r := w.Reader() + r := w.Reader(0) var ( resultSeries [][]labels.Labels @@ -340,7 +340,7 @@ func TestWALRestoreCorrupted(t *testing.T) { w2, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) - r := w2.Reader() + r := w2.Reader(0) serf := func(l []labels.Labels) error { require.Equal(t, 0, len(l)) return nil @@ -370,7 +370,7 @@ func TestWALRestoreCorrupted(t *testing.T) { w3, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) - r = w3.Reader() + r = w3.Reader(0) i = 0 require.NoError(t, r.Read(serf, samplf, delf))