Improve compaction processing

Fabian Reinartz 2017-01-18 06:18:32 +01:00
parent 5ceca3c810
commit d2322f6095
3 changed files with 134 additions and 165 deletions

@ -17,6 +17,7 @@ type Block interface {
Index() IndexReader
Series() SeriesReader
Persisted() bool
Close() error
}
// BlockStats provides stats on a data block.

@ -13,7 +13,7 @@ import (
type compactor struct {
metrics *compactorMetrics
blocks compactableBlocks
opts *compactorOptions
}
type compactorMetrics struct {
@ -48,49 +48,42 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
return m
}
type blockStore interface {
blocks() []Block
type compactorOptions struct {
maxBlocks uint8
maxBlockRange uint64
maxSize uint64
}
type compactableBlocks interface {
compactable() []Block
}
func newCompactor(blocks compactableBlocks, r prometheus.Registerer) (*compactor, error) {
c := &compactor{
blocks: blocks,
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
return &compactor{
opts: opts,
metrics: newCompactorMetrics(r),
}
return c, nil
}
const (
compactionMaxSize = 1 << 30 // 1GB
compactionBlocks = 2
)
func (c *compactor) pick() []Block {
bs := c.blocks.compactable()
// pick returns a range [i, j] in the blocks that are suitable to be compacted
// into a single block at position i.
func (c *compactor) pick(bs []Block) (i, j int, ok bool) {
last := len(bs) - 1
if len(bs) == 0 {
return nil
return 0, 0, false
}
if len(bs) == 1 && !bs[0].Persisted() {
return bs
}
if !bs[0].Persisted() {
if len(bs) == 2 || !compactionMatch(bs[:3]) {
return bs[:1]
// Make sure we always compact the last block if unpersisted.
if !bs[last].Persisted() {
if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) {
return last - 2, last, true
}
return last, last, true
}
for i := 0; i+2 < len(bs); i += 3 {
tpl := bs[i : i+3]
if compactionMatch(tpl) {
return tpl
return i, i + 2, true
}
}
return nil
return 0, 0, false
}
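
The new pick is a pure selector over an ordered block list: the DB supplies the blocks, and pick only returns an index range. A minimal runnable sketch of the heuristic, where the block struct and match callback are simplified stand-ins for the real Block interface and compactionMatch:

```go
package main

import "fmt"

// block is a simplified stand-in for the package's Block interface.
type block struct {
	persisted bool
	samples   float64
}

// pickRange mirrors the new selection logic: an unpersisted head block at
// the end is always picked (as a matching triple when possible), otherwise
// matching triples are scanned from the oldest end.
func pickRange(bs []block, match func([]block) bool) (i, j int, ok bool) {
	if len(bs) == 0 {
		return 0, 0, false
	}
	last := len(bs) - 1
	if !bs[last].persisted {
		if len(bs) >= 3 && match(bs[last-2:]) {
			return last - 2, last, true
		}
		return last, last, true
	}
	for i := 0; i+2 < len(bs); i += 3 {
		if match(bs[i : i+3]) {
			return i, i + 2, true
		}
	}
	return 0, 0, false
}

func main() {
	bs := []block{{true, 100}, {true, 110}, {true, 95}, {false, 90}}
	i, j, _ := pickRange(bs, func([]block) bool { return true })
	fmt.Println(i, j) // 1 3: the head plus the two persisted blocks before it
}
```
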
func compactionMatch(blocks []Block) bool {
@ -106,7 +99,7 @@ func compactionMatch(blocks []Block) bool {
for _, b := range blocks[1:] {
m := float64(b.Stats().SampleCount)
if m < 0.8*n || m > 1.2*n {
if m < 0.7*n || m > 1.3*n {
return false
}
t += m
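
The balance check is also loosened from ±20% to ±30%: every candidate after the first must stay within that band of the first block's sample count. A self-contained sketch of just this check (the rest of the real compactionMatch, not shown in this hunk, is omitted):

```go
package main

import "fmt"

// balanced reports whether all sample counts stay within ±30% of the
// first one, as in the relaxed compactionMatch check.
func balanced(samples []float64) bool {
	n := samples[0]
	for _, m := range samples[1:] {
		if m < 0.7*n || m > 1.3*n {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(balanced([]float64{100, 75, 125})) // true: within ±30%
	fmt.Println(balanced([]float64{100, 60, 125})) // false: 60 < 70
}
```
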
@ -184,6 +177,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error {
var set compactionSet
for i, b := range blocks {
all, err := b.Index().Postings("", "")
if err != nil {

db.go

@ -27,15 +27,14 @@ import (
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestamps.
var DefaultOptions = &Options{
Retention: 15 * 24 * 3600 * 1000, // 15 days
DisableWAL: false,
WALFlushInterval: 5 * time.Second,
MaxBlockRange: 24 * 60 * 60 * 1000, // 1 day in milliseconds
}
// Options of the DB storage.
type Options struct {
Retention int64
DisableWAL bool
WALFlushInterval time.Duration
MaxBlockRange uint64
}
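
With Retention and DisableWAL gone, MaxBlockRange is the remaining sizing knob, and Open now receives the options directly. A hypothetical usage sketch; the import path (the repo's path at the time of this commit) and the go-kit nop logger are assumptions:

```go
package main

import (
	"time"

	"github.com/fabxc/tsdb" // assumed import path
	"github.com/go-kit/kit/log"
)

func main() {
	opts := &tsdb.Options{
		WALFlushInterval: 10 * time.Second,
		MaxBlockRange:    2 * 60 * 60 * 1000, // 2h in milliseconds
	}
	// Passing nil instead of opts falls back to DefaultOptions.
	db, err := tsdb.Open("data", log.NewNopLogger(), opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```
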
// Appender allows appending a batch of data. It must be completed with a
@ -66,6 +65,7 @@ type DB struct {
dir string
logger log.Logger
metrics *dbMetrics
opts *Options
mtx sync.RWMutex
persisted []*persistedBlock
@ -107,8 +107,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
}
// Open returns a new DB in the given directory.
func Open(dir string, logger log.Logger) (db *DB, err error) {
// Create directory if partition is new.
func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
if !fileutil.Exist(dir) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
@ -117,22 +116,29 @@ func Open(dir string, logger log.Logger) (db *DB, err error) {
var r prometheus.Registerer
// r := prometheus.DefaultRegisterer
if opts == nil {
opts = DefaultOptions
}
db = &DB{
dir: dir,
logger: logger,
metrics: newDBMetrics(r),
opts: opts,
compactc: make(chan struct{}, 1),
cutc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
db.compactor = newCompactor(r, &compactorOptions{
maxBlockRange: opts.MaxBlockRange,
maxBlocks: 3,
maxSize: 1 << 29, // 512MB
})
if err := db.initBlocks(); err != nil {
return nil, err
}
if db.compactor, err = newCompactor(db, r); err != nil {
return nil, err
}
go db.run()
@ -166,26 +172,18 @@ func (db *DB) run() {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
for {
blocks := db.compactor.pick()
if len(blocks) == 0 {
break
}
// TODO(fabxc): pick emits blocks in order. compact acts on
// inverted order. Put inversion into compactor?
var bs []Block
for _, b := range blocks {
bs = append([]Block{b}, bs...)
}
select {
case <-db.stopc:
return
default:
}
if err := db.compact(bs); err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
}
i, j, ok := db.compactor.pick(db.compactable())
if !ok {
continue
}
if err := db.compact(i, j); err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
continue
}
// Trigger another compaction in case there's more work to do.
select {
case db.compactc <- struct{}{}:
default:
}
case <-db.stopc:
@ -194,41 +192,80 @@ func (db *DB) run() {
}
}
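
The rewritten loop no longer drains all work in one pass: after a successful compaction it re-arms itself with a non-blocking send, so remaining work is retried on the next iteration while duplicate triggers coalesce into the single buffered slot. The pattern in isolation:

```go
package main

import "fmt"

func main() {
	compactc := make(chan struct{}, 1)

	trigger := func() {
		select {
		case compactc <- struct{}{}: // queue a compaction
		default: // one is already pending; coalesce
		}
	}

	trigger()
	trigger() // dropped: the buffered signal already covers it
	fmt.Println("pending:", len(compactc)) // pending: 1
}
```
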
func (db *DB) compact(blocks []Block) error {
if len(blocks) == 0 {
return nil
func (db *DB) getBlock(i int) Block {
if i < len(db.persisted) {
return db.persisted[i]
}
tmpdir := blocks[0].Dir() + ".tmp"
return db.heads[i-len(db.persisted)]
}
// TODO(fabxc): find a better place to do this transparently.
for _, b := range blocks {
if h, ok := b.(*headBlock); ok {
h.updateMapping()
// removeBlocks removes the blocks in range [i, j] from the list of persisted
// and head blocks. The blocks are not closed and their files not deleted.
func (db *DB) removeBlocks(i, j int) {
for k := i; k <= j; k++ {
if i < len(db.persisted) {
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
} else {
l := i - len(db.persisted)
db.heads = append(db.heads[:l], db.heads[l+1:]...)
}
}
}
func (db *DB) blocks() (bs []Block) {
for _, b := range db.persisted {
bs = append(bs, b)
}
for _, b := range db.heads {
bs = append(bs, b)
}
return bs
}
func (db *DB) compact(i, j int) error {
if j < i {
return errors.New("invalid compaction block range")
}
var blocks []Block
for k := i; k <= j; k++ {
blocks = append(blocks, db.getBlock(k))
}
var (
dir = blocks[0].Dir()
tmpdir = dir + ".tmp"
)
if err := db.compactor.compact(tmpdir, blocks...); err != nil {
return err
}
pb, err := newPersistedBlock(tmpdir)
if err != nil {
return err
}
db.mtx.Lock()
defer db.mtx.Unlock()
if err := renameDir(tmpdir, blocks[0].Dir()); err != nil {
if err := renameDir(tmpdir, dir); err != nil {
return errors.Wrap(err, "rename dir")
}
for _, b := range blocks[1:] {
if err := os.RemoveAll(b.Dir()); err != nil {
return errors.Wrap(err, "delete dir")
pb.dir = dir
db.removeBlocks(i, j)
db.persisted = append(db.persisted, pb)
for i, b := range blocks {
if err := b.Close(); err != nil {
return errors.Wrap(err, "close old block")
}
if i > 0 {
if err := os.RemoveAll(b.Dir()); err != nil {
return errors.Wrap(err, "removing old block")
}
}
}
var merr MultiError
for _, b := range blocks {
merr.Add(errors.Wrapf(db.reinit(b.Dir()), "reinit block at %q", b.Dir()))
}
return merr.Err()
return nil
}
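
On disk, compact follows a write-then-swap sequence: the merged block is built in a ".tmp" sibling, renamed over the first input's directory, and the remaining inputs are closed and deleted. A sketch of that sequence under a caller-supplied write callback; note the real renameDir has to replace an existing directory and likely syncs the parent, which a bare os.Rename does not:

```go
package main

import (
	"fmt"
	"os"
)

// swapCompacted sketches db.compact's directory handling: write the merged
// block into dirs[0]+".tmp", move it over dirs[0] (assumed not to exist yet
// in this sketch), then delete the now-redundant remaining inputs.
func swapCompacted(dirs []string, write func(tmpdir string) error) error {
	tmpdir := dirs[0] + ".tmp"
	if err := write(tmpdir); err != nil {
		return err
	}
	if err := os.Rename(tmpdir, dirs[0]); err != nil {
		return err
	}
	for _, d := range dirs[1:] {
		if err := os.RemoveAll(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	defer os.RemoveAll("b1")
	err := swapCompacted([]string{"b1", "b2"}, func(tmp string) error {
		return os.MkdirAll(tmp, 0777) // stand-in for compactor.compact
	})
	fmt.Println("swap error:", err) // <nil>
}
```
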
func isBlockDir(fi os.FileInfo) bool {
@ -244,23 +281,33 @@ func isBlockDir(fi os.FileInfo) bool {
return true
}
func blockDirs(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var dirs []string
for _, fi := range files {
if isBlockDir(fi) {
dirs = append(dirs, filepath.Join(dir, fi.Name()))
}
}
return dirs, nil
}
func (db *DB) initBlocks() error {
var (
pbs []*persistedBlock
heads []*headBlock
persisted []*persistedBlock
heads []*headBlock
)
files, err := ioutil.ReadDir(db.dir)
dirs, err := blockDirs(db.dir)
if err != nil {
return err
}
for _, fi := range files {
if !isBlockDir(fi) {
continue
}
dir := filepath.Join(db.dir, fi.Name())
for _, dir := range dirs {
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := openHeadBlock(dir, db.logger)
if err != nil {
@ -269,31 +316,14 @@ func (db *DB) initBlocks() error {
heads = append(heads, h)
continue
}
b, err := newPersistedBlock(dir)
if err != nil {
return err
}
pbs = append(pbs, b)
persisted = append(persisted, b)
}
// Validate that blocks are sequential in time.
lastTime := int64(math.MinInt64)
for _, b := range pbs {
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.Stats().MaxTime
}
for _, b := range heads {
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.Stats().MaxTime
}
db.persisted = pbs
db.persisted = persisted
db.heads = heads
if len(heads) == 0 {
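
initBlocks now derives the block type purely from directory contents: a directory still holding a write-ahead log reopens as a head block, everything else as a persisted block (the explicit time-ordering validation was dropped). A sketch of that classification, assuming the package's walFileName constant is "wal":

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// classify mirrors initBlocks' rule for a single block directory:
// the presence of a WAL file marks a still-appendable head block.
func classify(dir string) string {
	// "wal" is an assumption for the package's walFileName constant.
	if _, err := os.Stat(filepath.Join(dir, "wal")); err == nil {
		return "head"
	}
	return "persisted"
}

func main() {
	dir, _ := os.MkdirTemp("", "block")
	defer os.RemoveAll(dir)
	fmt.Println(classify(dir)) // persisted: no WAL present
	os.WriteFile(filepath.Join(dir, "wal"), nil, 0666)
	fmt.Println(classify(dir)) // head
}
```
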
@ -322,6 +352,7 @@ func (db *DB) Close() error {
return merr.Err()
}
// Appender returns a new Appender on the database.
func (db *DB) Appender() Appender {
db.mtx.RLock()
@ -403,68 +434,16 @@ func (db *DB) persistedForDir(dir string) (int, bool) {
return -1, false
}
func (db *DB) reinit(dir string) error {
if !fileutil.Exist(dir) {
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
}
if i, ok := db.persistedForDir(dir); ok {
if err := db.persisted[i].Close(); err != nil {
return err
}
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
}
return nil
}
// Remove a previous head block.
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
}
// Close an old persisted block.
i, ok := db.persistedForDir(dir)
if ok {
if err := db.persisted[i].Close(); err != nil {
return err
}
}
pb, err := newPersistedBlock(dir)
if err != nil {
return errors.Wrap(err, "open persisted block")
}
if i >= 0 {
db.persisted[i] = pb
} else {
db.persisted = append(db.persisted, pb)
}
return nil
}
func (db *DB) compactable() []Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
var blocks []Block
for _, pb := range db.persisted {
blocks = append([]Block{pb}, blocks...)
blocks = append(blocks, pb)
}
// threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod
// for _, hb := range db.heads {
// if hb.bstatdb.MaxTime < threshold {
// blocks = append(blocks, hb)
// }
// }
for _, hb := range db.heads[:len(db.heads)-1] {
blocks = append([]Block{hb}, blocks...)
blocks = append(blocks, hb)
}
return blocks
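
compactable now appends instead of prepending, so the returned slice runs oldest to newest: persisted blocks first, then every head block except the most recent one, which is still receiving appends. That matches pick's assumption that index 0 is the oldest candidate. In miniature:

```go
package main

import "fmt"

func main() {
	persisted := []string{"p1", "p2"}
	heads := []string{"h1", "h2"} // h2 is the active head

	var blocks []string
	blocks = append(blocks, persisted...)            // oldest first
	blocks = append(blocks, heads[:len(heads)-1]...) // active head excluded
	fmt.Println(blocks) // [p1 p2 h1]
}
```
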
@ -505,9 +484,6 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
return bs
}
// TODO(fabxc): make configurable.
const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut() error {
@ -548,7 +524,6 @@ func (db *DB) nextBlockDir() (string, error) {
// PartitionedDB is a time series storage.
type PartitionedDB struct {
logger log.Logger
opts *Options
dir string
partitionPow uint
@ -577,7 +552,6 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition
}
c := &PartitionedDB{
logger: l,
opts: opts,
dir: dir,
partitionPow: uint(math.Log2(float64(n))),
}
@ -589,7 +563,7 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition
l := log.NewContext(l).With("partition", i)
d := partitionDir(dir, i)
s, err := Open(d, l)
s, err := Open(d, l, opts)
if err != nil {
return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
}
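
Since PartitionedDB no longer stores the options itself, they are threaded straight through to each partition's Open call. A hypothetical end-to-end sketch, with the same import-path and logger assumptions as above:

```go
package main

import (
	"github.com/fabxc/tsdb" // assumed import path
	"github.com/go-kit/kit/log"
)

func main() {
	// Two partitions; each is opened via Open(dir, logger, opts)
	// with the same shared options value.
	db, err := tsdb.OpenPartitioned("data", 2, log.NewNopLogger(), tsdb.DefaultOptions)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```
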