From 5aa7f7cce8c1e8164719c1f47c5156584209fc09 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Wed, 4 Jan 2017 21:11:15 +0100
Subject: [PATCH] Compact head block into persisted block

---
 compact.go | 150 +++++++++++++++++++++--------------------
 db.go      |  14 ++---
 head.go    |  47 -----------------
 3 files changed, 63 insertions(+), 148 deletions(-)

diff --git a/compact.go b/compact.go
index 5b207fc34..13e29a56f 100644
--- a/compact.go
+++ b/compact.go
@@ -2,7 +2,6 @@ package tsdb
 
 import (
 	"fmt"
-	"math"
 	"os"
 	"path/filepath"
 	"sync"
@@ -92,21 +91,31 @@ func (c *compactor) run() {
 	for range c.triggerc {
 		c.metrics.triggered.Inc()
 
-		bs := c.pick()
-		if len(bs) == 0 {
-			continue
-		}
+		// Compact as long as there are candidate blocks.
+		for {
+			rev := c.pick()
+			var bs []block
+			for _, b := range rev {
+				bs = append([]block{b}, bs...)
+			}
 
-		start := time.Now()
-		err := c.compact(bs...)
+			c.logger.Log("msg", "picked for compaction", "candidates", fmt.Sprintf("%v", bs))
 
-		c.metrics.ran.Inc()
-		c.metrics.duration.Observe(time.Since(start).Seconds())
+			if len(bs) == 0 {
+				break
+			}
 
-		if err != nil {
-			c.logger.Log("msg", "compaction failed", "err", err)
-			c.metrics.failed.Inc()
-			continue
+			start := time.Now()
+			err := c.compact(bs...)
+
+			c.metrics.ran.Inc()
+			c.metrics.duration.Observe(time.Since(start).Seconds())
+
+			if err != nil {
+				c.logger.Log("msg", "compaction failed", "err", err)
+				c.metrics.failed.Inc()
+				break
+			}
 		}
 
 		// Drain channel of signals triggered during compaction.
@@ -118,31 +127,35 @@ func (c *compactor) run() {
 	close(c.donec)
 }
 
+const (
+	compactionMaxSize = 1 << 30 // 1GB
+	compactionBlocks  = 2
+)
+
 func (c *compactor) pick() []block {
 	bs := c.blocks.compactable()
 	if len(bs) == 0 {
 		return nil
 	}
-
-	if !bs[len(bs)-1].persisted() {
-		// TODO(fabxc): double check scoring function here or only do it here
-		// and trigger every X scrapes?
-		return bs[len(bs)-1:]
+	if len(bs) == 1 && !bs[0].persisted() {
+		return bs
 	}
 
-	candidate := []block{}
-	trange := int64(math.MaxInt64)
-
-	for i, b := range bs[:len(bs)-1] {
-		r := bs[i+1].stats().MaxTime - b.stats().MinTime
-
-		if r < trange {
-			trange = r
-			candidate = bs[i : i+1]
+	for i := 0; i+1 < len(bs); i += 2 {
+		tpl := bs[i : i+2]
+		if compactionMatch(tpl) {
+			return tpl
 		}
 	}
+	return nil
+}
 
-	return candidate
+func compactionMatch(blocks []block) bool {
+	// TODO(fabxc): check whether combined size is below maxCompactionSize.
+	// Apply maximum time range? or number of series? – might already be covered by size implicitly.
+
+	// Blocks should be roughly equal in size.
+	return true
 }
 
 func (c *compactor) Close() error {
@@ -215,6 +228,11 @@ func (c *compactor) compact(blocks ...block) error {
 	if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
 		return errors.Wrap(err, "rename dir")
 	}
+	for _, b := range blocks[1:] {
+		if err := os.RemoveAll(b.dir()); err != nil {
+			return errors.Wrap(err, "delete dir")
+		}
+	}
 
 	var merr MultiError
 
@@ -256,6 +274,7 @@ func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWrite
 		if err := chunkw.WriteSeries(i, lset, chunks); err != nil {
 			return err
 		}
+		fmt.Println("next", lset, chunks)
 
 		stats.ChunkCount += uint32(len(chunks))
 		stats.SeriesCount++
@@ -367,9 +386,9 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
 type compactionMerger struct {
 	a, b compactionSet
 
-	adone, bdone bool
-	l            labels.Labels
-	c            []ChunkMeta
+	aok, bok bool
+	l        labels.Labels
+	c        []ChunkMeta
 }
 
 type compactionSeries struct {
@@ -384,17 +403,17 @@ func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
 	}
 	// Initialize first elements of both sets as Next() needs
 	// one element look-ahead.
-	c.adone = !c.a.Next()
-	c.bdone = !c.b.Next()
+	c.aok = c.a.Next()
+	c.bok = c.b.Next()
 
 	return c, c.Err()
 }
 
 func (c *compactionMerger) compare() int {
-	if c.adone {
+	if !c.aok {
 		return 1
 	}
-	if c.bdone {
+	if !c.bok {
 		return -1
 	}
 	a, _ := c.a.At()
@@ -403,7 +422,7 @@ func (c *compactionMerger) compare() int {
 }
 
 func (c *compactionMerger) Next() bool {
-	if c.adone && c.bdone || c.Err() != nil {
+	if !c.aok && !c.bok || c.Err() != nil {
 		return false
 	}
 
@@ -411,10 +430,10 @@
 	// Both sets contain the current series. Chain them into a single one.
 	if d > 0 {
 		c.l, c.c = c.b.At()
-		c.bdone = !c.b.Next()
+		c.bok = c.b.Next()
 	} else if d < 0 {
 		c.l, c.c = c.a.At()
-		c.adone = !c.a.Next()
+		c.aok = c.a.Next()
 	} else {
 		l, ca := c.a.At()
 		_, cb := c.b.At()
@@ -422,8 +441,8 @@
 		c.l = l
 		c.c = append(ca, cb...)
 
-		c.adone = !c.a.Next()
-		c.bdone = !c.b.Next()
+		c.aok = c.a.Next()
+		c.bok = c.b.Next()
 	}
 	return true
 }
@@ -439,57 +458,6 @@ func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
 	return c.l, c.c
 }
 
-func persist(dir string, write func(IndexWriter, SeriesWriter) error) error {
-	tmpdir := dir + ".tmp"
-
-	// Write to temporary directory to make persistence appear atomic.
-	if fileutil.Exist(tmpdir) {
-		if err := os.RemoveAll(tmpdir); err != nil {
-			return err
-		}
-	}
-	if err := fileutil.CreateDirAll(tmpdir); err != nil {
-		return err
-	}
-
-	chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
-	if err != nil {
-		return err
-	}
-	indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
-	if err != nil {
-		return err
-	}
-
-	indexw := newIndexWriter(indexf)
-	chunkw := newSeriesWriter(chunkf, indexw)
-
-	if err := write(indexw, chunkw); err != nil {
-		return err
-	}
-
-	if err := chunkw.Close(); err != nil {
-		return err
-	}
-	if err := indexw.Close(); err != nil {
-		return err
-	}
-	if err := fileutil.Fsync(chunkf.File); err != nil {
-		return err
-	}
-	if err := fileutil.Fsync(indexf.File); err != nil {
-		return err
-	}
-	if err := chunkf.Close(); err != nil {
-		return err
-	}
-	if err := indexf.Close(); err != nil {
-		return err
-	}
-
-	return renameDir(tmpdir, dir)
-}
-
 func renameDir(from, to string) error {
 	if err := os.RemoveAll(to); err != nil {
 		return err
diff --git a/db.go b/db.go
index 74dc66e0f..6b7703806 100644
--- a/db.go
+++ b/db.go
@@ -335,7 +335,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
 	}
 
 	// TODO(fabxc): randomize over time and use better scoring function.
-	if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 500 {
+	if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 {
 		if err := s.cut(); err != nil {
 			s.logger.Log("msg", "cut failed", "err", err)
 		}
@@ -383,12 +383,6 @@ func (s *Shard) reinit(dir string) error {
 		return nil
 	}
 
-	// If a block dir has to be reinitialized and it wasn't a deletion,
-	// it has to be a newly persisted or compacted one.
-	if !fileutil.Exist(chunksFileName(dir)) {
-		return errors.New("no chunk file for new block dir")
-	}
-
 	// Remove a previous head block.
 	if i, ok := s.headForDir(dir); ok {
 		if err := s.heads[i].Close(); err != nil {
@@ -419,7 +413,7 @@ func (s *Shard) compactable() []block {
 	var blocks []block
 	for _, pb := range s.persisted {
-		blocks = append(blocks, pb)
+		blocks = append([]block{pb}, blocks...)
 	}
 
 	// threshold := s.heads[len(s.heads)-1].bstats.MaxTime - headGracePeriod
 
@@ -430,7 +424,7 @@ func (s *Shard) compactable() []block {
 	// 	}
 	// }
 	for _, hb := range s.heads[:len(s.heads)-1] {
-		blocks = append(blocks, hb)
+		blocks = append([]block{hb}, blocks...)
 	}
 
 	return blocks
@@ -565,7 +559,7 @@ type MultiError []error
 func (es MultiError) Error() string {
 	var buf bytes.Buffer
 
-	if len(es) > 0 {
+	if len(es) > 1 {
 		fmt.Fprintf(&buf, "%d errors: ", len(es))
 	}
 
diff --git a/head.go b/head.go
index 4cdf38bd6..f946d9e3d 100644
--- a/head.go
+++ b/head.go
@@ -270,50 +270,3 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
 
 	return nil
 }
-
-func (h *HeadBlock) persist(indexw IndexWriter, chunkw SeriesWriter) error {
-	if err := h.wal.Close(); err != nil {
-		return err
-	}
-
-	for ref, cd := range h.descs {
-		if err := chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
-			{
-				MinTime: cd.firstTimestamp,
-				MaxTime: cd.lastTimestamp,
-				Chunk:   cd.chunk,
-			},
-		}); err != nil {
-			return err
-		}
-	}
-
-	if err := indexw.WriteStats(h.bstats); err != nil {
-		return err
-	}
-	for n, v := range h.values {
-		s := make([]string, 0, len(v))
-		for x := range v {
-			s = append(s, x)
-		}
-
-		if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
-			return err
-		}
-	}
-
-	for t := range h.postings.m {
-		if err := indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil {
-			return err
-		}
-	}
-	// Write a postings list containing all series.
-	all := make([]uint32, len(h.descs))
-	for i := range all {
-		all[i] = uint32(i)
-	}
-	if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
-		return err
-	}
-	return nil
-}
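
Note on the new selection logic in compact.go: pick now takes a lone head block unconditionally, otherwise it walks time-ordered blocks in adjacent pairs and compacts the first pair that compactionMatch accepts. The patch leaves the match criteria as a TODO. Below is a minimal standalone sketch of that strategy; the block struct, its sizeBytes field, and the "larger block at most 2x the smaller" ratio are assumptions for illustration, not the patch's actual implementation.

package main

import "fmt"

// block is a stand-in for tsdb's internal block interface; only the
// fields the picker needs are modeled here.
type block struct {
	name      string
	persisted bool
	sizeBytes int64 // assumed field; the TODO hints at a size check
}

const (
	compactionMaxSize = 1 << 30 // combined size cap, as in the patch
	compactionBlocks  = 2       // blocks compacted per step, as in the patch
)

// compactionMatch sketches the check the patch leaves unimplemented:
// accept a pair only if its combined size stays under the cap and the
// blocks are roughly equal in size (the 2x factor is an assumption;
// the original returns true unconditionally).
func compactionMatch(bs []block) bool {
	var total int64
	for _, b := range bs {
		total += b.sizeBytes
	}
	if total > compactionMaxSize {
		return false
	}
	a, b := bs[0].sizeBytes, bs[1].sizeBytes
	if a < b {
		a, b = b, a
	}
	return a <= 2*b
}

// pick mirrors the new selection loop: a lone non-persisted (head)
// block is always taken; otherwise adjacent pairs are tried in order.
func pick(bs []block) []block {
	if len(bs) == 0 {
		return nil
	}
	if len(bs) == 1 && !bs[0].persisted {
		return bs
	}
	for i := 0; i+1 < len(bs); i += compactionBlocks {
		tpl := bs[i : i+compactionBlocks]
		if compactionMatch(tpl) {
			return tpl
		}
	}
	return nil
}

func main() {
	blocks := []block{
		{"01", true, 400 << 20},
		{"02", true, 500 << 20},
		{"03", false, 40 << 20},
	}
	// Picks the 01/02 pair: combined 900MB is under the cap and the
	// sizes are within the assumed 2x ratio.
	fmt.Println(pick(blocks))
}

Pairing adjacent blocks keeps compacted blocks contiguous in time, which is why run() wraps compaction in a loop: each merge can create a new pair that qualifies on the next pass, until pick returns nil.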
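Note on the compactionMerger rename: aok/bok invert the old adone/bdone sense, so each flag now records whether its set's one-element look-ahead is valid. The following self-contained sketch shows the same two-way sorted-merge pattern over plain int slices; sliceSet and merger are illustrative stand-ins, not tsdb API, and the labels/ChunkMeta machinery is elided.

package main

import "fmt"

// sliceSet is a stand-in iterator over a sorted int slice; the real
// code iterates compactionSets yielding labels and chunk metadata.
type sliceSet struct {
	xs  []int
	cur int
}

func (s *sliceSet) Next() bool {
	if len(s.xs) == 0 {
		return false
	}
	s.cur, s.xs = s.xs[0], s.xs[1:]
	return true
}

func (s *sliceSet) At() int { return s.cur }

// merger mirrors compactionMerger: ok-flags record whether each set's
// look-ahead element is valid, and equal elements are merged into one.
type merger struct {
	a, b     *sliceSet
	aok, bok bool
	v        int
}

func newMerger(a, b *sliceSet) *merger {
	// Prime the one-element look-ahead, as newCompactionMerger does.
	return &merger{a: a, b: b, aok: a.Next(), bok: b.Next()}
}

// compare treats an exhausted set as infinitely large so the other
// set drains first, matching the patched compare().
func (m *merger) compare() int {
	if !m.aok {
		return 1
	}
	if !m.bok {
		return -1
	}
	switch {
	case m.a.At() < m.b.At():
		return -1
	case m.a.At() > m.b.At():
		return 1
	}
	return 0
}

func (m *merger) Next() bool {
	if !m.aok && !m.bok {
		return false
	}
	switch d := m.compare(); {
	case d > 0:
		m.v = m.b.At()
		m.bok = m.b.Next()
	case d < 0:
		m.v = m.a.At()
		m.aok = m.a.Next()
	default:
		// Both sets hold the element: emit it once and advance both,
		// where the real code chains the two chunk lists instead.
		m.v = m.a.At()
		m.aok = m.a.Next()
		m.bok = m.b.Next()
	}
	return true
}

func main() {
	m := newMerger(&sliceSet{xs: []int{1, 3, 5}}, &sliceSet{xs: []int{2, 3, 6}})
	for m.Next() {
		fmt.Print(m.v, " ") // prints: 1 2 3 5 6
	}
	fmt.Println()
}

Flipping the flags from "done" to "ok" lets every advance be written as ok = Next() instead of done = !Next(), which is what the bulk of the compact.go hunks above change.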