[ENHANCEMENT] TSDB: Check CRC without allocating (#13742)

Use the existing utility function which does this.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-03-12 11:24:27 +00:00 committed by GitHub
parent ca57bd6352
commit d08f054950
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 3 additions and 18 deletions

View File

@ -15,7 +15,6 @@ package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
@ -690,7 +689,6 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
chkCRC32 := newCRC32()
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm.curFileSequence {
@ -755,20 +753,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
if err := checkCRC32(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd), sum); err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
@ -802,8 +793,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
cdm.fileMaxtSet = true
}()
chkCRC32 := newCRC32()
// Iterate files in ascending order.
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
for seg := range cdm.mmappedChunkFiles {
@ -838,7 +827,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
}
}
chkCRC32.Reset()
chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
startIdx := idx
@ -877,14 +865,11 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
// Check CRC.
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
return err
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
if err := checkCRC32(mmapFile.byteSlice.Range(startIdx, idx), sum); err != nil {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
Err: err,
}
}
idx += CRCSize