diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 9b332f971..20b76ce8d 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -190,7 +190,7 @@ func (w *Writer) cut() error { return err } - n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize) + n, f, _, err := cutSegmentFile(w.dirFile, MagicChunks, chunksFormatV1, w.segmentSize) if err != nil { return err } @@ -206,7 +206,7 @@ func (w *Writer) cut() error { return nil } -func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) { +func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, err error) { p, seq, err := nextSequenceFile(dirFile.Name()) if err != nil { return 0, nil, 0, err @@ -215,8 +215,8 @@ func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (hea if err != nil { return 0, nil, 0, err } - if segmentSize > 0 { - if err = fileutil.Preallocate(f, segmentSize, true); err != nil { + if allocSize > 0 { + if err = fileutil.Preallocate(f, allocSize, true); err != nil { return 0, nil, 0, err } } @@ -226,7 +226,7 @@ func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (hea // Write header metadata for new file. metab := make([]byte, SegmentHeaderSize) - binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks) + binary.BigEndian.PutUint32(metab[:MagicChunksSize], magicNumber) metab[4] = chunksFormat n, err := f.Write(metab) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go new file mode 100644 index 000000000..fd111c2df --- /dev/null +++ b/tsdb/chunks/head_chunks.go @@ -0,0 +1,779 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunks + +import ( + "bufio" + "bytes" + "encoding/binary" + "hash" + "io" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/chunkenc" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" +) + +// Head chunk file header fields constants. +const ( + // MagicHeadChunks is 4 bytes at the beginning of a head chunk file. + MagicHeadChunks = 0x0130BC91 + + headChunksFormatV1 = 1 + writeBufferSize = 4 * 1024 * 1024 // 4 MiB. +) + +var ( + // ErrChunkDiskMapperClosed returned by any method indicates + // that the ChunkDiskMapper was closed. + ErrChunkDiskMapperClosed = errors.New("ChunkDiskMapper closed") +) + +const ( + // DefaultHeadChunkFileMaxTimeRange is the default head chunk file time range. + // Assuming a general scrape interval of 15s, a chunk with 120 samples would + // be cut every 30m, so anything <30m will cause lots of empty files. And keeping + // it exactly 30m also has a chance of having empty files as its near that border. + // Hence keeping it a little more than 30m, i.e. 40m. + DefaultHeadChunkFileMaxTimeRange = 40 * int64(time.Minute/time.Millisecond) + // MintMaxtSize is the size of the mint/maxt for head chunk file and chunks. + MintMaxtSize = 8 + // SeriesRefSize is the size of series reference on disk. + SeriesRefSize = 8 + // HeadChunkFileHeaderSize is the total size of the header for the head chunk file. + HeadChunkFileHeaderSize = SegmentHeaderSize + // MaxHeadChunkFileSize is the max size of a head chunk file. + MaxHeadChunkFileSize = 512 * 1024 * 1024 // 512 MiB. + // CRCSize is the size of crc32 sum on disk. + CRCSize = 4 + // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. + // Max because the uvarint size can be smaller. + MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize +) + +// corruptionErr is an error that's returned when corruption is encountered. +type corruptionErr struct { + Dir string + FileIndex int + Err error +} + +func (e *corruptionErr) Error() string { + return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() +} + +// ChunkDiskMapper is for writing the Head block chunks to the disk +// and access chunks via mmapped file. +type ChunkDiskMapper struct { + /// Writer. + dir *os.File + + curFile *os.File // File being written to. + curFileSequence int // Index of current open file being appended to. + curFileMint int64 // Used to check for a chunk crossing the max file time range. + curFileMaxt int64 // Used for the size retention. + curFileNumBytes int64 // Bytes written in current open file. + maxFileTime int64 // Max time range (curFileMaxt-curFileMint) for a file. + + byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. + chkWriter *bufio.Writer // Writer for the current open file. + crc32 hash.Hash + writePathMtx sync.RWMutex + + /// Reader. + // The int key in the map is the file number on the disk. + mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index. + closers map[int]io.Closer // Closers for resources behind the byte slices. + readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps. + pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk. + + // Writer and Reader. + // We flush chunks to disk in batches. Hence, we store them in this buffer + // from which chunks are served till they are flushed and are ready for m-mapping. + chunkBuffer *chunkBuffer + + // The total size of bytes in the closed files. + // Needed to calculate the total size of all segments on disk. + size int64 + + // If 'true', it indicated that the maxt of all the on-disk files were set + // after iterating through all the chunks in those files. + fileMaxtSet bool + + closed bool +} + +type mmappedChunkFile struct { + byteSlice ByteSlice + maxt int64 +} + +// NewChunkDiskMapper returns a new writer against the given directory +// using the default head chunk file duration. +// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper +// to set the maxt of all the file. +func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) { + return newChunkDiskMapper(dir, DefaultHeadChunkFileMaxTimeRange, pool) +} + +func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) (*ChunkDiskMapper, error) { + if maxFileDuration <= 0 { + maxFileDuration = DefaultHeadChunkFileMaxTimeRange + } + + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err + } + dirFile, err := fileutil.OpenDir(dir) + if err != nil { + return nil, err + } + + m := &ChunkDiskMapper{ + dir: dirFile, + maxFileTime: maxFileDuration, + pool: pool, + crc32: newCRC32(), + chunkBuffer: newChunkBuffer(), + } + + if m.pool == nil { + m.pool = chunkenc.NewPool() + } + + return m, m.openMMapFiles() +} + +func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { + cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} + cdm.closers = map[int]io.Closer{} + defer func() { + if returnErr != nil { + var merr tsdb_errors.MultiError + merr.Add(returnErr) + merr.Add(closeAllFromMap(cdm.closers)) + returnErr = merr.Err() + + cdm.mmappedChunkFiles = nil + cdm.closers = nil + } + }() + + files, err := listChunkFiles(cdm.dir.Name()) + if err != nil { + return err + } + + chkFileIndices := make([]int, 0, len(files)) + for seq, fn := range files { + f, err := fileutil.OpenMmapFile(fn) + if err != nil { + return errors.Wrap(err, "mmap files") + } + cdm.closers[seq] = f + cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())} + chkFileIndices = append(chkFileIndices, seq) + } + + cdm.size = 0 + + // Check for gaps in the files. + sort.Ints(chkFileIndices) + if len(chkFileIndices) == 0 { + return nil + } + lastSeq := chkFileIndices[0] + for _, seq := range chkFileIndices[1:] { + if seq != lastSeq+1 { + return errors.Errorf("found unsequential head chunk files %d and %d", lastSeq, seq) + } + lastSeq = seq + } + + for i, b := range cdm.mmappedChunkFiles { + if b.byteSlice.Len() < HeadChunkFileHeaderSize { + return errors.Wrapf(errInvalidSize, "invalid head chunk file header in file %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks { + return errors.Errorf("invalid magic number %x", m) + } + + // Verify chunk format version. + if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { + return errors.Errorf("invalid chunk format version %d", v) + } + + cdm.size += int64(b.byteSlice.Len()) + } + + return nil +} + +func listChunkFiles(dir string) (map[int]string, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + res := map[int]string{} + for _, fi := range files { + seq, err := strconv.ParseUint(fi.Name(), 10, 64) + if err != nil { + continue + } + res[int(seq)] = filepath.Join(dir, fi.Name()) + } + return res, nil +} + +// WriteChunk writes the chunk to the disk. +// The returned chunk ref is the reference from where the chunk encoding starts for the chunk. +func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) { + cdm.writePathMtx.Lock() + defer cdm.writePathMtx.Unlock() + + if cdm.closed { + return 0, ErrChunkDiskMapperClosed + } + + if cdm.shouldCutNewFile(len(chk.Bytes()), maxt) { + if err := cdm.cut(mint); err != nil { + return 0, err + } + } + + // if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size; + // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). + if len(chk.Bytes())+MaxHeadChunkMetaSize < writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { + if err := cdm.flushBuffer(); err != nil { + return 0, err + } + } + + cdm.crc32.Reset() + bytesWritten := 0 + + // The upper 4 bytes are for the head chunk file index and + // the lower 4 bytes are for the head chunk file offset where to start reading this chunk. + chkRef = chunkRef(uint64(cdm.curFileSequence), uint64(cdm.curFileNumBytes)) + + binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], seriesRef) + bytesWritten += SeriesRefSize + binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint)) + bytesWritten += MintMaxtSize + binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt)) + bytesWritten += MintMaxtSize + cdm.byteBuf[bytesWritten] = byte(chk.Encoding()) + bytesWritten += ChunkEncodingSize + n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes()))) + bytesWritten += n + + if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil { + return 0, err + } + if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil { + return 0, err + } + if err := cdm.writeCRC32(); err != nil { + return 0, err + } + + if maxt > cdm.curFileMaxt { + cdm.curFileMaxt = maxt + } + if mint < cdm.curFileMint { + cdm.curFileMint = mint + } + + cdm.chunkBuffer.put(chkRef, chk) + + if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize { + // The chunk was bigger than the buffer itself. + // Flushing to not keep partial chunks in buffer. + if err := cdm.flushBuffer(); err != nil { + return 0, err + } + } + + return chkRef, nil +} + +func chunkRef(seq, offset uint64) (chunkRef uint64) { + return (seq << 32) | offset +} + +// shouldCutNewFile decides the cutting of a new file based on time and size retention. +// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. +// Time retention: so that we can delete old chunks with some time guarantee in low load environments. +func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int, maxt int64) bool { + return cdm.curFileNumBytes == 0 || // First head chunk file. + (maxt-cdm.curFileMint > cdm.maxFileTime && cdm.curFileNumBytes > HeadChunkFileHeaderSize) || // Time duration reached for the existing file. + cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. +} + +func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) { + // Sync current tail to disk and close. + if err := cdm.finalizeCurFile(); err != nil { + return err + } + + n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, int64(MaxHeadChunkFileSize)) + if err != nil { + return err + } + defer func() { + // The file should not be closed if there is no error, + // its kept open in the ChunkDiskMapper. + if returnErr != nil { + var merr tsdb_errors.MultiError + merr.Add(returnErr) + merr.Add(newFile.Close()) + returnErr = merr.Err() + } + }() + + cdm.size += cdm.curFileNumBytes + atomic.StoreInt64(&cdm.curFileNumBytes, int64(n)) + + if cdm.curFile != nil { + cdm.readPathMtx.Lock() + cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt + cdm.readPathMtx.Unlock() + } + + mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), int(MaxHeadChunkFileSize)) + if err != nil { + return err + } + + cdm.curFileSequence = seq + cdm.curFileMint = mint + cdm.curFile = newFile + if cdm.chkWriter != nil { + cdm.chkWriter.Reset(newFile) + } else { + cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize) + } + + cdm.readPathMtx.Lock() + cdm.closers[cdm.curFileSequence] = mmapFile + cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())} + cdm.readPathMtx.Unlock() + + cdm.curFileMaxt = 0 + + return nil +} + +// finalizeCurFile writes all pending data to the current tail file, +// truncates its size, and closes it. +func (cdm *ChunkDiskMapper) finalizeCurFile() error { + if cdm.curFile == nil { + return nil + } + + if err := cdm.flushBuffer(); err != nil { + return err + } + + if err := cdm.curFile.Sync(); err != nil { + return err + } + + return cdm.curFile.Close() +} + +func (cdm *ChunkDiskMapper) write(b []byte) error { + n, err := cdm.chkWriter.Write(b) + atomic.AddInt64(&cdm.curFileNumBytes, int64(n)) + return err +} + +func (cdm *ChunkDiskMapper) writeAndAppendToCRC32(b []byte) error { + if err := cdm.write(b); err != nil { + return err + } + _, err := cdm.crc32.Write(b) + return err +} + +func (cdm *ChunkDiskMapper) writeCRC32() error { + return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0])) +} + +// flushBuffer flushes the current in-memory chunks. +// Assumes that writePathMtx is _write_ locked before calling this method. +func (cdm *ChunkDiskMapper) flushBuffer() error { + if err := cdm.chkWriter.Flush(); err != nil { + return err + } + cdm.chunkBuffer.clear() + return nil +} + +// Chunk returns a chunk from a given reference. +// Note: The returned chunk will turn invalid after closing ChunkDiskMapper. +func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { + cdm.readPathMtx.RLock() + // We hold this read lock for the entire duration because if the Close() + // is called, the data in the byte slice will get corrupted as the mmapped + // file will be closed. + defer cdm.readPathMtx.RUnlock() + + var ( + // Get the upper 4 bytes. + // These contain the head chunk file index. + sgmIndex = int(ref >> 32) + // Get the lower 4 bytes. + // These contain the head chunk file offset where the chunk starts. + // We skip the series ref and the mint/maxt beforehand. + chkStart = int((ref<<32)>>32) + SeriesRefSize + (2 * MintMaxtSize) + chkCRC32 = newCRC32() + ) + + if cdm.closed { + return nil, ErrChunkDiskMapperClosed + } + + // If it is the current open file, then the chunks can be in the buffer too. + if sgmIndex == cdm.curFileSequence { + chunk := cdm.chunkBuffer.get(ref) + if chunk != nil { + return chunk, nil + } + } + + mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex] + if !ok { + if sgmIndex > cdm.curFileSequence { + return nil, errors.Errorf("head chunk file index %d more than current open file", sgmIndex) + } + return nil, errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex) + } + + if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() { + return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v, file:%d", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len(), sgmIndex) + } + + // Encoding. + chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0] + + // Data length. + // With the minimum chunk length this should never cause us reading + // over the end of the slice. + chkDataLenStart := chkStart + ChunkEncodingSize + c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize) + chkDataLen, n := binary.Uvarint(c) + if n <= 0 { + return nil, errors.Errorf("reading chunk length failed with %d", n) + } + + // Verify the chunk data end. + chkDataEnd := chkDataLenStart + n + int(chkDataLen) + if chkDataEnd > mmapFile.byteSlice.Len() { + return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()) + } + + // Check the CRC. + sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) + if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { + return nil, err + } + if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { + return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act) + } + + // The chunk data itself. + chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) + return cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData) +} + +// IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence +// and runs the provided function on each chunk. It returns on the first error encountered. +// NOTE: This method needs to be called at least once after creating ChunkDiskMapper +// to set the maxt of all the file. +func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) { + cdm.writePathMtx.Lock() + defer cdm.writePathMtx.Unlock() + + defer func() { + if err == nil { + cdm.fileMaxtSet = true + } + }() + + chkCRC32 := newCRC32() + + // Iterate files in ascending order. + segIDs := make([]int, 0, len(cdm.mmappedChunkFiles)) + for seg := range cdm.mmappedChunkFiles { + segIDs = append(segIDs, seg) + } + sort.Ints(segIDs) + for _, segID := range segIDs { + mmapFile := cdm.mmappedChunkFiles[segID] + fileEnd := mmapFile.byteSlice.Len() + if segID == cdm.curFileSequence { + fileEnd = int(cdm.curFileNumBytes) + } + idx := HeadChunkFileHeaderSize + for idx < fileEnd { + if fileEnd-idx < MaxHeadChunkMetaSize { + // Check for all 0s which marks the end of the file. + allZeros := true + for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) { + if b != byte(0) { + allZeros = false + break + } + } + if allZeros { + break + } + return &corruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: segID, + Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), + } + } + chkCRC32.Reset() + chunkRef := chunkRef(uint64(segID), uint64(idx)) + + startIdx := idx + seriesRef := binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)) + idx += SeriesRefSize + mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) + idx += MintMaxtSize + maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) + idx += MintMaxtSize + + // We preallocate file to help with m-mapping (especially windows systems). + // As series ref always starts from 1, we assume it being 0 to be the end of the actual file data. + // We are not considering possible file corruption that can cause it to be 0. + // Additionally we are checking mint and maxt just to be sure. + if seriesRef == 0 && mint == 0 && maxt == 0 { + break + } + + idx += ChunkEncodingSize // Skip encoding. + dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) + idx += n + int(dataLen) // Skip the data. + + // In the beginning we only checked for the chunk meta size. + // Now that we have added the chunk data length, we check for sufficient bytes again. + if idx+CRCSize > fileEnd { + return &corruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: segID, + Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), + } + } + + // Check CRC. + sum := mmapFile.byteSlice.Range(idx, idx+CRCSize) + if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil { + return err + } + if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { + return &corruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: segID, + Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), + } + } + idx += CRCSize + + if maxt > mmapFile.maxt { + mmapFile.maxt = maxt + } + + if err := f(seriesRef, chunkRef, mint, maxt); err != nil { + return err + } + } + + if idx > fileEnd { + // It should be equal to the slice length. + return &corruptionErr{ + Dir: cdm.dir.Name(), + FileIndex: segID, + Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), + } + } + } + + return nil +} + +// Truncate deletes the head chunk files which are strictly below the mint. +// mint should be in milliseconds. +func (cdm *ChunkDiskMapper) Truncate(mint int64) error { + if !cdm.fileMaxtSet { + return errors.New("maxt of the files are not set") + } + cdm.readPathMtx.RLock() + + // Sort the file indices, else if files deletion fails in between, + // it can lead to unsequential files as the map is not sorted. + chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles)) + for seq := range cdm.mmappedChunkFiles { + chkFileIndices = append(chkFileIndices, seq) + } + sort.Ints(chkFileIndices) + + var removedFiles []int + for _, seq := range chkFileIndices { + if seq == cdm.curFileSequence { + continue + } + if cdm.mmappedChunkFiles[seq].maxt < mint { + removedFiles = append(removedFiles, seq) + } + } + cdm.readPathMtx.RUnlock() + + return cdm.deleteFiles(removedFiles) +} + +func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { + cdm.readPathMtx.Lock() + for _, seq := range removedFiles { + if err := cdm.closers[seq].Close(); err != nil { + cdm.readPathMtx.Unlock() + return err + } + cdm.size -= int64(cdm.mmappedChunkFiles[seq].byteSlice.Len()) + delete(cdm.mmappedChunkFiles, seq) + delete(cdm.closers, seq) + } + cdm.readPathMtx.Unlock() + + // We actually delete the files separately to not block the readPathMtx for long. + for _, seq := range removedFiles { + if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { + return err + } + } + + return nil +} + +// Repair deletes all the head chunk files after the one which had the corruption +// (including the corrupt file). +func (cdm *ChunkDiskMapper) Repair(originalErr error) error { + err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. + cerr, ok := err.(*corruptionErr) + if !ok { + return errors.Wrap(originalErr, "cannot handle error") + } + + // Delete all the head chunk files following the corrupt head chunk file. + segs := []int{} + for seg := range cdm.mmappedChunkFiles { + if seg >= cerr.FileIndex { + segs = append(segs, seg) + } + } + return cdm.deleteFiles(segs) +} + +// Size returns the size of the chunk files. +func (cdm *ChunkDiskMapper) Size() int64 { + n := atomic.LoadInt64(&cdm.curFileNumBytes) + return cdm.size + n +} + +// Close closes all the open files in ChunkDiskMapper. +// It is not longer safe to access chunks from this struct after calling Close. +func (cdm *ChunkDiskMapper) Close() error { + // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. + // The lock order should not be reversed here else it can cause deadlocks. + cdm.writePathMtx.Lock() + defer cdm.writePathMtx.Unlock() + cdm.readPathMtx.Lock() + defer cdm.readPathMtx.Unlock() + + if cdm.closed { + return nil + } + cdm.closed = true + + var merr tsdb_errors.MultiError + merr.Add(closeAllFromMap(cdm.closers)) + merr.Add(cdm.finalizeCurFile()) + merr.Add(cdm.dir.Close()) + + cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} + cdm.closers = map[int]io.Closer{} + + return merr.Err() +} + +func closeAllFromMap(cs map[int]io.Closer) error { + var merr tsdb_errors.MultiError + for _, c := range cs { + merr.Add(c.Close()) + } + return merr.Err() +} + +const inBufferShards = 128 // 128 is a randomly chosen number. + +// chunkBuffer is a thread safe buffer for chunks. +type chunkBuffer struct { + inBufferChunks [inBufferShards]map[uint64]chunkenc.Chunk + inBufferChunksMtxs [inBufferShards]sync.RWMutex +} + +func newChunkBuffer() *chunkBuffer { + cb := &chunkBuffer{} + for i := 0; i < inBufferShards; i++ { + cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk) + } + return cb +} + +func (cb *chunkBuffer) put(ref uint64, chk chunkenc.Chunk) { + shardIdx := ref % inBufferShards + + cb.inBufferChunksMtxs[shardIdx].Lock() + cb.inBufferChunks[shardIdx][ref] = chk + cb.inBufferChunksMtxs[shardIdx].Unlock() +} + +func (cb *chunkBuffer) get(ref uint64) chunkenc.Chunk { + shardIdx := ref % inBufferShards + + cb.inBufferChunksMtxs[shardIdx].RLock() + defer cb.inBufferChunksMtxs[shardIdx].RUnlock() + + return cb.inBufferChunks[shardIdx][ref] +} + +func (cb *chunkBuffer) clear() { + for i := 0; i < inBufferShards; i++ { + cb.inBufferChunksMtxs[i].Lock() + cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk) + cb.inBufferChunksMtxs[i].Unlock() + } +} diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go new file mode 100644 index 000000000..fbea37aee --- /dev/null +++ b/tsdb/chunks/head_chunks_test.go @@ -0,0 +1,275 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunks + +import ( + "encoding/binary" + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { + hrw, close := testHeadReadWriter(t) + defer func() { + testutil.Ok(t, hrw.Close()) + close() + }() + + expectedBytes := []byte{} + nextChunkOffset := uint64(HeadChunkFileHeaderSize) + chkCRC32 := newCRC32() + + type expectedDataType struct { + seriesRef, chunkRef uint64 + mint, maxt int64 + chunk chunkenc.Chunk + } + expectedData := []expectedDataType{} + + var buf [MaxHeadChunkMetaSize]byte + totalChunks := 0 + var firstFileName string + for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { + for i := 0; i < 100; i++ { + seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) + totalChunks++ + expectedData = append(expectedData, expectedDataType{ + seriesRef: seriesRef, + mint: mint, + maxt: maxt, + chunkRef: chkRef, + chunk: chunk, + }) + + if hrw.curFileSequence != 1 { + // We are checking for bytes written only for the first file. + continue + } + + // Calculating expected bytes written on disk for first file. + firstFileName = hrw.curFile.Name() + testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef) + + bytesWritten := 0 + chkCRC32.Reset() + + binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef) + bytesWritten += SeriesRefSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) + bytesWritten += MintMaxtSize + binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) + bytesWritten += MintMaxtSize + buf[bytesWritten] = byte(chunk.Encoding()) + bytesWritten += ChunkEncodingSize + n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) + bytesWritten += n + + expectedBytes = append(expectedBytes, buf[:bytesWritten]...) + _, err := chkCRC32.Write(buf[:bytesWritten]) + testutil.Ok(t, err) + expectedBytes = append(expectedBytes, chunk.Bytes()...) + _, err = chkCRC32.Write(chunk.Bytes()) + testutil.Ok(t, err) + + expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) + + // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. + nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize + } + } + + // Checking on-disk bytes for the first file. + testutil.Assert(t, len(hrw.mmappedChunkFiles) == 3 && len(hrw.closers) == 3, "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) + + actualBytes, err := ioutil.ReadFile(firstFileName) + testutil.Ok(t, err) + + // Check header of the segment file. + testutil.Equals(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) + testutil.Equals(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) + + // Remaining chunk data. + fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) + testutil.Equals(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) + + // Test for the next chunk header to be all 0s. That marks the end of the file. + for _, b := range actualBytes[fileEnd : fileEnd+MaxHeadChunkMetaSize] { + testutil.Equals(t, byte(0), b) + } + + // Testing reading of chunks. + for _, exp := range expectedData { + actChunk, err := hrw.Chunk(exp.chunkRef) + testutil.Ok(t, err) + testutil.Equals(t, exp.chunk.Bytes(), actChunk.Bytes()) + } + + // Testing IterateAllChunks method. + dir := hrw.dir.Name() + testutil.Ok(t, hrw.Close()) + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + + idx := 0 + err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error { + t.Helper() + + expData := expectedData[idx] + testutil.Equals(t, expData.seriesRef, seriesRef) + testutil.Equals(t, expData.chunkRef, chunkRef) + testutil.Equals(t, expData.maxt, maxt) + testutil.Equals(t, expData.maxt, maxt) + + actChunk, err := hrw.Chunk(expData.chunkRef) + testutil.Ok(t, err) + testutil.Equals(t, expData.chunk.Bytes(), actChunk.Bytes()) + + idx++ + return nil + }) + testutil.Ok(t, err) + testutil.Equals(t, len(expectedData), idx) + +} + +func TestHeadReadWriter_Truncate(t *testing.T) { + hrw, close := testHeadReadWriter(t) + defer func() { + testutil.Ok(t, hrw.Close()) + close() + }() + + testutil.Assert(t, !hrw.fileMaxtSet, "") + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) + testutil.Assert(t, hrw.fileMaxtSet, "") + + timeRange := 0 + fileTimeStep := 100 + totalFiles, after1stTruncation, after2ndTruncation := 7, 5, 3 + var timeToTruncate, timeToTruncateAfterRestart int64 + + cutFile := func(i int) { + testutil.Ok(t, hrw.cut(int64(timeRange))) + + mint := timeRange + 1 // Just after the the new file cut. + maxt := timeRange + fileTimeStep - 1 // Just before the next file. + + // Write a chunks to set maxt for the segment. + _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) + testutil.Ok(t, err) + + if i == totalFiles-after1stTruncation+1 { + // Truncate the segment files before the 5th segment. + timeToTruncate = int64(mint) + } else if i == totalFiles-after2ndTruncation+1 { + // Truncate the segment files before the 3rd segment after restart. + timeToTruncateAfterRestart = int64(mint) + } + + timeRange += fileTimeStep + } + + // Cut segments. + for i := 1; i <= totalFiles; i++ { + cutFile(i) + } + + // Verifying the the remaining files. + verifyRemainingFiles := func(remainingFiles int) { + t.Helper() + + files, err := ioutil.ReadDir(hrw.dir.Name()) + testutil.Ok(t, err) + testutil.Equals(t, remainingFiles, len(files)) + testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles)) + testutil.Equals(t, remainingFiles, len(hrw.closers)) + + for i := 1; i <= totalFiles; i++ { + _, ok := hrw.mmappedChunkFiles[i] + if i < totalFiles-remainingFiles+1 { + testutil.Equals(t, false, ok) + } else { + testutil.Equals(t, true, ok) + } + } + } + + // Verify the number of segments. + verifyRemainingFiles(totalFiles) + + // Truncating files. + testutil.Ok(t, hrw.Truncate(timeToTruncate)) + verifyRemainingFiles(after1stTruncation) + + dir := hrw.dir.Name() + testutil.Ok(t, hrw.Close()) + + // Restarted. + var err error + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + + testutil.Assert(t, !hrw.fileMaxtSet, "") + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) + testutil.Assert(t, hrw.fileMaxtSet, "") + + // Truncating files after restart. + testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart)) + verifyRemainingFiles(after2ndTruncation) + + // Add another file to have an active file. + totalFiles++ + cutFile(totalFiles) + // Truncating till current time should not delete the current active file. + testutil.Ok(t, hrw.Truncate(time.Now().UnixNano()/1e6)) + verifyRemainingFiles(1) +} + +func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) { + tmpdir, err := ioutil.TempDir("", "data") + testutil.Ok(t, err) + hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool()) + testutil.Ok(t, err) + return hrw, func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + } +} + +func randomChunk(t *testing.T) chunkenc.Chunk { + chunk := chunkenc.NewXORChunk() + len := rand.Int() % 120 + app, err := chunk.Appender() + testutil.Ok(t, err) + for i := 0; i < len; i++ { + app.Append(rand.Int63(), rand.Float64()) + } + return chunk +} + +func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef uint64, mint, maxt int64, chunk chunkenc.Chunk) { + var err error + seriesRef = uint64(rand.Int63()) + mint = int64((idx)*1000 + 1) + maxt = int64((idx + 1) * 1000) + chunk = randomChunk(t) + chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk) + testutil.Ok(t, err) + return +} diff --git a/tsdb/docs/format/head_chunks.md b/tsdb/docs/format/head_chunks.md new file mode 100644 index 000000000..32657e804 --- /dev/null +++ b/tsdb/docs/format/head_chunks.md @@ -0,0 +1,36 @@ +# Head Chunks on Disk Format + +The following describes the format of a chunks file, +which is created in the `wal/chunks/` inside the data directory. + +Chunks in the files are referenced from the index by uint64 composed of +in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes). + +``` +┌──────────────────────────────┐ +│ magic(0x85BD40DD) <4 byte> │ +├──────────────────────────────┤ +│ version(1) <1 byte> │ +├──────────────────────────────┤ +│ padding(0) <3 byte> │ +├──────────────────────────────┤ +│ ┌──────────────────────────┐ │ +│ │ Chunk 1 │ │ +│ ├──────────────────────────┤ │ +│ │ ... │ │ +│ ├──────────────────────────┤ │ +│ │ Chunk N │ │ +│ └──────────────────────────┘ │ +└──────────────────────────────┘ +``` + + +# Chunk + +Unlike chunks in the on-disk blocks, here we additionally store series reference that the chunks belongs to and the mint/maxt of the chunks. This is because we don't have an index associated with these chunks, hence these meta information are used while replaying the chunks. + +``` +┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐ +| series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len | data │ CRC32 <4 byte> │ +└─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘ +``` diff --git a/tsdb/fileutil/mmap.go b/tsdb/fileutil/mmap.go index 896b27789..5cb123810 100644 --- a/tsdb/fileutil/mmap.go +++ b/tsdb/fileutil/mmap.go @@ -28,11 +28,16 @@ func OpenMmapFile(path string) (*MmapFile, error) { return OpenMmapFileWithSize(path, 0) } -func OpenMmapFileWithSize(path string, size int) (*MmapFile, error) { +func OpenMmapFileWithSize(path string, size int) (mf *MmapFile, retErr error) { f, err := os.Open(path) if err != nil { return nil, errors.Wrap(err, "try lock file") } + defer func() { + if retErr != nil { + f.Close() + } + }() if size <= 0 { info, err := f.Stat() if err != nil {