From bb5c6b38e2a4db0b363da187a4f5d9b3e7c0c687 Mon Sep 17 00:00:00 2001 From: Max Neverov <1296281+mneverov@users.noreply.github.com> Date: Wed, 26 Aug 2020 10:52:48 +0200 Subject: [PATCH] Fix Possible Race Condition in TSDB (#7815) * Replace tsdb chunk mapper size with atomic; protect mmappedChunkFiles with read path mutex on DeleteCorrupted Signed-off-by: Max Neverov * PR fixes Signed-off-by: Max Neverov --- tsdb/chunks/head_chunks.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index e211b23d5..f63b1ab60 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -106,7 +106,7 @@ type ChunkDiskMapper struct { // The total size of bytes in the closed files. // Needed to calculate the total size of all segments on disk. - size int64 + size atomic.Int64 // If 'true', it indicated that the maxt of all the on-disk files were set // after iterating through all the chunks in those files. @@ -178,7 +178,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { chkFileIndices = append(chkFileIndices, seq) } - cdm.size = 0 + cdm.size.Store(int64(0)) // Check for gaps in the files. sort.Ints(chkFileIndices) @@ -207,7 +207,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { return errors.Errorf("%s: invalid chunk format version %d", files[i], v) } - cdm.size += int64(b.byteSlice.Len()) + cdm.size.Add(int64(b.byteSlice.Len())) } return nil @@ -340,7 +340,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { } }() - cdm.size += cdm.curFileSize() + cdm.size.Add(cdm.curFileSize()) cdm.curFileNumBytes.Store(int64(n)) if cdm.curFile != nil { @@ -354,6 +354,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { return err } + cdm.readPathMtx.Lock() cdm.curFileSequence = seq cdm.curFile = newFile if cdm.chkWriter != nil { @@ -362,7 +363,6 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize) } - cdm.readPathMtx.Lock() cdm.closers[cdm.curFileSequence] = mmapFile cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())} cdm.readPathMtx.Unlock() @@ -693,7 +693,7 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { cdm.readPathMtx.Unlock() return err } - cdm.size -= int64(cdm.mmappedChunkFiles[seq].byteSlice.Len()) + cdm.size.Sub(int64(cdm.mmappedChunkFiles[seq].byteSlice.Len())) delete(cdm.mmappedChunkFiles, seq) delete(cdm.closers, seq) } @@ -720,17 +720,20 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { // Delete all the head chunk files following the corrupt head chunk file. segs := []int{} + cdm.readPathMtx.RLock() for seg := range cdm.mmappedChunkFiles { if seg >= cerr.FileIndex { segs = append(segs, seg) } } + cdm.readPathMtx.RUnlock() + return cdm.deleteFiles(segs) } // Size returns the size of the chunk files. func (cdm *ChunkDiskMapper) Size() int64 { - return cdm.size + cdm.curFileSize() + return cdm.size.Load() + cdm.curFileSize() } func (cdm *ChunkDiskMapper) curFileSize() int64 {