Added storage size based retention method and new metrics (#343)

Added the methods needed to retain data based on a byte limit rather than time. The limit is applied only if the flag is set (it defaults to 0). Blocks that are older than the retention period and blocks that push the total storage size over the limit are both removed.

Two new metrics keep track of the size of the local storage folder and the number of times data has been deleted because the size limit was exceeded.
Signed-off-by: Mark Knapp <mknapp@hudson-trading.com>
mknapphrt 2019-01-16 05:03:52 -05:00 committed by Krasi Georgiev
parent bff5aa4d21
commit ebf5d74325
11 changed files with 360 additions and 187 deletions
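
A minimal usage sketch of the new option, assuming the package's `Open` entry point; the data directory, retention values and registry wiring below are illustrative only, not part of this change:

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/tsdb"
)

func main() {
	// The new metrics (prometheus_tsdb_storage_blocks_bytes_total,
	// prometheus_tsdb_size_retentions_total) are registered on this registry.
	reg := prometheus.NewRegistry()

	// MaxBytes <= 0 (the default) leaves size based retention disabled.
	db, err := tsdb.Open("data/", nil, reg, &tsdb.Options{
		RetentionDuration: 15 * 24 * 60 * 60 * 1000, // time based retention, in milliseconds
		MaxBytes:          512 * 1024 * 1024,         // size based retention: drop oldest blocks past ~512 MiB
		BlockRanges:       tsdb.DefaultOptions.BlockRanges,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```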


@ -1,5 +1,9 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example when using tmpfs on an RPi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [FEATURE] Size based retention through `Options.MaxBytes`. As part of this change:
  - added new metrics: `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
  - new public interface `SizeReader: Size() int64`
  - `OpenBlock` signature changed to take a logger (illustrated in the sketch after this list).
- [REMOVED] `PrefixMatcher` is considered unused and has been removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
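
A rough illustration of the `OpenBlock` signature change and the new `SizeReader` interface, based only on the signatures shown in this diff; the block directory below is a placeholder ULID:

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunkenc"
)

func main() {
	// OpenBlock now takes a logger as its first argument; passing nil
	// falls back to a no-op logger inside the package.
	block, err := tsdb.OpenBlock(nil, "data/01ARZ3NDEKTSV4RRFFQ69G5FAV", chunkenc.NewPool())
	if err != nil {
		panic(err)
	}
	defer block.Close()

	// Block satisfies the new SizeReader interface, so the on-disk size
	// (index + chunks + tombstones, as recorded in the block meta) is available directly.
	var sr tsdb.SizeReader = block
	fmt.Println("block size in bytes:", sr.Size())
}
```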


@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}
// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}
// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}
// BlockDesc describes a block by ULID and time range.
@ -257,7 +266,10 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}
tr, err := readTombstones(dir)
tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}
// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}
pb := &Block{
dir: dir,
meta: *meta,
@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}
func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }
// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")


@ -46,14 +46,14 @@ func TestSetCompactionFailed(t *testing.T) {
defer os.RemoveAll(tmpdir)
blockDir := createBlock(t, tmpdir, 0, 0, 0)
b, err := OpenBlock(blockDir, nil)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
b, err = OpenBlock(blockDir, nil)
b, err = OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())


@ -285,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a SeriesReader for a serialized byte stream
// of series data.
type Reader struct {
// The underlying bytes holding the encoded series data.
bs []ByteSlice
// Closers for resources behind the byte slices.
cs []io.Closer
bs []ByteSlice // The underlying bytes holding the encoded series data.
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs {
if b.Len() < 4 {
@ -305,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m)
}
totalSize += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}
@ -328,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
pool = chunkenc.NewPool()
}
var bs []ByteSlice
var cs []io.Closer
var (
bs []ByteSlice
cs []io.Closer
)
for _, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
@ -346,6 +347,11 @@ func (s *Reader) Close() error {
return closeAll(s.cs...)
}
// Size returns the size of the chunks.
func (s *Reader) Size() int64 {
return s.size
}
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var (
seq = int(ref >> 32)


@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if b == nil {
var err error
b, err = OpenBlock(d, c.chunkPool)
b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil {
return uid, err
}

db.go

@ -58,6 +58,13 @@ type Options struct {
// Duration of persisted data to keep.
RetentionDuration uint64
// Maximum number of bytes in blocks to be retained.
// 0 or less means disabled.
// NOTE: For proper storage calculations, the size of the WAL folder
// also needs to be considered; it is not included when calculating
// the current size of the database.
MaxBytes int64
// The sizes of the Blocks.
BlockRanges []int64
@ -127,11 +134,12 @@ type dbMetrics struct {
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
timeRetentionCount prometheus.Counter
compactionsSkipped prometheus.Counter
cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter
startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
sizeRetentionCount prometheus.Counter
}
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_time_retentions_total",
Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
})
m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_skipped_total",
Help: "Total number of skipped compactions due to disabled auto compaction.",
})
m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_total",
Help: "Number of times the database cut off block data from disk.",
})
m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_failures_total",
Help: "Number of times the database failed to cut off block data from disk.",
})
m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp",
Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.",
@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
})
m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_storage_blocks_bytes_total",
Help: "The number of bytes that are currently used for local storage by all blocks.",
})
m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_size_retentions_total",
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
if r != nil {
r.MustRegister(
@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.symbolTableSize,
m.reloads,
m.reloadsFailed,
m.cutoffs,
m.cutoffsFailed,
m.timeRetentionCount,
m.compactionsTriggered,
m.startTime,
m.tombCleanTimer,
m.blocksBytes,
m.sizeRetentionCount,
)
}
return m
@ -340,25 +353,6 @@ func (db *DB) run() {
}
}
func (db *DB) beyondRetention(meta *BlockMeta) bool {
if db.opts.RetentionDuration == 0 {
return false
}
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
if len(blocks) == 0 {
return false
}
last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return meta.MaxTime < mint
}
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
@ -474,8 +468,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
return nil, false
}
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
// a list of block directories which should be deleted during reload.
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
defer func() {
@ -485,112 +478,187 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
dirs, err := blockDirs(db.dir)
loadable, corrupted, err := db.openBlocks()
if err != nil {
return errors.Wrap(err, "find blocks")
return err
}
// We delete old blocks that have been superseded by new ones by gathering all parents
// from existing blocks. Those parents all have newer replacements and can be safely deleted
// after we loaded the other blocks.
// This makes us resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
// blocks with their parents, we can pick up the deletion where it left off during a crash.
var (
blocks []*Block
corrupted = map[ulid.ULID]error{}
opened = map[ulid.ULID]struct{}{}
deleteable = map[ulid.ULID]struct{}{}
)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
// The block was potentially in the middle of being deleted during a crash.
// Skip it since we may delete it properly further down again.
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
ulid, err2 := ulid.Parse(filepath.Base(dir))
if err2 != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue
}
corrupted[ulid] = err
continue
}
if db.beyondRetention(meta) {
deleteable[meta.ULID] = struct{}{}
continue
}
for _, b := range meta.Compaction.Parents {
deleteable[b.ULID] = struct{}{}
deletable := db.deletableBlocks(loadable)
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
deletable[b.ULID] = nil
}
}
// Blocks we failed to open should all be those we are want to delete anyway.
for c, err := range corrupted {
if _, ok := deleteable[c]; !ok {
return errors.Wrapf(err, "unexpected corrupted block %s", c)
}
if len(corrupted) > 0 {
return errors.Wrap(err, "unexpected corrupted block")
}
// Load new blocks into memory.
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return errors.Wrapf(err, "read meta information %s", dir)
}
// Don't load blocks that are scheduled for deletion.
if _, ok := deleteable[meta.ULID]; ok {
// All deletable blocks should not be loaded.
var (
bb []*Block
blocksSize int64
)
for _, block := range loadable {
if _, ok := deletable[block.Meta().ULID]; ok {
deletable[block.Meta().ULID] = block
continue
}
// See if we already have the block in memory or open it otherwise.
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = OpenBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
}
}
blocks = append(blocks, b)
opened[meta.ULID] = struct{}{}
bb = append(bb, block)
blocksSize += block.Size()
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
loadable = bb
db.metrics.blocksBytes.Set(float64(blocksSize))
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime
})
if err := validateBlockSequence(blocks); err != nil {
if err := validateBlockSequence(loadable); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks
db.blocks = loadable
db.mtx.Unlock()
// Drop old blocks from memory.
for _, b := range oldBlocks {
if _, ok := opened[b.Meta().ULID]; ok {
continue
}
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
if _, ok := deletable[b.Meta().ULID]; ok {
deletable[b.Meta().ULID] = b
}
}
// Delete all obsolete blocks. None of them are opened any longer.
for ulid := range deleteable {
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
return errors.Wrapf(err, "delete obsolete block %s", ulid)
}
if err := db.deleteBlocks(deletable); err != nil {
return err
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
if len(loadable) == 0 {
return nil
}
maxt := blocks[len(blocks)-1].Meta().MaxTime
maxt := loadable[len(loadable)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
dirs, err := blockDirs(db.dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
}
corrupted = make(map[ulid.ULID]error)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue
}
// See if we already have the block in memory or open it otherwise.
block, ok := db.getBlock(meta.ULID)
if !ok {
block, err = OpenBlock(db.logger, dir, db.chunkPool)
if err != nil {
corrupted[meta.ULID] = err
continue
}
}
blocks = append(blocks, block)
}
return blocks, corrupted, nil
}
// deletableBlocks returns all blocks past retention policy.
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
deletable := make(map[ulid.ULID]*Block)
// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
// This ensures that the retentions will remove the oldest blocks.
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
})
for ulid, block := range db.beyondTimeRetention(blocks) {
deletable[ulid] = block
}
for ulid, block := range db.beyondSizeRetention(blocks) {
deletable[ulid] = block
}
return deletable
}
func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
// Time retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
return
}
deleteable = make(map[ulid.ULID]*Block)
for i, block := range blocks {
// The difference between the first block and this block is larger than
// the retention period so any blocks after that are added as deleteable.
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) {
for _, b := range blocks[i:] {
deleteable[b.meta.ULID] = b
}
db.metrics.timeRetentionCount.Inc()
break
}
}
return deleteable
}
func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
// Size retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
return
}
deleteable = make(map[ulid.ULID]*Block)
blocksSize := int64(0)
for i, block := range blocks {
blocksSize += block.Size()
if blocksSize > db.opts.MaxBytes {
// Add this and all following blocks for deletion.
for _, b := range blocks[i:] {
deleteable[b.meta.ULID] = b
}
db.metrics.sizeRetentionCount.Inc()
break
}
}
return deleteable
}
// deleteBlocks closes and deletes blocks from the disk.
// When the map contains a non-nil block object it means it is loaded in memory,
// so it needs to be closed first as it might need to wait for pending readers to complete.
func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
for ulid, block := range blocks {
if block != nil {
if err := block.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
}
}
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
return errors.Wrapf(err, "delete obsolete block %s", ulid)
}
}
return nil
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {


@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) {
testutil.Ok(t, db.reload())
blocks := db.Blocks()
for _, b := range blocks {
b.meta.Stats.NumBytes = 0
}
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
@ -834,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) {
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 0, 0, 0)
block, err := OpenBlock(blockDir, nil)
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction.
tomb := newMemTombstones()
@ -877,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}
block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil)
testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)
@ -901,59 +904,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block)
}
func TestDB_Retention(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
app := db.Appender()
_, err := app.Add(lbls, 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
// create snapshot to make it create a block.
// TODO(gouthamve): Add a method to compact headblock.
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))
app = db.Appender()
_, err = app.Add(lbls, 100, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
// Snapshot again to create another block.
snap, err = ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, &Options{
RetentionDuration: 10,
BlockRanges: []int64{50},
func TestTimeRetention(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{1000},
})
testutil.Ok(t, err)
defer close()
defer db.Close()
testutil.Equals(t, 2, len(db.blocks))
blocks := []*BlockMeta{
{MinTime: 500, MaxTime: 900}, // Oldest block
{MinTime: 1000, MaxTime: 1500},
{MinTime: 1500, MaxTime: 2000}, // Newest Block
}
// Reload blocks, which should drop blocks beyond the retention boundary.
for _, m := range blocks {
createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime)
}
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
testutil.Ok(t, db.reload())
testutil.Equals(t, 1, len(db.blocks))
testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
expBlocks := blocks[1:]
actBlocks := db.Blocks()
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
testutil.Equals(t, len(expBlocks), len(actBlocks))
testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
}
func TestSizeRetention(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{100},
})
defer close()
defer db.Close()
blocks := []*BlockMeta{
{MinTime: 100, MaxTime: 200}, // Oldest block
{MinTime: 200, MaxTime: 300},
{MinTime: 300, MaxTime: 400},
{MinTime: 400, MaxTime: 500},
{MinTime: 500, MaxTime: 600}, // Newest Block
}
for _, m := range blocks {
createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime)
}
// Test that registered size matches the actual disk size.
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
actSize := dbDiskSize(db.Dir())
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
// Decrease the max bytes limit so that a delete is triggered.
// Check total size, total count and check that the oldest block was deleted.
firstBlockSize := db.Blocks()[0].Size()
sizeLimit := actSize - firstBlockSize
db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller than the actual size.
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
expBlocks := blocks[1:]
actBlocks := db.Blocks()
expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
actSize = dbDiskSize(db.Dir())
testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
}
func dbDiskSize(dir string) int64 {
var statSize int64
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
// Include only index, tombstone and chunks.
if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
info.Name() == indexFilename ||
info.Name() == tombstoneFilename {
statSize += info.Size()
}
return nil
})
return statSize
}
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {


@ -914,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings {
return p
}
// Size returns the size of an index file.
func (r *Reader) Size() int64 {
return int64(r.b.Len())
}
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{}, len(r.labels))


@ -1227,12 +1227,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
func BenchmarkPersistedQueries(b *testing.B) {
for _, nSeries := range []int{10, 100} {
for _, nSamples := range []int{1000, 10000, 100000} {
for _, nSamples := range []int64{1000, 10000, 100000} {
b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil)
block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil)
testutil.Ok(b, err)
defer block.Close()


@ -113,37 +113,41 @@ type Stone struct {
intervals Intervals
}
func readTombstones(dir string) (TombstoneReader, error) {
func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), nil
return newMemTombstones(), nil, nil
} else if err != nil {
return nil, err
return nil, nil, err
}
sr := &TombstoneFile{
size: int64(len(b)),
}
if len(b) < 5 {
return nil, errors.Wrap(errInvalidSize, "tombstones header")
return nil, sr, errors.Wrap(errInvalidSize, "tombstones header")
}
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
if mg := d.be32(); mg != MagicTombstone {
return nil, fmt.Errorf("invalid magic number %x", mg)
return nil, sr, fmt.Errorf("invalid magic number %x", mg)
}
if flag := d.byte(); flag != tombstoneFormatV1 {
return nil, fmt.Errorf("invalid tombstone format %x", flag)
return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
}
if d.err() != nil {
return nil, d.err()
return nil, sr, d.err()
}
// Verify checksum.
hash := newCRC32()
if _, err := hash.Write(d.get()); err != nil {
return nil, errors.Wrap(err, "write to hash")
return nil, sr, errors.Wrap(err, "write to hash")
}
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, errors.New("checksum did not match")
return nil, sr, errors.New("checksum did not match")
}
stonesMap := newMemTombstones()
@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) {
mint := d.varint64()
maxt := d.varint64()
if d.err() != nil {
return nil, d.err()
return nil, sr, d.err()
}
stonesMap.addInterval(k, Interval{mint, maxt})
}
return stonesMap, nil
return stonesMap, sr, nil
}
type memTombstones struct {
@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}
// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
size int64
}
// Size returns the tombstone file size.
func (t *TombstoneFile) Size() int64 {
return t.size
}
func (*memTombstones) Close() error {
return nil
}


@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
testutil.Ok(t, writeTombstoneFile(tmpdir, stones))
restr, err := readTombstones(tmpdir)
restr, _, err := readTombstones(tmpdir)
testutil.Ok(t, err)
// Compare the two readers.