diff --git a/block.go b/block.go index 0eb2b7c92..792a96e9f 100644 --- a/block.go +++ b/block.go @@ -1,4 +1,5 @@ // Copyright 2017 The Prometheus Authors + // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -52,13 +53,13 @@ type DiskBlock interface { type Block interface { DiskBlock Queryable + Snapshottable } // headBlock is a regular block that can still be appended to. type headBlock interface { Block Appendable - Snapshottable } // Snapshottable defines an entity that can be backedup online. @@ -278,6 +279,42 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } +func (pb *persistedBlock) Snapshot(dir string) error { + blockDir := filepath.Join(dir, pb.meta.ULID.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot block dir") + } + + chunksDir := chunkDir(blockDir) + if err := os.MkdirAll(chunksDir, 0777); err != nil { + return errors.Wrap(err, "create snapshot chunk dir") + } + + // Hardlink meta, index and tombstones + filenames := []string{metaFilename, indexFilename, tombstoneFilename} + for _, fname := range filenames { + if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { + return errors.Wrapf(err, "create snapshot %s", fname) + } + } + + // Hardlink the chunks + curChunkDir := chunkDir(pb.dir) + files, err := ioutil.ReadDir(curChunkDir) + if err != nil { + return errors.Wrap(err, "ReadDir the current chunk dir") + } + + for _, f := range files { + err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name())) + if err != nil { + return errors.Wrap(err, "hardlink a chunk") + } + } + + return nil +} + func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } diff --git a/db.go b/db.go index a7aea28e8..f7a646372 100644 --- a/db.go +++ b/db.go @@ -121,7 +121,8 @@ type DB struct { stopc chan struct{} // cmtx is used to control compactions and deletions. - cmtx sync.Mutex + cmtx sync.Mutex + compacting bool } type dbMetrics struct { @@ -200,12 +201,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } db = &DB{ - dir: dir, - logger: l, - opts: opts, - compactc: make(chan struct{}, 1), - donec: make(chan struct{}), - stopc: make(chan struct{}), + dir: dir, + logger: l, + opts: opts, + compactc: make(chan struct{}, 1), + donec: make(chan struct{}), + stopc: make(chan struct{}), + compacting: true, } db.metrics = newDBMetrics(db, r) @@ -528,27 +530,31 @@ func (db *DB) Close() error { return merr.Err() } -// DisableCompactions disables compactions. -func (db *DB) DisableCompactions() error { - db.stopc <- struct{}{} // TODO: Can this block? - db.cmtx.Lock() - return nil -} +// ToggleCompactions toggles compactions and returns if compactions are on or not. +func (db *DB) ToggleCompactions() bool { + if db.compacting { + db.cmtx.Lock() + db.compacting = false + return false + } -// EnableCompactions enables compactions. -func (db *DB) EnableCompactions() error { db.cmtx.Unlock() - return nil + db.compacting = true + return true } // Snapshot writes the current headBlock snapshots to snapshots directory. func (db *DB) Snapshot(dir string) error { - db.headmtx.RLock() - heads := db.heads[:] - db.headmtx.RUnlock() + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() - for _, h := range heads { - if err := h.Snapshot(dir); err != nil { + db.cmtx.Lock() + defer db.cmtx.Unlock() + + blocks := db.blocks[:] + for _, b := range blocks { + db.logger.Log("msg", "compacting block", "block", b.Dir()) + if err := b.Snapshot(dir); err != nil { return errors.Wrap(err, "error snapshotting headblock") } } diff --git a/head.go b/head.go index 426f1d96a..8bf0762bb 100644 --- a/head.go +++ b/head.go @@ -263,11 +263,10 @@ Outer: } // Snapshot persists the current state of the headblock to the given directory. +// TODO(gouthamve): Snapshot must be called when there are no active appenders. +// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should +// be removed in the future. func (h *HeadBlock) Snapshot(snapshotDir string) error { - // Needed to stop any appenders. - h.mtx.Lock() - defer h.mtx.Unlock() - if h.meta.Stats.NumSeries == 0 { return nil } diff --git a/index.go b/index.go index b36eb49bd..3264d9263 100644 --- a/index.go +++ b/index.go @@ -39,6 +39,8 @@ const ( indexFormatV1 = 1 ) +const indexFilename = "index" + const compactionPageBytes = minSectorSize * 64 type indexWriterSeries struct { @@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { if err != nil { return nil, err } - f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) + f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err }