diff --git a/tsdb/db.go b/tsdb/db.go index 2f53a02b1..53257b798 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -324,7 +324,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error { } mint := head.MinTime() maxt := head.MaxTime() - rh := &rangeHead{ + rh := &RangeHead{ head: head, mint: mint, maxt: maxt, @@ -685,41 +685,61 @@ func (db *DB) Compact() (err error) { maxt := rangeForTimestamp(mint, db.head.chunkRange) // Wrap head into a range that bounds all reads to it. - head := &rangeHead{ - head: db.head, - mint: mint, - // We remove 1 millisecond from maxt because block - // intervals are half-open: [b.MinTime, b.MaxTime). But - // chunk intervals are closed: [c.MinTime, c.MaxTime]; - // so in order to make sure that overlaps are evaluated - // consistently, we explicitly remove the last value - // from the block interval here. - maxt: maxt - 1, + // We remove 1 millisecond from maxt because block + // intervals are half-open: [b.MinTime, b.MaxTime). But + // chunk intervals are closed: [c.MinTime, c.MaxTime]; + // so in order to make sure that overlaps are evaluated + // consistently, we explicitly remove the last value + // from the block interval here. + head := NewRangeHead(db.head, mint, maxt-1) + if err := db.compactHead(head, mint, maxt); err != nil { + return err } - uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) - if err != nil { - return errors.Wrap(err, "persist head block") - } - - runtime.GC() - - if err := db.reload(); err != nil { - if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) - } - return errors.Wrap(err, "reload blocks") - } - if (uid == ulid.ULID{}) { - // Compaction resulted in an empty block. - // Head truncating during db.reload() depends on the persisted blocks and - // in this case no new block will be persisted so manually truncate the head. - if err = db.head.Truncate(maxt); err != nil { - return errors.Wrap(err, "head truncate failed (in compact)") - } - } - runtime.GC() } + return db.compactBlocks() +} + +// CompactHead compacts the given the RangeHead. +func (db *DB) CompactHead(head *RangeHead, mint, maxt int64) (err error) { + db.cmtx.Lock() + defer db.cmtx.Unlock() + + return db.compactHead(head, mint, maxt) +} + +// compactHead compacts the given the RangeHead. +// The compaction mutex should be held before calling this method. +func (db *DB) compactHead(head *RangeHead, mint, maxt int64) (err error) { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { + return errors.Wrap(err, "persist head block") + } + + runtime.GC() + + if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) + } + return errors.Wrap(err, "reload blocks") + } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } + runtime.GC() + + return nil +} + +// compactBlocks compacts all the eligible on-disk blocks. +// The compaction mutex should be held before calling this method. +func (db *DB) compactBlocks() (err error) { // Check for compactions of multiple blocks. for { plan, err := db.compactor.Plan(db.dir) @@ -1192,7 +1212,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { mint := db.head.MinTime() maxt := db.head.MaxTime() - head := &rangeHead{ + head := &RangeHead{ head: db.head, mint: mint, maxt: maxt, @@ -1221,7 +1241,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } } if maxt >= db.head.MinTime() { - blocks = append(blocks, &rangeHead{ + blocks = append(blocks, &RangeHead{ head: db.head, mint: mint, maxt: maxt, diff --git a/tsdb/head.go b/tsdb/head.go index 5769c770a..f00abde97 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -748,36 +748,45 @@ func (h *Head) initTime(t int64) (initialized bool) { return true } -type rangeHead struct { +type RangeHead struct { head *Head mint, maxt int64 } -func (h *rangeHead) Index() (IndexReader, error) { +// NewRangeHead returns a *rangeHead. +func NewRangeHead(head *Head, mint, maxt int64) *RangeHead { + return &RangeHead{ + head: head, + mint: mint, + maxt: maxt, + } +} + +func (h *RangeHead) Index() (IndexReader, error) { return h.head.indexRange(h.mint, h.maxt), nil } -func (h *rangeHead) Chunks() (ChunkReader, error) { +func (h *RangeHead) Chunks() (ChunkReader, error) { return h.head.chunksRange(h.mint, h.maxt), nil } -func (h *rangeHead) Tombstones() (tombstones.Reader, error) { +func (h *RangeHead) Tombstones() (tombstones.Reader, error) { return h.head.tombstones, nil } -func (h *rangeHead) MinTime() int64 { +func (h *RangeHead) MinTime() int64 { return h.mint } -func (h *rangeHead) MaxTime() int64 { +func (h *RangeHead) MaxTime() int64 { return h.maxt } -func (h *rangeHead) NumSeries() uint64 { +func (h *RangeHead) NumSeries() uint64 { return h.head.NumSeries() } -func (h *rangeHead) Meta() BlockMeta { +func (h *RangeHead) Meta() BlockMeta { return BlockMeta{ MinTime: h.MinTime(), MaxTime: h.MaxTime(),