diff --git a/block.go b/block.go index 8e9d0c2e7..41dc640e4 100644 --- a/block.go +++ b/block.go @@ -68,9 +68,9 @@ type persistedBlock struct { } type blockMeta struct { - *BlockMeta - Version int `json:"version"` + + *BlockMeta } const metaFilename = "meta.json" diff --git a/compact.go b/compact.go index 9267dd5ef..20c46aaa8 100644 --- a/compact.go +++ b/compact.go @@ -49,7 +49,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } type compactorOptions struct { - maxBlocks uint8 maxBlockRange uint64 maxSize uint64 } @@ -61,9 +60,14 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { } } +type compactionInfo struct { + generation int + mint, maxt int64 +} + // pick returns a range [i, j] in the blocks that are suitable to be compacted // into a single block at position i. -func (c *compactor) pick(bs []Block) (i, j int, ok bool) { +func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { last := len(bs) - 1 if len(bs) == 0 { @@ -71,8 +75,8 @@ func (c *compactor) pick(bs []Block) (i, j int, ok bool) { } // Make sure we always compact the last block if unpersisted. - if bs[last].Meta().Compaction.Generation == 0 { - if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) { + if bs[last].generation == 0 { + if len(bs) >= 3 && c.match(bs[last-2:last+1]) { return last - 2, last, true } return last, last, true @@ -80,55 +84,40 @@ func (c *compactor) pick(bs []Block) (i, j int, ok bool) { for i := len(bs); i-3 >= 0; i -= 3 { tpl := bs[i-3 : i] - if compactionMatch(tpl) { + if c.match(tpl) { return i - 3, i - 1, true } } return 0, 0, false } -func compactionMatch(blocks []Block) bool { - g := blocks[0].Meta().Compaction.Generation +func (c *compactor) match(bs []compactionInfo) bool { + g := bs[0].generation if g >= 5 { return false } - for _, b := range blocks[1:] { - if b.Meta().Compaction.Generation == 0 { + for _, b := range bs { + if b.generation == 0 { continue } - if b.Meta().Compaction.Generation != g { + if b.generation != g { return false } } - return true - // TODO(fabxc): check whether combined size is below maxCompactionSize. - // Apply maximum time range? or number of series? – might already be covered by size implicitly. - - // Naively check whether both blocks have roughly the same number of samples - // and whether the total sample count doesn't exceed 2GB chunk file size - // by rough approximation. - n := float64(blocks[0].Meta().Stats.NumSamples) - t := n - - for _, b := range blocks[1:] { - m := float64(b.Meta().Stats.NumSamples) - - if m < 0.7*n || m > 1.3*n { - return false - } - t += m - } - - // Pessimistic 10 bytes/sample should do. - return t < 10*200e6 + return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange } func mergeBlockMetas(blocks ...Block) (res BlockMeta) { res.MinTime = blocks[0].Meta().MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime - res.Compaction.Generation = blocks[0].Meta().Compaction.Generation + 1 + + g := blocks[0].Meta().Compaction.Generation + if g == 0 && len(blocks) > 1 { + g++ + } + res.Compaction.Generation = g + 1 for _, b := range blocks { res.Stats.NumSamples += b.Meta().Stats.NumSamples diff --git a/db.go b/db.go index 99aa16810..0e869f0eb 100644 --- a/db.go +++ b/db.go @@ -132,7 +132,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { } db.compactor = newCompactor(r, &compactorOptions{ maxBlockRange: opts.MaxBlockRange, - maxBlocks: 3, maxSize: 1 << 29, // 512MB }) @@ -148,31 +147,49 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { func (db *DB) run() { defer close(db.donec) - for { - select { - case <-db.cutc: - db.mtx.Lock() - _, err := db.cut() - db.mtx.Unlock() - - if err != nil { - db.logger.Log("msg", "cut failed", "err", err) - } else { - select { - case db.compactc <- struct{}{}: - default: - } - } - // Drain cut channel so we don't trigger immediately again. + go func() { + for { select { case <-db.cutc: - default: - } + db.mtx.Lock() + _, err := db.cut() + db.mtx.Unlock() + if err != nil { + db.logger.Log("msg", "cut failed", "err", err) + } else { + select { + case db.compactc <- struct{}{}: + default: + } + } + // Drain cut channel so we don't trigger immediately again. + select { + case <-db.cutc: + default: + } + case <-db.stopc: + } + } + }() + + for { + select { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - i, j, ok := db.compactor.pick(db.compactable()) + var infos []compactionInfo + for _, b := range db.compactable() { + m := b.Meta() + + infos = append(infos, compactionInfo{ + generation: m.Compaction.Generation, + mint: *m.MinTime, + maxt: *m.MaxTime, + }) + } + + i, j, ok := db.compactor.pick(infos) if !ok { continue } @@ -180,6 +197,7 @@ func (db *DB) run() { db.logger.Log("msg", "compaction failed", "err", err) continue } + db.logger.Log("msg", "compaction completed") // Trigger another compaction in case there's more work to do. select { case db.compactc <- struct{}{}: diff --git a/writer.go b/writer.go index 5f250b02e..886e518bd 100644 --- a/writer.go +++ b/writer.go @@ -1,6 +1,7 @@ package tsdb import ( + "bufio" "encoding/binary" "hash/crc32" "io" @@ -8,7 +9,6 @@ import ( "strings" "github.com/bradfitz/slice" - "github.com/coreos/etcd/pkg/ioutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" @@ -43,7 +43,7 @@ type SeriesWriter interface { // serialization format. type seriesWriter struct { ow io.Writer - w *ioutil.PageWriter + w *bufio.Writer n int64 c int @@ -53,7 +53,7 @@ type seriesWriter struct { func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { return &seriesWriter{ ow: w, - w: ioutil.NewPageWriter(w, compactionPageBytes, 0), + w: bufio.NewWriterSize(w, 1*1024*1024), n: 0, index: index, } @@ -180,7 +180,7 @@ type indexWriterSeries struct { // serialization format. type indexWriter struct { ow io.Writer - w *ioutil.PageWriter + w *bufio.Writer n int64 started bool @@ -193,7 +193,7 @@ type indexWriter struct { func newIndexWriter(w io.Writer) *indexWriter { return &indexWriter{ - w: ioutil.NewPageWriter(w, compactionPageBytes, 0), + w: bufio.NewWriterSize(w, 1*1024*1024), ow: w, n: 0, symbols: make(map[string]uint32, 4096),