mirror of
https://github.com/prometheus/prometheus
synced 2024-12-27 17:13:22 +00:00
Merge pull request #8147 from roidelapluie/duc
Calculate head chunk size based on actual disk usage (#8139)
This commit is contained in:
commit
f3da817de9
@ -104,10 +104,6 @@ type ChunkDiskMapper struct {
|
||||
// from which chunks are served till they are flushed and are ready for m-mapping.
|
||||
chunkBuffer *chunkBuffer
|
||||
|
||||
// The total size of bytes in the closed files.
|
||||
// Needed to calculate the total size of all segments on disk.
|
||||
size atomic.Int64
|
||||
|
||||
// If 'true', it indicated that the maxt of all the on-disk files were set
|
||||
// after iterating through all the chunks in those files.
|
||||
fileMaxtSet bool
|
||||
@ -178,8 +174,6 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||
chkFileIndices = append(chkFileIndices, seq)
|
||||
}
|
||||
|
||||
cdm.size.Store(int64(0))
|
||||
|
||||
// Check for gaps in the files.
|
||||
sort.Ints(chkFileIndices)
|
||||
if len(chkFileIndices) == 0 {
|
||||
@ -206,8 +200,6 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
|
||||
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
|
||||
}
|
||||
|
||||
cdm.size.Add(int64(b.byteSlice.Len()))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -340,7 +332,6 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||
}
|
||||
}()
|
||||
|
||||
cdm.size.Add(cdm.curFileSize())
|
||||
cdm.curFileNumBytes.Store(int64(n))
|
||||
|
||||
if cdm.curFile != nil {
|
||||
@ -696,7 +687,6 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
||||
cdm.readPathMtx.Unlock()
|
||||
return err
|
||||
}
|
||||
cdm.size.Sub(int64(cdm.mmappedChunkFiles[seq].byteSlice.Len()))
|
||||
delete(cdm.mmappedChunkFiles, seq)
|
||||
delete(cdm.closers, seq)
|
||||
}
|
||||
@ -735,8 +725,8 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
|
||||
}
|
||||
|
||||
// Size returns the size of the chunk files.
|
||||
func (cdm *ChunkDiskMapper) Size() int64 {
|
||||
return cdm.size.Load() + cdm.curFileSize()
|
||||
func (cdm *ChunkDiskMapper) Size() (int64, error) {
|
||||
return fileutil.DirSize(cdm.dir.Name())
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) curFileSize() int64 {
|
||||
|
@ -1104,7 +1104,7 @@ func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struc
|
||||
deletable = make(map[ulid.ULID]struct{})
|
||||
|
||||
walSize, _ := db.Head().wal.Size()
|
||||
headChunksSize := db.Head().chunkDiskMapper.Size()
|
||||
headChunksSize, _ := db.Head().chunkDiskMapper.Size()
|
||||
// Initializing size counter with WAL size and Head chunks
|
||||
// written to disk, as that is part of the retention strategy.
|
||||
blocksSize := walSize + headChunksSize
|
||||
|
@ -1235,7 +1235,7 @@ func TestSizeRetention(t *testing.T) {
|
||||
// Add some data to the WAL.
|
||||
headApp := db.Head().Appender(context.Background())
|
||||
for _, m := range headBlocks {
|
||||
series := genSeries(100, 10, m.MinTime, m.MaxTime)
|
||||
series := genSeries(100, 10, m.MinTime, m.MaxTime+1)
|
||||
for _, s := range series {
|
||||
it := s.Iterator()
|
||||
for it.Next() {
|
||||
@ -1254,8 +1254,12 @@ func TestSizeRetention(t *testing.T) {
|
||||
blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
|
||||
walSize, err := db.Head().wal.Size()
|
||||
testutil.Ok(t, err)
|
||||
// Expected size should take into account block size + WAL size
|
||||
expSize := blockSize + walSize
|
||||
cdmSize, err := db.Head().chunkDiskMapper.Size()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize)
|
||||
// Expected size should take into account block size + WAL size + Head
|
||||
// chunks size
|
||||
expSize := blockSize + walSize + cdmSize
|
||||
actSize, err := fileutil.DirSize(db.Dir())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
|
||||
@ -1268,7 +1272,20 @@ func TestSizeRetention(t *testing.T) {
|
||||
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
|
||||
walSize, err = db.Head().wal.Size()
|
||||
testutil.Ok(t, err)
|
||||
expSize = blockSize + walSize
|
||||
cdmSize, err = db.Head().chunkDiskMapper.Size()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize)
|
||||
expSize = blockSize + walSize + cdmSize
|
||||
actSize, err = fileutil.DirSize(db.Dir())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
|
||||
|
||||
// Truncate Chunk Disk Mapper and compare sizes.
|
||||
testutil.Ok(t, db.Head().chunkDiskMapper.Truncate(900))
|
||||
cdmSize, err = db.Head().chunkDiskMapper.Size()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize)
|
||||
expSize = blockSize + walSize + cdmSize
|
||||
actSize, err = fileutil.DirSize(db.Dir())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
|
||||
@ -1285,8 +1302,11 @@ func TestSizeRetention(t *testing.T) {
|
||||
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
|
||||
walSize, err = db.Head().wal.Size()
|
||||
testutil.Ok(t, err)
|
||||
cdmSize, err = db.Head().chunkDiskMapper.Size()
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, cdmSize > 0, "cdmSize is not greater than 0, value is %d", cdmSize)
|
||||
// Expected size should take into account block size + WAL size
|
||||
expSize = blockSize + walSize
|
||||
expSize = blockSize + walSize + cdmSize
|
||||
actRetentionCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
|
||||
actSize, err = fileutil.DirSize(db.Dir())
|
||||
testutil.Ok(t, err)
|
||||
|
Loading…
Reference in New Issue
Block a user