Pool Chunk objects during compaction

This commit is contained in:
Fabian Reinartz 2017-08-08 17:35:34 +02:00
parent 2644c8665c
commit 66ff7b12e9
6 changed files with 142 additions and 73 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
@ -181,13 +182,13 @@ type persistedBlock struct {
tombstones tombstoneReader
}
func newPersistedBlock(dir string) (*persistedBlock, error) {
func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
cr, err := newChunkReader(chunkDir(dir))
cr, err := newChunkReader(chunkDir(dir), pool)
if err != nil {
return nil, err
}

View File

@ -238,20 +238,22 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
}
}
b := make([]byte, binary.MaxVarintLen32)
seq := uint64(w.seq()) << 32
var (
b = [binary.MaxVarintLen32]byte{}
seq = uint64(w.seq()) << 32
)
for i := range chks {
chk := &chks[i]
chk.Ref = seq | uint64(w.n)
n := binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
if err := w.write(b[:n]); err != nil {
return err
}
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
b[0] = byte(chk.Chunk.Encoding())
if err := w.write(b[:1]); err != nil {
return err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
@ -262,7 +264,7 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
if err := chk.writeHash(w.crc32); err != nil {
return err
}
if err := w.write(w.crc32.Sum(nil)); err != nil {
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
return err
}
}
@ -295,15 +297,20 @@ type chunkReader struct {
// Closers for resources behind the byte slices.
cs []io.Closer
pool chunks.Pool
}
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
func newChunkReader(dir string) (*chunkReader, error) {
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
files, err := sequenceFiles(dir, "")
if err != nil {
return nil, err
}
var cr chunkReader
if pool == nil {
pool = chunks.NewPool()
}
cr := chunkReader{pool: pool}
for _, fn := range files {
f, err := openMmapFile(fn)
@ -350,11 +357,6 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
return nil, fmt.Errorf("reading chunk length failed")
}
b = b[n:]
enc := chunks.Encoding(b[0])
c, err := chunks.FromData(enc, b[1:1+l])
if err != nil {
return nil, err
}
return c, nil
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
}

View File

@ -13,7 +13,12 @@
package chunks
import "fmt"
import (
"fmt"
"sync"
"github.com/pkg/errors"
)
// Encoding is the identifier for a chunk encoding.
type Encoding uint8
@ -63,3 +68,50 @@ type Iterator interface {
Err() error
Next() bool
}
type Pool interface {
Put(Chunk) error
Get(e Encoding, b []byte) (Chunk, error)
}
// Pool is a memory pool of chunk objects.
type pool struct {
xor sync.Pool
}
func NewPool() Pool {
return &pool{
xor: sync.Pool{
New: func() interface{} {
return &XORChunk{b: &bstream{}}
},
},
}
}
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
switch e {
case EncXOR:
c := p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
return c, nil
}
return nil, errors.Errorf("invalid encoding %q", e)
}
func (p *pool) Put(c Chunk) error {
switch c.Encoding() {
case EncXOR:
xc, ok := c.(*XORChunk)
if !ok {
return nil
}
xc.b.stream = nil
xc.b.count = 0
p.xor.Put(c)
default:
return errors.Errorf("invalid encoding %q", c.Encoding())
}
return nil
}

View File

@ -100,9 +100,15 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
type compactorOptions struct {
blockRanges []int64
chunkPool chunks.Pool
}
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
func NewCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
if opts == nil {
opts = &compactorOptions{
chunkPool: chunks.NewPool(),
}
}
return &compactor{
dir: dir,
opts: opts,
@ -288,7 +294,7 @@ func (c *compactor) Compact(dirs ...string) (err error) {
var blocks []Block
for _, d := range dirs {
b, err := newPersistedBlock(d)
b, err := newPersistedBlock(d, c.opts.chunkPool)
if err != nil {
return err
}
@ -350,7 +356,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
return errors.Wrap(err, "open index writer")
}
meta, err := populateBlock(blocks, indexw, chunkw)
meta, err := c.populateBlock(blocks, indexw, chunkw)
if err != nil {
return errors.Wrap(err, "write compaction")
}
@ -397,7 +403,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
func (c *compactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
var (
set compactionSet
metas []BlockMeta
@ -474,7 +480,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
}
}
}
if err := chunkw.WriteChunks(chks...); err != nil {
return nil, err
}
@ -489,6 +494,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range chks {
c.opts.chunkPool.Put(chk.Chunk)
}
for _, l := range lset {
valset, ok := values[l.Name]
if !ok {
@ -685,6 +694,7 @@ func (c *compactionMerger) Next() bool {
c.aok = c.a.Next()
c.bok = c.b.Next()
}
return true
}

14
db.go
View File

@ -37,6 +37,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
@ -95,9 +96,10 @@ type DB struct {
dir string
lockf *lockfile.Lockfile
logger log.Logger
metrics *dbMetrics
opts *Options
logger log.Logger
metrics *dbMetrics
opts *Options
chunkPool chunks.Pool
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
@ -203,6 +205,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}),
stopc: make(chan struct{}),
compactionsEnabled: true,
chunkPool: chunks.NewPool(),
}
db.metrics = newDBMetrics(db, r)
@ -223,6 +226,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
copts := &compactorOptions{
blockRanges: opts.BlockRanges,
chunkPool: db.chunkPool,
}
if len(copts.blockRanges) == 0 {
@ -238,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
}
db.compactor = newCompactor(dir, r, l, copts)
db.compactor = NewCompactor(dir, r, l, copts)
if err := db.reloadBlocks(); err != nil {
return nil, err
@ -508,7 +512,7 @@ func (db *DB) reloadBlocks() (err error) {
if meta.Compaction.Generation == 0 {
b, err = db.openHeadBlock(dir)
} else {
b, err = newPersistedBlock(dir)
b, err = newPersistedBlock(dir, db.chunkPool)
}
if err != nil {
return errors.Wrapf(err, "open block %s", dir)

92
head.go
View File

@ -270,62 +270,62 @@ Outer:
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
// be removed in the future.
func (h *HeadBlock) Snapshot(snapshotDir string) error {
if h.meta.Stats.NumSeries == 0 {
return nil
}
// if h.meta.Stats.NumSeries == 0 {
// return nil
// }
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
// entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
// uid := ulid.MustNew(ulid.Now(), entropy)
dir := filepath.Join(snapshotDir, uid.String())
tmp := dir + ".tmp"
// dir := filepath.Join(snapshotDir, uid.String())
// tmp := dir + ".tmp"
if err := os.RemoveAll(tmp); err != nil {
return err
}
// if err := os.RemoveAll(tmp); err != nil {
// return err
// }
if err := os.MkdirAll(tmp, 0777); err != nil {
return err
}
// if err := os.MkdirAll(tmp, 0777); err != nil {
// return err
// }
// Populate chunk and index files into temporary directory with
// data of all blocks.
chunkw, err := newChunkWriter(chunkDir(tmp))
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
indexw, err := newIndexWriter(tmp)
if err != nil {
return errors.Wrap(err, "open index writer")
}
// // Populate chunk and index files into temporary directory with
// // data of all blocks.
// chunkw, err := newChunkWriter(chunkDir(tmp))
// if err != nil {
// return errors.Wrap(err, "open chunk writer")
// }
// indexw, err := newIndexWriter(tmp)
// if err != nil {
// return errors.Wrap(err, "open index writer")
// }
meta, err := populateBlock([]Block{h}, indexw, chunkw)
if err != nil {
return errors.Wrap(err, "write snapshot")
}
meta.ULID = uid
meta.MaxTime = h.highTimestamp
// meta, err := h.compactor.populateBlock([]Block{h}, indexw, chunkw, nil)
// if err != nil {
// return errors.Wrap(err, "write snapshot")
// }
// meta.ULID = uid
// meta.MaxTime = h.highTimestamp
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// if err = writeMetaFile(tmp, meta); err != nil {
// return errors.Wrap(err, "write merged meta")
// }
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
if err = indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer")
}
// if err = chunkw.Close(); err != nil {
// return errors.Wrap(err, "close chunk writer")
// }
// if err = indexw.Close(); err != nil {
// return errors.Wrap(err, "close index writer")
// }
// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
// // Create an empty tombstones file.
// if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
// return errors.Wrap(err, "write new tombstones file")
// }
// Block successfully written, make visible
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
// // Block successfully written, make visible
// if err := renameFile(tmp, dir); err != nil {
// return errors.Wrap(err, "rename block dir")
// }
return nil
}