refactor block size calculation (#637)

* refactor block size calculation
The block size is kept in memory and not in the meta file anymore.
It now includes the size of the meta file itself for a more
correct block size.
It fixes a bug where the size didn't change when calling `block.Delete()`.
Adds a dedicated test to ensure correct block sizes.
This allows opening a db in a read only mode as it doesn't write to the meta file anymore.

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>
This commit is contained in:
Krasi Georgiev 2019-06-24 18:42:29 +03:00 committed by GitHub
parent b5f9f9f0b4
commit 8d86e921cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 160 additions and 121 deletions

View File

@ -1,5 +1,7 @@
## master / unreleased
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
- [BUGFIX] Re-calculate block size when calling `block.Delete`.
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
## 0.8.0
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.

111
block.go
View File

@ -151,12 +151,6 @@ type Appendable interface {
Appender() Appender
}
// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}
// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
@ -183,7 +177,6 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}
// BlockDesc describes a block by ULID and time range.
@ -214,24 +207,24 @@ const metaFilename = "meta.json"
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func readMetaFile(dir string) (*BlockMeta, error) {
func readMetaFile(dir string) (*BlockMeta, int64, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
return nil, 0, err
}
var m BlockMeta
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
return nil, 0, err
}
if m.Version != 1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
return nil, 0, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
return &m, int64(len(b)), nil
}
func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error) {
meta.Version = 1
// Make any changes to the file appear atomic.
@ -245,26 +238,32 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
f, err := os.Create(tmp)
if err != nil {
return err
return 0, err
}
enc := json.NewEncoder(f)
enc.SetIndent("", "\t")
jsonMeta, err := json.MarshalIndent(meta, "", "\t")
if err != nil {
return 0, err
}
var merr tsdb_errors.MultiError
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
n, err := f.Write(jsonMeta)
if err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
if merr.Add(f.Sync()); merr.Err() != nil {
if err := f.Sync(); err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
if err := f.Close(); err != nil {
return err
return 0, err
}
return fileutil.Replace(tmp, path)
return int64(n), fileutil.Replace(tmp, path)
}
// Block represents a directory of time series data covering a continuous time range.
@ -285,6 +284,11 @@ type Block struct {
tombstones TombstoneReader
logger log.Logger
numBytesChunks int64
numBytesIndex int64
numBytesTombstone int64
numBytesMeta int64
}
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@ -302,7 +306,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
err = merr.Err()
}
}()
meta, err := readMetaFile(dir)
meta, sizeMeta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
@ -319,43 +323,28 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, ir)
tr, tsr, err := readTombstones(dir)
tr, sizeTomb, err := readTombstones(dir)
if err != nil {
return nil, err
}
closers = append(closers, tr)
// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(logger, dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}
pb = &Block{
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
symbolTableSize: ir.SymbolTableSize(),
logger: logger,
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
symbolTableSize: ir.SymbolTableSize(),
logger: logger,
numBytesChunks: cr.Size(),
numBytesIndex: ir.Size(),
numBytesTombstone: sizeTomb,
numBytesMeta: sizeMeta,
}
return pb, nil
}
func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
@ -390,7 +379,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime }
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }
// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
func (pb *Block) Size() int64 {
return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta
}
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")
@ -437,7 +428,12 @@ func (pb *Block) GetSymbolTableSize() uint64 {
func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
n, err := writeMetaFile(pb.logger, pb.dir, &pb.meta)
if err != nil {
return err
}
pb.numBytesMeta = n
return nil
}
type blockIndexReader struct {
@ -561,10 +557,17 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()
if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones)
if err != nil {
return err
}
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
pb.numBytesTombstone = n
n, err = writeMetaFile(pb.logger, pb.dir, &pb.meta)
if err != nil {
return err
}
pb.numBytesMeta = n
return nil
}
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).

View File

@ -26,6 +26,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
)
@ -40,9 +41,10 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()
testutil.Ok(t, writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{}))
_, err = writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{})
testutil.Ok(t, err)
meta, err := readMetaFile(dir)
meta, _, err := readMetaFile(dir)
testutil.Ok(t, err)
testutil.Assert(t, meta.Version != 2, "meta.json version must never be 2")
}
@ -149,6 +151,60 @@ func TestCorruptedChunk(t *testing.T) {
}
}
// TestBlockSize ensures that the block size is calculated correctly.
func TestBlockSize(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_blockSize")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
var (
blockInit *Block
expSizeInit int64
blockDirInit string
)
// Create a block and compare the reported size vs actual disk size.
{
blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
blockInit, err = OpenBlock(nil, blockDirInit, nil)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, blockInit.Close())
}()
expSizeInit = blockInit.Size()
actSizeInit, err := testutil.DirSize(blockInit.Dir())
testutil.Ok(t, err)
testutil.Equals(t, expSizeInit, actSizeInit)
}
// Delete some series and check the sizes again.
{
testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*")))
expAfterDelete := blockInit.Size()
testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
actAfterDelete, err := testutil.DirSize(blockDirInit)
testutil.Ok(t, err)
testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
testutil.Ok(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
testutil.Ok(t, err)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, blockAfterCompact.Close())
}()
expAfterCompact := blockAfterCompact.Size()
actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir())
testutil.Ok(t, err)
testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
}
}
// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
head := createHead(tb, series)

View File

@ -178,7 +178,7 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
var dms []dirMeta
for _, dir := range dirs {
meta, err := readMetaFile(dir)
meta, _, err := readMetaFile(dir)
if err != nil {
return nil, err
}
@ -380,7 +380,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
start := time.Now()
for _, d := range dirs {
meta, err := readMetaFile(d)
meta, _, err := readMetaFile(d)
if err != nil {
return uid, err
}
@ -420,12 +420,14 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil {
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
if err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
b.numBytesMeta = n
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
@ -600,12 +602,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return nil
}
if err = writeMetaFile(c.logger, tmp, meta); err != nil {
if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file.
if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}

2
db.go
View File

@ -629,7 +629,7 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err
corrupted = make(map[ulid.ULID]error)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
meta, _, err := readMetaFile(dir)
if err != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue

View File

@ -100,9 +100,6 @@ func TestDB_reloadOrder(t *testing.T) {
testutil.Ok(t, db.reload())
blocks := db.Blocks()
for _, b := range blocks {
b.meta.Stats.NumBytes = 0
}
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
@ -1060,7 +1057,8 @@ func TestSizeRetention(t *testing.T) {
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
actSize := dbDiskSize(db.Dir())
actSize, err := testutil.DirSize(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
// Decrease the max bytes limit so that a delete is triggered.
@ -1074,7 +1072,8 @@ func TestSizeRetention(t *testing.T) {
actBlocks := db.Blocks()
expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
actSize = dbDiskSize(db.Dir())
actSize, err = testutil.DirSize(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
@ -1085,20 +1084,6 @@ func TestSizeRetention(t *testing.T) {
}
func dbDiskSize(dir string) int64 {
var statSize int64
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
// Include only index,tombstone and chunks.
if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
info.Name() == indexFilename ||
info.Name() == tombstoneFilename {
statSize += info.Size()
}
return nil
})
return statSize
}
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
db, delete := openTestDB(t, nil)
defer func() {

View File

@ -109,7 +109,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
}
// Reset version of meta.json to 1.
meta.Version = 1
if err := writeMetaFile(logger, d, meta); err != nil {
if _, err := writeMetaFile(logger, d, meta); err != nil {
return wrapErr(err, d)
}
}

View File

@ -65,7 +65,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
// Check the current db.
// In its current state, lookups should fail with the fixed code.
_, err := readMetaFile(dbDir)
_, _, err := readMetaFile(dbDir)
testutil.NotOk(t, err)
// Touch chunks dir in block.
@ -121,7 +121,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
{{"a", "2"}, {"b", "1"}},
}, res)
meta, err := readMetaFile(tmpDbDir)
meta, _, err := readMetaFile(tmpDbDir)
testutil.Ok(t, err)
testutil.Assert(t, meta.Version == 1, "unexpected meta version %d", meta.Version)
}

View File

@ -54,14 +54,15 @@ type TombstoneReader interface {
Close() error
}
func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error {
func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) {
path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp"
hash := newCRC32()
var size int
f, err := os.Create(tmp)
if err != nil {
return err
return 0, err
}
defer func() {
if f != nil {
@ -79,10 +80,11 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
// Write the meta.
buf.PutBE32(MagicTombstone)
buf.PutByte(tombstoneFormatV1)
_, err = f.Write(buf.Get())
n, err := f.Write(buf.Get())
if err != nil {
return err
return 0, err
}
size += n
mw := io.MultiWriter(f, hash)
@ -94,32 +96,34 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
buf.PutVarint64(iv.Mint)
buf.PutVarint64(iv.Maxt)
_, err = mw.Write(buf.Get())
n, err = mw.Write(buf.Get())
if err != nil {
return err
}
size += n
}
return nil
}); err != nil {
return fmt.Errorf("error writing tombstones: %v", err)
return 0, fmt.Errorf("error writing tombstones: %v", err)
}
_, err = f.Write(hash.Sum(nil))
n, err = f.Write(hash.Sum(nil))
if err != nil {
return err
return 0, err
}
size += n
var merr tsdb_errors.MultiError
if merr.Add(f.Sync()); merr.Err() != nil {
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
if err = f.Close(); err != nil {
return err
return 0, err
}
f = nil
return fileutil.Replace(tmp, path)
return int64(size), fileutil.Replace(tmp, path)
}
// Stone holds the information on the posting and time-range
@ -129,41 +133,37 @@ type Stone struct {
intervals Intervals
}
func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
func readTombstones(dir string) (TombstoneReader, int64, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), nil, nil
return newMemTombstones(), 0, nil
} else if err != nil {
return nil, nil, err
}
sr := &TombstoneFile{
size: int64(len(b)),
return nil, 0, err
}
if len(b) < 5 {
return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header")
return nil, 0, errors.Wrap(encoding.ErrInvalidSize, "tombstones header")
}
d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum.
if mg := d.Be32(); mg != MagicTombstone {
return nil, sr, fmt.Errorf("invalid magic number %x", mg)
return nil, 0, fmt.Errorf("invalid magic number %x", mg)
}
if flag := d.Byte(); flag != tombstoneFormatV1 {
return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
return nil, 0, fmt.Errorf("invalid tombstone format %x", flag)
}
if d.Err() != nil {
return nil, sr, d.Err()
return nil, 0, d.Err()
}
// Verify checksum.
hash := newCRC32()
if _, err := hash.Write(d.Get()); err != nil {
return nil, sr, errors.Wrap(err, "write to hash")
return nil, 0, errors.Wrap(err, "write to hash")
}
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, sr, errors.New("checksum did not match")
return nil, 0, errors.New("checksum did not match")
}
stonesMap := newMemTombstones()
@ -173,13 +173,13 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
mint := d.Varint64()
maxt := d.Varint64()
if d.Err() != nil {
return nil, sr, d.Err()
return nil, 0, d.Err()
}
stonesMap.addInterval(k, Interval{mint, maxt})
}
return stonesMap, sr, nil
return stonesMap, int64(len(b)), nil
}
type memTombstones struct {
@ -230,16 +230,6 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}
// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
size int64
}
// Size returns the tombstone file size.
func (t *TombstoneFile) Size() int64 {
return t.size
}
func (*memTombstones) Close() error {
return nil
}

View File

@ -47,7 +47,8 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
stones.addInterval(ref, dranges...)
}
testutil.Ok(t, writeTombstoneFile(log.NewNopLogger(), tmpdir, stones))
_, err := writeTombstoneFile(log.NewNopLogger(), tmpdir, stones)
testutil.Ok(t, err)
restr, _, err := readTombstones(tmpdir)
testutil.Ok(t, err)