diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 63c4d5219..3211e8acb 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -56,11 +56,21 @@ type Options struct { // Open returns a new storage backed by a TSDB database that is configured for Prometheus. func Open(path string, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) { + // Start with smallest block duration and create exponential buckets until the exceed the + // configured maximum block duration. + rngs := tsdb.ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 3, 10) + + for i, v := range rngs { + if v > int64(time.Duration(opts.MaxBlockDuration).Seconds()*1000) { + rngs = rngs[:i] + break + } + } + db, err := tsdb.Open(path, nil, r, &tsdb.Options{ WALFlushInterval: 10 * time.Second, - MinBlockDuration: uint64(time.Duration(opts.MinBlockDuration).Seconds() * 1000), - MaxBlockDuration: uint64(time.Duration(opts.MaxBlockDuration).Seconds() * 1000), RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000), + BlockRanges: rngs, NoLockfile: opts.NoLockfile, }) if err != nil { diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 45c8b60cf..53f8b6c4f 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -251,9 +251,12 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { // Choose only valid postings which have chunks in the time-range. stones := map[uint32]intervals{} + var lset labels.Labels + var chks []*ChunkMeta + Outer: for p.Next() { - lset, chunks, err := ir.Series(p.At()) + err := ir.Series(p.At(), &lset, &chks) if err != nil { return err } @@ -264,10 +267,10 @@ Outer: } } - for _, chk := range chunks { + for _, chk := range chks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current vlaues and not beyond. - tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime) + tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) stones[p.At()] = intervals{{tmin, tmax}} continue Outer } diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunk.go b/vendor/github.com/prometheus/tsdb/chunks/chunk.go index 86f456be8..6bed4455f 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunk.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunk.go @@ -13,10 +13,7 @@ package chunks -import ( - "encoding/binary" - "fmt" -) +import "fmt" // Encoding is the identifier for a chunk encoding. type Encoding uint8 @@ -43,16 +40,14 @@ type Chunk interface { Encoding() Encoding Appender() (Appender, error) Iterator() Iterator + NumSamples() int } // FromData returns a chunk from a byte slice of chunk data. func FromData(e Encoding, d []byte) (Chunk, error) { switch e { case EncXOR: - return &XORChunk{ - b: &bstream{count: 0, stream: d}, - num: binary.BigEndian.Uint16(d), - }, nil + return &XORChunk{b: &bstream{count: 0, stream: d}}, nil } return nil, fmt.Errorf("unknown chunk encoding: %d", e) } diff --git a/vendor/github.com/prometheus/tsdb/chunks/xor.go b/vendor/github.com/prometheus/tsdb/chunks/xor.go index a72e9ef0c..501db704a 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/xor.go +++ b/vendor/github.com/prometheus/tsdb/chunks/xor.go @@ -52,8 +52,7 @@ import ( // XORChunk holds XOR encoded sample data. type XORChunk struct { - b *bstream - num uint16 + b *bstream } // NewXORChunk returns a new chunk with XOR encoding of the given size. @@ -72,6 +71,11 @@ func (c *XORChunk) Bytes() []byte { return c.b.bytes() } +// NumSamples returns the number of samples in the chunk. +func (c *XORChunk) NumSamples() int { + return int(binary.BigEndian.Uint16(c.Bytes())) +} + // Appender implements the Chunk interface. func (c *XORChunk) Appender() (Appender, error) { it := c.iterator() diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 7799b640f..0027cd50c 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -30,6 +30,18 @@ import ( "github.com/prometheus/tsdb/labels" ) +// ExponentialBlockRanges returns the time ranges based on the stepSize +func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 { + ranges := make([]int64, 0, steps) + curRange := minSize + for i := 0; i < steps; i++ { + ranges = append(ranges, curRange) + curRange = curRange * int64(stepSize) + } + + return ranges +} + // Compactor provides compaction against an underlying storage // of time series data. type Compactor interface { @@ -87,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } type compactorOptions struct { - maxBlockRange uint64 + blockRanges []int64 } func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { @@ -133,37 +145,113 @@ func (c *compactor) Plan() ([][]string, error) { return dms[i].meta.MinTime < dms[j].meta.MinTime }) - if len(dms) == 0 { + if len(dms) <= 1 { return nil, nil } - sliceDirs := func(i, j int) [][]string { + sliceDirs := func(dms []dirMeta) [][]string { + if len(dms) == 0 { + return nil + } var res []string - for k := i; k < j; k++ { - res = append(res, dms[k].dir) + for _, dm := range dms { + res = append(res, dm.dir) } return [][]string{res} } - // Then we care about compacting multiple blocks, starting with the oldest. - for i := 0; i < len(dms)-compactionBlocksLen+1; i++ { - if c.match(dms[i : i+3]) { - return sliceDirs(i, i+compactionBlocksLen), nil + planDirs := sliceDirs(c.selectDirs(dms)) + if len(dirs) > 1 { + return planDirs, nil + } + + // Compact any blocks that have >5% tombstones. + for i := len(dms) - 1; i >= 0; i-- { + meta := dms[i].meta + if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] { + break + } + + if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5% + return [][]string{{dms[i].dir}}, nil } } return nil, nil } -func (c *compactor) match(dirs []dirMeta) bool { - g := dirs[0].meta.Compaction.Generation +// selectDirs returns the dir metas that should be compacted into a single new block. +// If only a single block range is configured, the result is always nil. +func (c *compactor) selectDirs(ds []dirMeta) []dirMeta { + if len(c.opts.blockRanges) < 2 || len(ds) < 1 { + return nil + } - for _, d := range dirs { - if d.meta.Compaction.Generation != g { - return false + highTime := ds[len(ds)-1].meta.MinTime + + for _, iv := range c.opts.blockRanges[1:] { + parts := splitByRange(ds, iv) + if len(parts) == 0 { + continue + } + + for _, p := range parts { + mint := p[0].meta.MinTime + maxt := p[len(p)-1].meta.MaxTime + // Pick the range of blocks if it spans the full range (potentially with gaps) + // or is before the most recent block. + // This ensures we don't compact blocks prematurely when another one of the same + // size still fits in the range. + if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { + return p + } } } - return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange + + return nil +} + +// splitByRange splits the directories by the time range. The range sequence starts at 0. +// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. +func splitByRange(ds []dirMeta, tr int64) [][]dirMeta { + var splitDirs [][]dirMeta + + for i := 0; i < len(ds); { + var ( + group []dirMeta + t0 int64 + m = ds[i].meta + ) + // Compute start of aligned time range of size tr closest to the current block's start. + if m.MinTime >= 0 { + t0 = tr * (m.MinTime / tr) + } else { + t0 = tr * ((m.MinTime - tr + 1) / tr) + } + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr { + i++ + continue + } + + // Add all dirs to the current group that are within [t0, t0+tr]. + for ; i < len(ds); i++ { + // Either the block falls into the next range or doesn't fit at all (checked above). + if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr { + break + } + group = append(group, ds[i]) + } + + if len(group) > 0 { + splitDirs = append(splitDirs, group) + } + } + + return splitDirs } func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { @@ -173,8 +261,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { sources := map[ulid.ULID]struct{}{} for _, b := range blocks { - res.Stats.NumSamples += b.Stats.NumSamples - if b.Compaction.Generation > res.Compaction.Generation { res.Compaction.Generation = b.Compaction.Generation } @@ -312,17 +398,31 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { - var set compactionSet - var metas []BlockMeta - + var ( + set compactionSet + metas []BlockMeta + allSymbols = make(map[string]struct{}, 1<<16) + ) for i, b := range blocks { metas = append(metas, b.Meta()) - all, err := b.Index().Postings("", "") + symbols, err := b.Index().Symbols() + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + for s := range symbols { + allSymbols[s] = struct{}{} + } + + indexr := b.Index() + + all, err := indexr.Postings("", "") if err != nil { return nil, err } - s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all) + all = indexr.SortedPostings(all) + + s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all) if i == 0 { set = s @@ -342,9 +442,18 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo meta = compactBlockMetas(metas...) ) + if err := indexw.AddSymbols(allSymbols); err != nil { + return nil, errors.Wrap(err, "add symbols") + } + for set.Next() { lset, chks, dranges := set.At() // The chunks here are not fully deleted. + // Skip the series with all deleted chunks. + if len(chks) == 0 { + continue + } + if len(dranges) > 0 { // Re-encode the chunk to not have deleted values. for _, chk := range chks { @@ -370,10 +479,15 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo return nil, err } - indexw.AddSeries(i, lset, chks...) + if err := indexw.AddSeries(i, lset, chks...); err != nil { + return nil, errors.Wrapf(err, "add series") + } meta.Stats.NumChunks += uint64(len(chks)) meta.Stats.NumSeries++ + for _, chk := range chks { + meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) + } for _, l := range lset { valset, ok := values[l.Name] @@ -431,6 +545,7 @@ type compactionSeriesSet struct { index IndexReader chunks ChunkReader tombstones TombstoneReader + series SeriesSet l labels.Labels c []*ChunkMeta @@ -451,11 +566,9 @@ func (c *compactionSeriesSet) Next() bool { if !c.p.Next() { return false } - c.intervals = c.tombstones.Get(c.p.At()) - c.l, c.c, c.err = c.index.Series(c.p.At()) - if c.err != nil { + if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil { return false } @@ -535,14 +648,24 @@ func (c *compactionMerger) Next() bool { if !c.aok && !c.bok || c.Err() != nil { return false } + // While advancing child iterators the memory used for labels and chunks + // may be reused. When picking a series we have to store the result. + var lset labels.Labels + var chks []*ChunkMeta d := c.compare() // Both sets contain the current series. Chain them into a single one. if d > 0 { - c.l, c.c, c.intervals = c.b.At() + lset, chks, c.intervals = c.b.At() + c.l = append(c.l[:0], lset...) + c.c = append(c.c[:0], chks...) + c.bok = c.b.Next() } else if d < 0 { - c.l, c.c, c.intervals = c.a.At() + lset, chks, c.intervals = c.a.At() + c.l = append(c.l[:0], lset...) + c.c = append(c.c[:0], chks...) + c.aok = c.a.Next() } else { l, ca, ra := c.a.At() @@ -551,8 +674,8 @@ func (c *compactionMerger) Next() bool { ra = ra.add(r) } - c.l = l - c.c = append(ca, cb...) + c.l = append(c.l[:0], l...) + c.c = append(append(c.c[:0], ca...), cb...) c.intervals = ra c.aok = c.a.Next() diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index a4801bbf0..928e8e9e9 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -45,8 +45,7 @@ import ( var DefaultOptions = &Options{ WALFlushInterval: 5 * time.Second, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds - MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds - MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds + BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), NoLockfile: false, } @@ -58,12 +57,8 @@ type Options struct { // Duration of persisted data to keep. RetentionDuration uint64 - // The timestamp range of head blocks after which they get persisted. - // It's the minimum duration of any persisted block. - MinBlockDuration uint64 - - // The maximum timestamp range of compacted blocks. - MaxBlockDuration uint64 + // The sizes of the Blocks. + BlockRanges []int64 // NoLockfile disables creation and consideration of a lock file. NoLockfile bool @@ -104,14 +99,13 @@ type DB struct { metrics *dbMetrics opts *Options - // Mutex for that must be held when modifying the general - // block layout. + // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex blocks []Block // Mutex that must be held when modifying just the head blocks // or the general layout. - // Must never be held when acquiring a blocks's mutex! + // mtx must be held before acquiring. headmtx sync.RWMutex heads []headBlock @@ -122,8 +116,8 @@ type DB struct { stopc chan struct{} // cmtx is used to control compactions and deletions. - cmtx sync.Mutex - compacting bool + cmtx sync.Mutex + compactionsEnabled bool } type dbMetrics struct { @@ -202,13 +196,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), - compacting: true, + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + compactionsEnabled: true, } db.metrics = newDBMetrics(db, r) @@ -227,9 +221,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = &lockf } - db.compactor = newCompactor(dir, r, l, &compactorOptions{ - maxBlockRange: opts.MaxBlockDuration, - }) + copts := &compactorOptions{ + blockRanges: opts.BlockRanges, + } + + if len(copts.blockRanges) == 0 { + return nil, errors.New("at least one block-range must exist") + } + + for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 { + if len(copts.blockRanges) == 1 { + break + } + + // Max overflow is restricted to 20%. + copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1] + } + + db.compactor = newCompactor(dir, r, l, copts) if err := db.reloadBlocks(); err != nil { return nil, err @@ -315,37 +324,62 @@ func headFullness(h headBlock) float64 { return a / b } +// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendableHeads() (r []headBlock) { + switch l := len(db.heads); l { + case 0: + case 1: + r = append(r, db.heads[0]) + default: + if headFullness(db.heads[l-1]) < 0.5 { + r = append(r, db.heads[l-2]) + } + r = append(r, db.heads[l-1]) + } + return r +} + +func (db *DB) completedHeads() (r []headBlock) { + db.mtx.RLock() + defer db.mtx.RUnlock() + + db.headmtx.RLock() + defer db.headmtx.RUnlock() + + if len(db.heads) < 2 { + return nil + } + + // Select all old heads unless they still have pending appenders. + for _, h := range db.heads[:len(db.heads)-2] { + if h.ActiveWriters() > 0 { + return r + } + r = append(r, h) + } + // Add the 2nd last head if the last head is more than 50% filled. + // Compacting it early allows us to free its memory before allocating + // more for the next block and thus reduces spikes. + h0 := db.heads[len(db.heads)-1] + h1 := db.heads[len(db.heads)-2] + + if headFullness(h0) >= 0.5 && h1.ActiveWriters() == 0 { + r = append(r, h1) + } + return r +} + func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() - db.headmtx.RLock() + if !db.compactionsEnabled { + return false, nil + } // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []Block - - // Collect head blocks that are ready for compaction. Write them after - // returning the lock to not block Appenders. - // Selected blocks are semantically ensured to not be written to afterwards - // by appendable(). - if len(db.heads) > 1 { - f := headFullness(db.heads[len(db.heads)-1]) - - for _, h := range db.heads[:len(db.heads)-1] { - // Blocks that won't be appendable when instantiating a new appender - // might still have active appenders on them. - // Abort at the first one we encounter. - if h.ActiveWriters() > 0 || f < 0.5 { - break - } - singles = append(singles, h) - } - } - - db.headmtx.RUnlock() - - for _, h := range singles { + for _, h := range db.completedHeads() { select { case <-db.stopc: return changes, nil @@ -551,30 +585,30 @@ func (db *DB) Close() error { // DisableCompactions disables compactions. func (db *DB) DisableCompactions() { - if db.compacting { - db.cmtx.Lock() - db.compacting = false - db.logger.Log("msg", "compactions disabled") - } + db.cmtx.Lock() + defer db.cmtx.Unlock() + + db.compactionsEnabled = false + db.logger.Log("msg", "compactions disabled") } // EnableCompactions enables compactions. func (db *DB) EnableCompactions() { - if !db.compacting { - db.cmtx.Unlock() - db.compacting = true - db.logger.Log("msg", "compactions enabled") - } + db.cmtx.Lock() + defer db.cmtx.Unlock() + + db.compactionsEnabled = true + db.logger.Log("msg", "compactions enabled") } // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { - db.mtx.Lock() // To block any appenders. - defer db.mtx.Unlock() - db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() + blocks := db.blocks[:] for _, b := range blocks { db.logger.Log("msg", "snapshotting block", "block", b) @@ -667,7 +701,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { } var hb headBlock - for _, h := range a.db.appendable() { + for _, h := range a.db.appendableHeads() { m := h.Meta() if intervalContains(m.MinTime, m.MaxTime-1, t) { @@ -699,20 +733,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { // it is within or after the currently appendable window. func (db *DB) ensureHead(t int64) error { var ( - mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration)) + mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0])) addBuffer = len(db.blocks) == 0 last BlockMeta ) if !addBuffer { last = db.blocks[len(db.blocks)-1].Meta() - addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration) + addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0]) } // Create another block of buffer in front if the DB is initialized or retrieving // new data after a long gap. // This ensures we always have a full block width of append window. if addBuffer { - if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil { + if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil { return err } // If the previous block reaches into our new window, make it smaller. @@ -779,6 +813,7 @@ func (a *dbAppender) Rollback() error { func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() defer db.mtx.Unlock() @@ -799,18 +834,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { return nil } -// appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() (r []headBlock) { - switch len(db.heads) { - case 0: - case 1: - r = append(r, db.heads[0]) - default: - r = append(r, db.heads[len(db.heads)-2:]...) - } - return r -} - func intervalOverlap(amin, amax, bmin, bmax int64) bool { // Checks Overlap: http://stackoverflow.com/questions/3269434/ return amin <= bmax && bmin <= amax diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index cb8e7329a..9e99d3777 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -67,6 +67,7 @@ type HeadBlock struct { // to their chunk descs. hashes map[uint64][]*memSeries + symbols map[string]struct{} values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -117,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, + symbols: map[string]struct{}{}, postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, tombstones: newEmptyTombstoneReader(), @@ -332,7 +334,12 @@ func (h *HeadBlock) Snapshot(snapshotDir string) error { func (h *HeadBlock) Dir() string { return h.dir } // Index returns an IndexReader against the block. -func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *HeadBlock) Index() IndexReader { + h.mtx.RLock() + defer h.mtx.RUnlock() + + return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)} +} // Chunks returns a ChunkReader against the block. func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } @@ -340,14 +347,10 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } // Querier returns a new Querier against the block for the range [mint, maxt]. func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() - defer h.mtx.RUnlock() - if h.closed { panic(fmt.Sprintf("block %s already closed", h.dir)) } - - // Reference on the original slice to use for postings mapping. - series := h.series[:] + h.mtx.RUnlock() return &blockQuerier{ mint: mint, @@ -355,27 +358,6 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { index: h.Index(), chunks: h.Chunks(), tombstones: h.Tombstones(), - - postingsMapper: func(p Postings) Postings { - ep := make([]uint32, 0, 64) - - for p.Next() { - // Skip posting entries that include series added after we - // instantiated the querier. - if int(p.At()) >= len(series) { - break - } - ep = append(ep, p.At()) - } - if err := p.Err(); err != nil { - return errPostings{err: errors.Wrap(err, "expand postings")} - } - - sort.Slice(ep, func(i, j int) bool { - return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0 - }) - return newListPostings(ep) - }, } } @@ -661,6 +643,12 @@ func (c *safeChunk) Iterator() chunks.Iterator { type headIndexReader struct { *HeadBlock + // Highest series that existed when the index reader was instantiated. + maxSeries uint32 +} + +func (h *headIndexReader) Symbols() (map[string]struct{}, error) { + return h.symbols, nil } // LabelValues returns the possible label values @@ -689,33 +677,59 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) { return h.postings.get(term{name: name, value: value}), nil } -// Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { +func (h *headIndexReader) SortedPostings(p Postings) Postings { h.mtx.RLock() defer h.mtx.RUnlock() - if int(ref) >= len(h.series) { - return nil, nil, ErrNotFound + ep := make([]uint32, 0, 1024) + + for p.Next() { + // Skip posting entries that include series added after we + // instantiated the index reader. + if p.At() > h.maxSeries { + break + } + ep = append(ep, p.At()) + } + if err := p.Err(); err != nil { + return errPostings{err: errors.Wrap(err, "expand postings")} + } + + sort.Slice(ep, func(i, j int) bool { + return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0 + }) + return newListPostings(ep) +} + +// Series returns the series for the given reference. +func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if ref > h.maxSeries { + return ErrNotFound } s := h.series[ref] if s == nil { - return nil, nil, ErrNotFound + return ErrNotFound } - metas := make([]*ChunkMeta, 0, len(s.chunks)) + *lbls = append((*lbls)[:0], s.lset...) s.mtx.RLock() defer s.mtx.RUnlock() + *chks = (*chks)[:0] + for i, c := range s.chunks { - metas = append(metas, &ChunkMeta{ + *chks = append(*chks, &ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: (uint64(ref) << 32) | uint64(i), }) } - return s.lset, metas, nil + return nil } func (h *headIndexReader) LabelIndices() ([][]string, error) { @@ -760,6 +774,9 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { valset.set(l.Value) h.postings.add(s.ref, term{name: l.Name, value: l.Value}) + + h.symbols[l.Name] = struct{}{} + h.symbols[l.Value] = struct{}{} } h.postings.add(s.ref, term{}) diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index 3264d9263..c948ee27c 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -61,7 +61,9 @@ func (s indexWriterSeriesSlice) Less(i, j int) bool { type indexWriterStage uint8 const ( - idxStagePopulate indexWriterStage = iota + idxStageNone indexWriterStage = iota + idxStageSymbols + idxStageSeries idxStageLabelIndex idxStagePostings idxStageDone @@ -69,8 +71,12 @@ const ( func (s indexWriterStage) String() string { switch s { - case idxStagePopulate: - return "populate" + case idxStageNone: + return "none" + case idxStageSymbols: + return "symbols" + case idxStageSeries: + return "series" case idxStageLabelIndex: return "label index" case idxStagePostings: @@ -82,12 +88,18 @@ func (s indexWriterStage) String() string { } // IndexWriter serializes the index for a block of series data. -// The methods must generally be called in the order they are specified in. +// The methods must be called in the order they are specified in. type IndexWriter interface { + // AddSymbols registers all string symbols that are encountered in series + // and other indices. + AddSymbols(sym map[string]struct{}) error + // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. - // The reference number is used to resolve a series against the postings - // list iterator. It only has to be available during the write processing. + // Implementations may require series to be insert in increasing order by + // their labels. + // The reference numbers are used to resolve entries in postings lists that + // are added later. AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. @@ -118,10 +130,13 @@ type indexWriter struct { buf2 encbuf uint32s []uint32 - series map[uint32]*indexWriterSeries - symbols map[string]uint32 // symbol offsets - labelIndexes []hashEntry // label index offsets - postings []hashEntry // postings lists offsets + symbols map[string]uint32 // symbol offsets + seriesOffsets map[uint32]uint64 // offsets of series + labelIndexes []hashEntry // label index offsets + postings []hashEntry // postings lists offsets + + // Hold last series to validate that clients insert new series in order. + lastSeries labels.Labels crc32 hash.Hash } @@ -152,7 +167,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, - stage: idxStagePopulate, + stage: idxStageNone, // Reusable memory. buf1: encbuf{b: make([]byte, 0, 1<<22)}, @@ -160,9 +175,9 @@ func newIndexWriter(dir string) (*indexWriter, error) { uint32s: make([]uint32, 0, 1<<15), // Caches. - symbols: make(map[string]uint32, 1<<13), - series: make(map[uint32]*indexWriterSeries, 1<<16), - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + symbols: make(map[string]uint32, 1<<13), + seriesOffsets: make(map[uint32]uint64, 1<<16), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := iw.writeMeta(); err != nil { return nil, err @@ -207,20 +222,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } - // Complete population stage by writing symbols and series. - if w.stage == idxStagePopulate { - w.toc.symbols = w.pos - if err := w.writeSymbols(); err != nil { - return err - } - w.toc.series = w.pos - if err := w.writeSeries(); err != nil { - return err - } - } - // Mark start of sections in table of contents. switch s { + case idxStageSymbols: + w.toc.symbols = w.pos + case idxStageSeries: + w.toc.series = w.pos + case idxStageLabelIndex: w.toc.labelIndices = w.pos @@ -254,26 +262,65 @@ func (w *indexWriter) writeMeta() error { } func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { - if _, ok := w.series[ref]; ok { - return errors.Errorf("series with reference %d already added", ref) + if err := w.ensureStage(idxStageSeries); err != nil { + return err } - // Populate the symbol table from all label sets we have to reference. - for _, l := range lset { - w.symbols[l.Name] = 0 - w.symbols[l.Value] = 0 + if labels.Compare(lset, w.lastSeries) <= 0 { + return errors.Errorf("out-of-order series added with label set %q", lset) } - w.series[ref] = &indexWriterSeries{ - labels: lset, - chunks: chunks, + if _, ok := w.seriesOffsets[ref]; ok { + return errors.Errorf("series with reference %d already added", ref) } + w.seriesOffsets[ref] = w.pos + + w.buf2.reset() + w.buf2.putUvarint(len(lset)) + + for _, l := range lset { + offset, ok := w.symbols[l.Name] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", l.Name) + } + w.buf2.putUvarint32(offset) + + offset, ok = w.symbols[l.Value] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", l.Value) + } + w.buf2.putUvarint32(offset) + } + + w.buf2.putUvarint(len(chunks)) + + for _, c := range chunks { + w.buf2.putVarint64(c.MinTime) + w.buf2.putVarint64(c.MaxTime) + w.buf2.putUvarint64(c.Ref) + } + + w.buf1.reset() + w.buf1.putUvarint(w.buf2.len()) + + w.buf2.putHash(w.crc32) + + if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { + return errors.Wrap(err, "write series data") + } + + w.lastSeries = append(w.lastSeries[:0], lset...) + return nil } -func (w *indexWriter) writeSymbols() error { +func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { + if err := w.ensureStage(idxStageSymbols); err != nil { + return err + } // Generate sorted list of strings we will store as reference table. - symbols := make([]string, 0, len(w.symbols)) - for s := range w.symbols { + symbols := make([]string, 0, len(sym)) + + for s := range sym { symbols = append(symbols, s) } sort.Strings(symbols) @@ -285,12 +332,14 @@ func (w *indexWriter) writeSymbols() error { w.buf2.putBE32int(len(symbols)) + w.symbols = make(map[string]uint32, len(symbols)) + for _, s := range symbols { w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) // NOTE: len(s) gives the number of runes, not the number of bytes. // Therefore the read-back length for strings with unicode characters will - // be off when not using putCstr. + // be off when not using putUvarintStr. w.buf2.putUvarintStr(s) } @@ -301,55 +350,6 @@ func (w *indexWriter) writeSymbols() error { return errors.Wrap(err, "write symbols") } -func (w *indexWriter) writeSeries() error { - // Series must be stored sorted along their labels. - series := make(indexWriterSeriesSlice, 0, len(w.series)) - - for _, s := range w.series { - series = append(series, s) - } - sort.Sort(series) - - // Header holds number of series. - w.buf1.reset() - w.buf1.putBE32int(len(series)) - - if err := w.write(w.buf1.get()); err != nil { - return errors.Wrap(err, "write series count") - } - - for _, s := range series { - s.offset = uint32(w.pos) - - w.buf2.reset() - w.buf2.putUvarint(len(s.labels)) - - for _, l := range s.labels { - w.buf2.putUvarint32(w.symbols[l.Name]) - w.buf2.putUvarint32(w.symbols[l.Value]) - } - - w.buf2.putUvarint(len(s.chunks)) - - for _, c := range s.chunks { - w.buf2.putVarint64(c.MinTime) - w.buf2.putVarint64(c.MaxTime) - w.buf2.putUvarint64(c.Ref) - } - - w.buf1.reset() - w.buf1.putUvarint(w.buf2.len()) - - w.buf2.putHash(w.crc32) - - if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { - return errors.Wrap(err, "write series data") - } - } - - return nil -} - func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { if len(values)%len(names) != 0 { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) @@ -379,7 +379,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { w.buf2.putBE32int(valt.Len()) for _, v := range valt.s { - w.buf2.putBE32(w.symbols[v]) + offset, ok := w.symbols[v] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", v) + } + w.buf2.putBE32(offset) } w.buf1.reset() @@ -450,11 +454,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { refs := w.uint32s[:0] for it.Next() { - s, ok := w.series[it.At()] + offset, ok := w.seriesOffsets[it.At()] if !ok { - return errors.Errorf("series for reference %d not found", it.At()) + return errors.Errorf("%p series for reference %d not found", w, it.At()) } - refs = append(refs, s.offset) + refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out. } if err := it.Err(); err != nil { return err @@ -503,6 +507,10 @@ func (w *indexWriter) Close() error { // IndexReader provides reading access of serialized index data. type IndexReader interface { + // Symbols returns a set of string symbols that may occur in series' labels + // and indices. + Symbols() (map[string]struct{}, error) + // LabelValues returns the possible label values LabelValues(names ...string) (StringTuples, error) @@ -510,8 +518,13 @@ type IndexReader interface { // The Postings here contain the offsets to the series inside the index. Postings(name, value string) (Postings, error) - // Series returns the series for the given reference. - Series(ref uint32) (labels.Labels, []*ChunkMeta, error) + // SortedPostings returns a postings list that is reordered to be sorted + // by the label set of the underlying series. + SortedPostings(Postings) Postings + + // Series populates the given labels and chunk metas for the series identified + // by the reference. + Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. LabelIndices() ([][]string, error) @@ -664,6 +677,21 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) { return s, nil } +func (r *indexReader) Symbols() (map[string]struct{}, error) { + d1 := r.decbufAt(int(r.toc.symbols)) + d2 := d1.decbuf(d1.be32int()) + + count := d2.be32int() + sym := make(map[string]struct{}, count) + + for ; count > 0; count-- { + s := d2.uvarintStr() + sym[s] = struct{}{} + } + + return sym, d2.err() +} + func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { const sep = "\xff" @@ -712,36 +740,37 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil } -func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { +func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { d1 := r.decbufAt(int(ref)) d2 := d1.decbuf(int(d1.uvarint())) + *lbls = (*lbls)[:0] + *chks = (*chks)[:0] + k := int(d2.uvarint()) - lbls := make(labels.Labels, 0, k) for i := 0; i < k; i++ { lno := uint32(d2.uvarint()) lvo := uint32(d2.uvarint()) if d2.err() != nil { - return nil, nil, errors.Wrap(d2.err(), "read series label offsets") + return errors.Wrap(d2.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) if err != nil { - return nil, nil, errors.Wrap(err, "lookup label name") + return errors.Wrap(err, "lookup label name") } lv, err := r.lookupSymbol(lvo) if err != nil { - return nil, nil, errors.Wrap(err, "lookup label value") + return errors.Wrap(err, "lookup label value") } - lbls = append(lbls, labels.Label{Name: ln, Value: lv}) + *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. k = int(d2.uvarint()) - chunks := make([]*ChunkMeta, 0, k) for i := 0; i < k; i++ { mint := d2.varint64() @@ -749,10 +778,10 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { off := d2.uvarint64() if d2.err() != nil { - return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i) + return errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - chunks = append(chunks, &ChunkMeta{ + *chks = append(*chks, &ChunkMeta{ Ref: off, MinTime: mint, MaxTime: maxt, @@ -761,7 +790,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { // TODO(fabxc): verify CRC32. - return lbls, chunks, nil + return nil } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -787,6 +816,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return newBigEndianPostings(d2.get()), nil } +func (r *indexReader) SortedPostings(p Postings) Postings { + return p +} + type stringTuples struct { l int // tuple length s []string // flattened tuple entries diff --git a/vendor/github.com/prometheus/tsdb/labels/selector.go b/vendor/github.com/prometheus/tsdb/labels/selector.go index a8a7eeeaa..4f29452cf 100644 --- a/vendor/github.com/prometheus/tsdb/labels/selector.go +++ b/vendor/github.com/prometheus/tsdb/labels/selector.go @@ -13,7 +13,10 @@ package labels -import "regexp" +import ( + "regexp" + "strings" +) // Selector holds constraints for matching against a label set. type Selector []Matcher @@ -84,3 +87,22 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) } func Not(m Matcher) Matcher { return ¬Matcher{m} } + +// PrefixMatcher implements Matcher for labels which values matches prefix. +type PrefixMatcher struct { + name, prefix string +} + +// NewPrefixMatcher returns new Matcher for label name matching prefix. +func NewPrefixMatcher(name, prefix string) Matcher { + return &PrefixMatcher{name: name, prefix: prefix} +} + +// Name implements Matcher interface. +func (m *PrefixMatcher) Name() string { return m.name } + +// Prefix returns matching prefix. +func (m *PrefixMatcher) Prefix() string { return m.prefix } + +// Matches implements Matcher interface. +func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) } diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 523a67398..a54acdd5a 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -15,6 +15,7 @@ package tsdb import ( "fmt" + "sort" "strings" "github.com/prometheus/tsdb/chunks" @@ -53,8 +54,8 @@ type querier struct { blocks []Querier } -// Querier returns a new querier over the data partition for the given -// time range. +// Querier returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock() @@ -133,8 +134,6 @@ type blockQuerier struct { chunks ChunkReader tombstones TombstoneReader - postingsMapper func(Postings) Postings - mint, maxt int64 } @@ -143,10 +142,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { p, absent := pr.Select(ms...) - if q.postingsMapper != nil { - p = q.postingsMapper(p) - } - return &blockSeriesSet{ set: &populatedChunkSeries{ set: &baseChunkSeries{ @@ -217,7 +212,38 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) { p := Intersect(its...) - return p, absent + return r.index.SortedPostings(p), absent +} + +// tuplesByPrefix uses binary search to find prefix matches within ts. +func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) { + var outErr error + tslen := ts.Len() + i := sort.Search(tslen, func(i int) bool { + vs, err := ts.At(i) + if err != nil { + outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err) + return true + } + val := vs[0] + l := len(m.Prefix()) + if l > len(vs) { + l = len(val) + } + return val[:l] >= m.Prefix() + }) + if outErr != nil { + return nil, outErr + } + var matches []string + for ; i < tslen; i++ { + vs, err := ts.At(i) + if err != nil || !m.Matches(vs[0]) { + return matches, err + } + matches = append(matches, vs[0]) + } + return matches, nil } func (r *postingsReader) selectSingle(m labels.Matcher) Postings { @@ -230,22 +256,27 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings { return it } - // TODO(fabxc): use interface upgrading to provide fast solution - // for prefix matches. Tuples are lexicographically sorted. tpls, err := r.index.LabelValues(m.Name()) if err != nil { return errPostings{err: err} } var res []string - - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) + if pm, ok := m.(*labels.PrefixMatcher); ok { + res, err = tuplesByPrefix(pm, tpls) if err != nil { return errPostings{err: err} } - if m.Matches(vals[0]) { - res = append(res, vals[0]) + + } else { + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + return errPostings{err: err} + } + if m.Matches(vals[0]) { + res = append(res, vals[0]) + } } } @@ -397,11 +428,14 @@ func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { + var ( + lset labels.Labels + chunks []*ChunkMeta + ) Outer: for s.p.Next() { ref := s.p.At() - lset, chunks, err := s.index.Series(ref) - if err != nil { + if err := s.index.Series(ref, &lset, &chunks); err != nil { s.err = err return false } diff --git a/vendor/vendor.json b/vendor/vendor.json index 7ac9a5eb9..37b51618b 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -853,22 +853,22 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "GgHaU/6pJjJ7I8aTfaZXnV/OWxA=", + "checksumSHA1": "beuXFIZLYTpeK3uRpnWxgm0dPvg=", "path": "github.com/prometheus/tsdb", - "revision": "969c407335d68cbd8154dcd1bca6259786b27f53", - "revisionTime": "2017-07-12T11:54:31Z" + "revision": "912302877bfc98f632e8df61938e4e9600cef945", + "revisionTime": "2017-08-10T08:08:49Z" }, { - "checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=", + "checksumSHA1": "bHZjrxtacFkQRNI8/yj3gOOd9aA=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "d492bfd973c24026ab784c1c1821af426bc80e90", - "revisionTime": "2017-06-30T13:17:34Z" + "revision": "912302877bfc98f632e8df61938e4e9600cef945", + "revisionTime": "2017-08-10T08:08:49Z" }, { - "checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=", + "checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=", "path": "github.com/prometheus/tsdb/labels", - "revision": "d492bfd973c24026ab784c1c1821af426bc80e90", - "revisionTime": "2017-06-30T13:17:34Z" + "revision": "912302877bfc98f632e8df61938e4e9600cef945", + "revisionTime": "2017-08-10T08:08:49Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",