Update prometheus/tsdb dependency

This commit is contained in:
Tobias Schmidt 2017-10-16 15:27:09 +02:00
parent e948721a0b
commit 721050c6cb
13 changed files with 570 additions and 290 deletions

View File

@ -140,7 +140,11 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
}
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
return querier{q: a.db.Querier(mint, maxt)}, nil
q, err := a.db.Querier(mint, maxt)
if err != nil {
return nil, err
}
return querier{q: q}, nil
}
// Appender returns a new appender against the storage.

View File

@ -19,6 +19,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@ -26,33 +27,16 @@ import (
"github.com/prometheus/tsdb/labels"
)
// DiskBlock represents a data block backed by on-disk data.
type DiskBlock interface {
BlockReader
// Directory where block data is stored.
Dir() string
// Stats returns statistics about the block.
Meta() BlockMeta
Delete(mint, maxt int64, m ...labels.Matcher) error
Snapshot(dir string) error
Close() error
}
// BlockReader provides reading access to a data block.
type BlockReader interface {
// Index returns an IndexReader over the block's data.
Index() IndexReader
Index() (IndexReader, error)
// Chunks returns a ChunkReader over the block's data.
Chunks() ChunkReader
Chunks() (ChunkReader, error)
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() TombstoneReader
Tombstones() (TombstoneReader, error)
}
// Appendable defines an entity to which data can be appended.
@ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path)
}
type persistedBlock struct {
// Block represents a directory of time series data covering a continous time range.
type Block struct {
mtx sync.RWMutex
closing bool
pendingReaders sync.WaitGroup
dir string
meta BlockMeta
@ -159,7 +148,9 @@ type persistedBlock struct {
tombstones tombstoneReader
}
func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
return nil, err
}
pb := &persistedBlock{
pb := &Block{
dir: dir,
meta: *meta,
chunkr: cr,
@ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
return pb, nil
}
func (pb *persistedBlock) Close() error {
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
pb.closing = true
pb.mtx.Unlock()
pb.pendingReaders.Wait()
var merr MultiError
merr.Add(pb.chunkr.Close())
merr.Add(pb.indexr.Close())
merr.Add(pb.tombstones.Close())
return merr.Err()
}
func (pb *persistedBlock) String() string {
func (pb *Block) String() string {
return pb.meta.ULID.String()
}
func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
func (pb *persistedBlock) Tombstones() TombstoneReader {
return pb.tombstones
}
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
// Dir returns the directory of the block.
func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")
func (pb *Block) startRead() error {
pb.mtx.RLock()
defer pb.mtx.RUnlock()
if pb.closing {
return ErrClosing
}
pb.pendingReaders.Add(1)
return nil
}
// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil
}
// Chunks returns a new ChunkReader against the block data.
func (pb *Block) Chunks() (ChunkReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
}
// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}
type blockIndexReader struct {
IndexReader
b *Block
}
func (r blockIndexReader) Close() error {
r.b.pendingReaders.Done()
return nil
}
type blockTombstoneReader struct {
TombstoneReader
b *Block
}
func (r blockTombstoneReader) Close() error {
r.b.pendingReaders.Done()
return nil
}
type blockChunkReader struct {
ChunkReader
b *Block
}
func (r blockChunkReader) Close() error {
r.b.pendingReaders.Done()
return nil
}
// Delete matching series between mint and maxt in the block.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
pb.mtx.Lock()
defer pb.mtx.Unlock()
if pb.closing {
return ErrClosing
}
func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
pr := newPostingsReader(pb.indexr)
p, absent := pr.Select(ms...)
@ -262,7 +335,8 @@ Outer:
return writeMetaFile(pb.dir, &pb.meta)
}
func (pb *persistedBlock) Snapshot(dir string) error {
// Snapshot creates snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error {
blockDir := filepath.Join(dir, pb.meta.ULID.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return errors.Wrap(err, "create snapshot block dir")
@ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if b > maxt {
b = maxt
}
return a, b
}

View File

@ -21,9 +21,9 @@ import (
"io"
"os"
"github.com/prometheus/tsdb/fileutil"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
)
const (

View File

@ -14,6 +14,7 @@
package tsdb
import (
"io"
"math/rand"
"os"
"path/filepath"
@ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var metas []*BlockMeta
for _, d := range dirs {
b, err := newPersistedBlock(d, c.chunkPool)
b, err := OpenBlock(d, c.chunkPool)
if err != nil {
return err
}
@ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
var (
set compactionSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
)
for i, b := range blocks {
defer func() { closeAll(closers...) }()
symbols, err := b.Index().Symbols()
for i, b := range blocks {
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b)
}
closers = append(closers, indexr)
chunkr, err := b.Chunks()
if err != nil {
return errors.Wrapf(err, "open chunk reader for block %s", b)
}
closers = append(closers, chunkr)
tombsr, err := b.Tombstones()
if err != nil {
return errors.Wrapf(err, "open tombstone reader for block %s", b)
}
closers = append(closers, tombsr)
symbols, err := indexr.Symbols()
if err != nil {
return errors.Wrap(err, "read symbols")
}
@ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
allSymbols[s] = struct{}{}
}
indexr := b.Index()
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
if err != nil {
return err
}
all = indexr.SortedPostings(all)
s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
if i == 0 {
set = s
@ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "write postings")
}
}
return nil
}

View File

@ -30,7 +30,6 @@ import (
"golang.org/x/sync/errgroup"
"github.com/prometheus/tsdb/fileutil"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/nightlyone/lockfile"
@ -38,6 +37,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
)
@ -105,7 +105,7 @@ type DB struct {
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []DiskBlock
blocks []*Block
head *Head
@ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
return changes, fileutil.Fsync(df)
}
func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
for _, b := range db.blocks {
if b.Meta().ULID == id {
return b, true
@ -456,7 +456,7 @@ func (db *DB) reload() (err error) {
return errors.Wrap(err, "find blocks")
}
var (
blocks []DiskBlock
blocks []*Block
exist = map[ulid.ULID]struct{}{}
)
@ -468,7 +468,7 @@ func (db *DB) reload() (err error) {
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = newPersistedBlock(dir, db.chunkPool)
b, err = OpenBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
}
@ -505,7 +505,7 @@ func (db *DB) reload() (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func validateBlockSequence(bs []DiskBlock) error {
func validateBlockSequence(bs []*Block) error {
if len(bs) == 0 {
return nil
}
@ -521,13 +521,19 @@ func validateBlockSequence(bs []DiskBlock) error {
return nil
}
func (db *DB) Blocks() []DiskBlock {
func (db *DB) String() string {
return "HEAD"
}
// Blocks returns the databases persisted blocks.
func (db *DB) Blocks() []*Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
return db.blocks
}
// Head returns the databases's head.
func (db *DB) Head() *Head {
return db.head
}
@ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
for _, b := range db.Blocks() {
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
}
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(mint, maxt int64) Querier {
db.mtx.RLock()
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
blocks := db.blocksForInterval(mint, maxt)
for _, b := range db.Blocks() {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b)
}
}
if maxt >= db.head.MinTime() {
blocks = append(blocks, db.head)
}
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
db: db,
}
for _, b := range blocks {
sq.blocks = append(sq.blocks, &blockQuerier{
mint: mint,
maxt: maxt,
index: b.Index(),
chunks: b.Chunks(),
tombstones: b.Tombstones(),
})
q, err := NewBlockQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
sq.blocks = append(sq.blocks, q)
}
return sq
return sq, nil
}
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
@ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.Lock()
defer db.mtx.Unlock()
var g errgroup.Group
for _, b := range db.blocks {
for _, b := range db.Blocks() {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b DiskBlock) func() error {
g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
}
}
g.Go(func() error {
return db.head.Delete(mint, maxt, ms...)
})
if err := g.Wait(); err != nil {
return err
}
return nil
}
@ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool {
return t >= min && t <= max
}
// blocksForInterval returns all blocks within the partition that may contain
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader {
var bs []BlockReader
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b)
}
}
if maxt >= db.head.MinTime() {
bs = append(bs, db.head)
}
return bs
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false

View File

@ -15,6 +15,7 @@ package tsdb
import (
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -73,6 +74,7 @@ type headMetrics struct {
series prometheus.Gauge
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
chunks prometheus.Gauge
chunksCreated prometheus.Gauge
chunksRemoved prometheus.Gauge
@ -102,6 +104,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_head_series_not_found",
Help: "Total number of requests for series that were not found.",
})
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
@ -118,13 +124,13 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
})
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_max_time",
Help: "Maximum timestamp of the head block.",
}, func() float64 {
return float64(h.MaxTime())
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_head_min_time",
Help: "Minimum time bound of the head block.",
}, func() float64 {
@ -148,6 +154,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.series,
m.seriesCreated,
m.seriesRemoved,
m.seriesNotFound,
m.minTime,
m.maxTime,
m.gcDuration,
@ -178,7 +185,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
series: newStripeSeries(),
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: newMemPostings(),
postings: newUnorderedMemPostings(),
tombstones: newEmptyTombstoneReader(),
}
h.metrics = newHeadMetrics(h, r)
@ -186,28 +193,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
return h, nil
}
// ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error {
r := h.wal.Reader()
mint := h.MinTime()
// processWALSamples adds a partition of samples it receives to the head and passes
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
mint int64,
partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
defer close(output)
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs int
seriesFunc := func(series []RefSeries) error {
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for samples := range input {
for _, s := range samples {
if s.T < mint {
if s.T < mint || s.Ref%total != partition {
continue
}
ms := h.series.getByID(s.Ref)
@ -221,9 +219,69 @@ func (h *Head) ReadWAL() error {
h.metrics.chunks.Inc()
}
}
return nil
output <- samples
}
deletesFunc := func(stones []Stone) error {
return unknownRefs
}
// ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error {
defer h.postings.ensureOrder()
r := h.wal.Reader()
mint := h.MinTime()
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
firstInput = make(chan []RefSample, 300)
input = firstInput
)
wg.Add(n)
for i := 0; i < n; i++ {
output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(i, input, output)
// The output feeds the next worker goroutine. For the last worker,
// it feeds the initial input again to reuse the RefSample slices.
input = output
}
// TODO(fabxc): series entries spread between samples can starve the sample workers.
// Even with bufferd channels, this can impact startup time with lots of series churn.
// We must not pralellize series creation itself but could make the indexing asynchronous.
seriesFunc := func(series []RefSeries) {
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
}
}
samplesFunc := func(samples []RefSample) {
var buf []RefSample
select {
case buf = <-input:
default:
buf = make([]RefSample, 0, len(samples)*11/10)
}
firstInput <- append(buf[:0], samples...)
}
deletesFunc := func(stones []Stone) {
for _, s := range stones {
for _, itv := range s.intervals {
if itv.Maxt < mint {
@ -232,16 +290,22 @@ func (h *Head) ReadWAL() error {
h.tombstones.add(s.ref, itv)
}
}
return nil
}
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
// Signal termination to first worker and wait for last one to close its output channel.
close(firstInput)
for range input {
}
wg.Wait()
if err != nil {
return errors.Wrap(err, "consume WAL")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
return nil
}
@ -303,6 +367,23 @@ func (h *Head) initTime(t int64) (initialized bool) {
return true
}
type rangeHead struct {
head *Head
mint, maxt int64
}
func (h *rangeHead) Index() (IndexReader, error) {
return h.head.indexRange(h.mint, h.maxt), nil
}
func (h *rangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt), nil
}
func (h *rangeHead) Tombstones() (TombstoneReader, error) {
return h.head.tombstones, nil
}
// initAppender is a helper to initialize the time bounds of a the head
// upon the first sample it receives.
type initAppender struct {
@ -609,13 +690,14 @@ func (h *Head) gc() {
h.symMtx.Unlock()
}
func (h *Head) Tombstones() TombstoneReader {
return h.tombstones
// Tombstones returns a new reader over the head's tombstones
func (h *Head) Tombstones() (TombstoneReader, error) {
return h.tombstones, nil
}
// Index returns an IndexReader against the block.
func (h *Head) Index() IndexReader {
return h.indexRange(math.MinInt64, math.MaxInt64)
func (h *Head) Index() (IndexReader, error) {
return h.indexRange(math.MinInt64, math.MaxInt64), nil
}
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
@ -626,8 +708,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
}
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() ChunkReader {
return h.chunksRange(math.MinInt64, math.MaxInt64)
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64), nil
}
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
@ -680,10 +762,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
s.Lock()
c := s.chunk(int(cid))
mint, maxt := c.minTime, c.maxTime
s.Unlock()
// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{
@ -710,23 +793,6 @@ func (c *safeChunk) Iterator() chunks.Iterator {
// func (c *safeChunk) Bytes() []byte { panic("illegal") }
// func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
type rangeHead struct {
head *Head
mint, maxt int64
}
func (h *rangeHead) Index() IndexReader {
return h.head.indexRange(h.mint, h.maxt)
}
func (h *rangeHead) Chunks() ChunkReader {
return h.head.chunksRange(h.mint, h.maxt)
}
func (h *rangeHead) Tombstones() TombstoneReader {
return newEmptyTombstoneReader()
}
type headIndexReader struct {
head *Head
mint, maxt int64
@ -780,24 +846,17 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
if err := p.Err(); err != nil {
return errPostings{err: errors.Wrap(err, "expand postings")}
}
var err error
sort.Slice(ep, func(i, j int) bool {
if err != nil {
return false
}
a := h.head.series.getByID(ep[i])
b := h.head.series.getByID(ep[j])
if a == nil || b == nil {
err = errors.Errorf("series not found")
level.Debug(h.head.logger).Log("msg", "looked up series not found")
return false
}
return labels.Compare(a.lset, b.lset) < 0
})
if err != nil {
return errPostings{err: err}
}
return newListPostings(ep)
}
@ -806,6 +865,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
s := h.head.series.getByID(ref)
if s == nil {
h.head.metrics.seriesNotFound.Inc()
return ErrNotFound
}
*lbls = append((*lbls)[:0], s.lset...)
@ -1169,10 +1229,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c = s.cut(t)
chunkCreated = true
}
numSamples := c.chunk.NumSamples()
if c.maxTime >= t {
return false, chunkCreated
}
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
@ -1180,7 +1242,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t
if c.chunk.NumSamples() == samplesPerChunk/4 {
if numSamples == samplesPerChunk/4 {
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
}

View File

@ -19,14 +19,14 @@ import (
"fmt"
"hash"
"io"
"math"
"os"
"path/filepath"
"sort"
"strings"
"math"
"github.com/prometheus/tsdb/fileutil"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
)
@ -525,6 +525,8 @@ type IndexReader interface {
// Postings returns the postings list iterator for the label pair.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
Postings(name, value string) (Postings, error)
// SortedPostings returns a postings list that is reordered to be sorted
@ -533,6 +535,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
// LabelIndices returns the label pairs for which indices exist.

View File

@ -15,6 +15,7 @@ package tsdb
import (
"encoding/binary"
"runtime"
"sort"
"strings"
"sync"
@ -22,14 +23,30 @@ import (
"github.com/prometheus/tsdb/labels"
)
// memPostings holds postings list for series ID per label pair. They may be written
// to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type memPostings struct {
mtx sync.RWMutex
m map[labels.Label][]uint64
mtx sync.RWMutex
m map[labels.Label][]uint64
ordered bool
}
// newMemPoistings returns a memPostings that's ready for reads and writes.
func newMemPostings() *memPostings {
return &memPostings{
m: make(map[labels.Label][]uint64, 512),
m: make(map[labels.Label][]uint64, 512),
ordered: true,
}
}
// newUnorderedMemPostings returns a memPostings that is not safe to be read from
// until ensureOrder was called once.
func newUnorderedMemPostings() *memPostings {
return &memPostings{
m: make(map[labels.Label][]uint64, 512),
ordered: false,
}
}
@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings {
var allPostingsKey = labels.Label{}
// ensurePostings ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner.
func (p *memPostings) ensureOrder() {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.ordered {
return
}
n := runtime.GOMAXPROCS(0)
workc := make(chan []uint64)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
for l := range workc {
sort.Slice(l, func(i, j int) bool { return l[i] < l[j] })
}
wg.Done()
}()
}
for _, l := range p.m {
workc <- l
}
close(workc)
wg.Wait()
p.ordered = true
}
// add adds a document to the index. The caller has to ensure that no
// term argument appears twice.
func (p *memPostings) add(id uint64, lset labels.Labels) {
@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) {
list := append(p.m[l], id)
p.m[l] = list
if !p.ordered {
return
}
// There is no guarantee that no higher ID was inserted before as they may
// be generated independently before adding them to postings.
// We repair order violations on insert. The invariant is that the first n-1

View File

@ -18,6 +18,7 @@ import (
"sort"
"strings"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
@ -50,7 +51,6 @@ type Series interface {
// querier aggregates querying results from time blocks within
// a single partition.
type querier struct {
db *DB
blocks []Querier
}
@ -103,21 +103,30 @@ func (q *querier) Close() error {
for _, bq := range q.blocks {
merr.Add(bq.Close())
}
q.db.mtx.RUnlock()
return merr.Err()
}
// NewBlockQuerier returns a queries against the readers.
func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier {
return &blockQuerier{
index: ir,
chunks: cr,
tombstones: tr,
mint: mint,
maxt: maxt,
func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
indexr, err := b.Index()
if err != nil {
return nil, errors.Wrapf(err, "open index reader")
}
chunkr, err := b.Chunks()
if err != nil {
return nil, errors.Wrapf(err, "open chunk reader")
}
tombsr, err := b.Tombstones()
if err != nil {
return nil, errors.Wrapf(err, "open tombstone reader")
}
return &blockQuerier{
mint: mint,
maxt: maxt,
index: indexr,
chunks: chunkr,
tombstones: tombsr,
}, nil
}
// blockQuerier provides querying access to a single block database.
@ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
}
func (q *blockQuerier) Close() error {
return nil
var merr MultiError
merr.Add(q.index.Close())
merr.Add(q.chunks.Close())
merr.Add(q.tombstones.Close())
return merr.Err()
}
// postingsReader is used to select matching postings from an IndexReader.
@ -435,6 +450,10 @@ Outer:
for s.p.Next() {
ref := s.p.At()
if err := s.index.Series(ref, &lset, &chunks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == ErrNotFound {
continue
}
s.err = err
return false
}

18
vendor/github.com/prometheus/tsdb/tabwriter.go generated vendored Normal file
View File

@ -0,0 +1,18 @@
package tsdb
import (
"io"
"text/tabwriter"
)
const (
minwidth = 0
tabwidth = 0
padding = 2
padchar = ' '
flags = 0
)
func GetNewTabWriter(output io.Writer) *tabwriter.Writer {
return tabwriter.NewWriter(output, minwidth, tabwidth, padding, padchar, flags)
}

View File

@ -33,9 +33,11 @@ const (
tombstoneFormatV1 = 1
)
// TombstoneReader is the iterator over tombstones.
// TombstoneReader gives access to tombstone intervals by series reference.
type TombstoneReader interface {
Get(ref uint64) Intervals
Close() error
}
func writeTombstoneFile(dir string, tr tombstoneReader) error {
@ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv)
}
func (tombstoneReader) Close() error {
return nil
}
// Interval represents a single time-interval.
type Interval struct {
Mint, Maxt int64

View File

@ -27,16 +27,16 @@ import (
"sync"
"time"
"github.com/prometheus/tsdb/fileutil"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
)
// WALEntryType indicates what data a WAL entry contains.
type WALEntryType byte
type WALEntryType uint8
const (
// WALMagic is a 4 byte number every WAL segment file starts with.
@ -54,20 +54,9 @@ const (
WALEntryDeletes WALEntryType = 4
)
// SamplesCB is the callback after reading samples. The passed slice
// is only valid until the call returns.
type SamplesCB func([]RefSample) error
// SeriesCB is the callback after reading series. The passed slice
// is only valid until the call returns.
type SeriesCB func([]RefSeries) error
// DeletesCB is the callback after reading deletes. The passed slice
// is only valid until the call returns.
type DeletesCB func([]Stone) error
type walMetrics struct {
fsyncDuration prometheus.Summary
corruptions prometheus.Counter
}
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
@ -77,10 +66,15 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
Name: "tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
})
m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})
if r != nil {
r.MustRegister(
m.fsyncDuration,
m.corruptions,
)
}
return m
@ -104,17 +98,27 @@ func NopWAL() WAL {
type nopWAL struct{}
func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil }
func (w nopWAL) Reader() WALReader { return w }
func (nopWAL) LogSeries([]RefSeries) error { return nil }
func (nopWAL) LogSamples([]RefSample) error { return nil }
func (nopWAL) LogDeletes([]Stone) error { return nil }
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
func (nopWAL) Close() error { return nil }
func (nopWAL) Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error {
return nil
}
func (w nopWAL) Reader() WALReader { return w }
func (nopWAL) LogSeries([]RefSeries) error { return nil }
func (nopWAL) LogSamples([]RefSample) error { return nil }
func (nopWAL) LogDeletes([]Stone) error { return nil }
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil }
func (nopWAL) Close() error { return nil }
// WALReader reads entries from a WAL.
type WALReader interface {
Read(SeriesCB, SamplesCB, DeletesCB) error
Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error
}
// RefSeries is the series labels with the series ID.
@ -170,7 +174,7 @@ func newCRC32() hash.Hash32 {
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
mtx sync.Mutex
metrics *walMetrics
dirFile *os.File
@ -238,15 +242,20 @@ type repairingWALReader struct {
r WALReader
}
func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
err := r.r.Read(series, samples, deletes)
func (r *repairingWALReader) Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error {
err := r.r.Read(seriesf, samplesf, deletesf)
if err == nil {
return nil
}
cerr, ok := err.(walCorruptionErr)
cerr, ok := errors.Cause(err).(walCorruptionErr)
if !ok {
return err
}
r.wal.metrics.corruptions.Inc()
return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset)
}
@ -336,6 +345,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
var (
csf = newSegmentFile(f)
crc32 = newCRC32()
decSeries = []RefSeries{}
activeSeries = []RefSeries{}
)
@ -345,13 +355,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
if rt != WALEntrySeries {
continue
}
series, err := r.decodeSeries(flag, byt)
decSeries = decSeries[:0]
activeSeries = activeSeries[:0]
err := r.decodeSeries(flag, byt, &decSeries)
if err != nil {
return errors.Wrap(err, "decode samples while truncating")
}
activeSeries = activeSeries[:0]
for _, s := range series {
for _, s := range decSeries {
if keep(s.Ref) {
activeSeries = append(activeSeries, s)
}
@ -807,10 +818,6 @@ type walReader struct {
curBuf []byte
lastOffset int64 // offset after last successfully read entry
seriesBuf []RefSeries
sampleBuf []RefSample
tombstoneBuf []Stone
err error
}
@ -831,70 +838,118 @@ func (r *walReader) Err() error {
return r.err
}
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
if seriesf == nil {
seriesf = func([]RefSeries) error { return nil }
}
if samplesf == nil {
samplesf = func([]RefSample) error { return nil }
}
if deletesf == nil {
deletesf = func([]Stone) error { return nil }
}
func (r *walReader) Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error {
// Concurrency for replaying the WAL is very limited. We at least split out decoding and
// processing into separate threads.
// Historically, the processing is the bottleneck with reading and decoding using only
// 15% of the CPU.
var (
seriesPool sync.Pool
samplePool sync.Pool
deletePool sync.Pool
)
donec := make(chan struct{})
datac := make(chan interface{}, 100)
go func() {
defer close(donec)
for x := range datac {
switch v := x.(type) {
case []RefSeries:
if seriesf != nil {
seriesf(v)
}
seriesPool.Put(v[:0])
case []RefSample:
if samplesf != nil {
samplesf(v)
}
samplePool.Put(v[:0])
case []Stone:
if deletesf != nil {
deletesf(v)
}
deletePool.Put(v[:0])
default:
level.Error(r.logger).Log("msg", "unexpected data type")
}
}
}()
var err error
for r.next() {
et, flag, b := r.at()
// In decoding below we never return a walCorruptionErr for now.
// Those should generally be catched by entry decoding before.
switch et {
case WALEntrySeries:
series, err := r.decodeSeries(flag, b)
var series []RefSeries
if v := seriesPool.Get(); v == nil {
series = make([]RefSeries, 0, 512)
} else {
series = v.([]RefSeries)
}
err := r.decodeSeries(flag, b, &series)
if err != nil {
return errors.Wrap(err, "decode series entry")
}
if err := seriesf(series); err != nil {
return err
err = errors.Wrap(err, "decode series entry")
break
}
datac <- series
cf := r.current()
for _, s := range series {
if cf.minSeries > s.Ref {
cf.minSeries = s.Ref
}
}
case WALEntrySamples:
samples, err := r.decodeSamples(flag, b)
var samples []RefSample
if v := samplePool.Get(); v == nil {
samples = make([]RefSample, 0, 512)
} else {
samples = v.([]RefSample)
}
err := r.decodeSamples(flag, b, &samples)
if err != nil {
return errors.Wrap(err, "decode samples entry")
}
if err := samplesf(samples); err != nil {
return err
err = errors.Wrap(err, "decode samples entry")
break
}
datac <- samples
// Update the times for the WAL segment file.
cf := r.current()
for _, s := range samples {
if cf.maxTime < s.T {
cf.maxTime = s.T
}
}
case WALEntryDeletes:
stones, err := r.decodeDeletes(flag, b)
var deletes []Stone
if v := deletePool.Get(); v == nil {
deletes = make([]Stone, 0, 512)
} else {
deletes = v.([]Stone)
}
err := r.decodeDeletes(flag, b, &deletes)
if err != nil {
return errors.Wrap(err, "decode delete entry")
}
if err := deletesf(stones); err != nil {
return err
err = errors.Wrap(err, "decode delete entry")
break
}
datac <- deletes
// Update the times for the WAL segment file.
cf := r.current()
for _, s := range stones {
for _, s := range deletes {
for _, iv := range s.intervals {
if cf.maxTime < iv.Maxt {
cf.maxTime = iv.Maxt
@ -903,27 +958,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
}
}
}
close(datac)
<-donec
return r.Err()
}
// nextEntry retrieves the next entry. It is also used as a testing hook.
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
if r.cur >= len(r.files) {
return 0, 0, nil, io.EOF
if err != nil {
return err
}
cf := r.current()
et, flag, b, err := r.entry(cf)
// If we reached the end of the reader, advance to the next one and close.
// Do not close on the last one as it will still be appended to.
if err == io.EOF && r.cur < len(r.files)-1 {
// Current reader completed. Leave the file open for later reads
// for truncating.
r.cur++
return r.nextEntry()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read entry")
}
return et, flag, b, err
return nil
}
func (r *walReader) at() (WALEntryType, byte, []byte) {
@ -1043,9 +1087,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
return etype, flag, buf, nil
}
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
r.seriesBuf = r.seriesBuf[:0]
func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error {
dec := decbuf{b: b}
for len(dec.b) > 0 && dec.err() == nil {
@ -1059,25 +1101,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
}
sort.Sort(lset)
r.seriesBuf = append(r.seriesBuf, RefSeries{
*res = append(*res, RefSeries{
Ref: ref,
Labels: lset,
})
}
if dec.err() != nil {
return nil, dec.err()
return dec.err()
}
if len(dec.b) > 0 {
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return r.seriesBuf, nil
return nil
}
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error {
if len(b) == 0 {
return nil, nil
return nil
}
r.sampleBuf = r.sampleBuf[:0]
dec := decbuf{b: b}
var (
@ -1090,7 +1131,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
dtime := dec.varint64()
val := dec.be64()
r.sampleBuf = append(r.sampleBuf, RefSample{
*res = append(*res, RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
@ -1098,20 +1139,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
}
if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res))
}
if len(dec.b) > 0 {
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return r.sampleBuf, nil
return nil
}
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
dec := &decbuf{b: b}
r.tombstoneBuf = r.tombstoneBuf[:0]
for dec.len() > 0 && dec.err() == nil {
r.tombstoneBuf = append(r.tombstoneBuf, Stone{
*res = append(*res, Stone{
ref: dec.be64(),
intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()},
@ -1119,10 +1159,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
})
}
if dec.err() != nil {
return nil, dec.err()
return dec.err()
}
if len(dec.b) > 0 {
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return r.tombstoneBuf, nil
return nil
}

18
vendor/vendor.json vendored
View File

@ -843,28 +843,28 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "u1ERYVx8oD5cH3UkQunUTi9n1WI=",
"checksumSHA1": "h3i8+wLSIqLvWBWjNPcARM0IQik=",
"path": "github.com/prometheus/tsdb",
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
"revisionTime": "2017-10-05T07:27:10Z"
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
},
{
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
"revisionTime": "2017-10-05T07:27:10Z"
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
},
{
"checksumSHA1": "7RhNAVcmDmLFqn9nWiudT0B76f8=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
"revisionTime": "2017-10-05T07:27:10Z"
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
},
{
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "4a7c39d9d8e53151f2137df3b16b7173ce435397",
"revisionTime": "2017-10-05T07:27:10Z"
"revision": "7f8fa07cf7ee8ebde7bdb9ed084f7931c7c0e579",
"revisionTime": "2017-10-12T13:27:08Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",