diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 1a11924d1..9b332f971 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -190,27 +190,7 @@ func (w *Writer) cut() error { return err } - p, _, err := nextSequenceFile(w.dirFile.Name()) - if err != nil { - return err - } - f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return err - } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return err - } - if err = w.dirFile.Sync(); err != nil { - return err - } - - // Write header metadata for new file. - metab := make([]byte, SegmentHeaderSize) - binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks) - metab[4] = chunksFormatV1 - - n, err := f.Write(metab) + n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize) if err != nil { return err } @@ -226,6 +206,36 @@ func (w *Writer) cut() error { return nil } +func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) { + p, seq, err := nextSequenceFile(dirFile.Name()) + if err != nil { + return 0, nil, 0, err + } + f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return 0, nil, 0, err + } + if segmentSize > 0 { + if err = fileutil.Preallocate(f, segmentSize, true); err != nil { + return 0, nil, 0, err + } + } + if err = dirFile.Sync(); err != nil { + return 0, nil, 0, err + } + + // Write header metadata for new file. 
+ metab := make([]byte, SegmentHeaderSize) + binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks) + metab[4] = chunksFormat + + n, err := f.Write(metab) + if err != nil { + return 0, nil, 0, err + } + return n, f, seq, nil +} + func (w *Writer) write(b []byte) error { n, err := w.wbuf.Write(b) w.n += int64(n) @@ -464,8 +474,6 @@ type Reader struct { func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} - var totalSize int64 - for i, b := range cr.bs { if b.Len() < SegmentHeaderSize { return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i) } @@ -479,9 +487,8 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { return nil, errors.Errorf("invalid chunk format version %d", v) } - totalSize += int64(b.Len()) + cr.size += int64(b.Len()) } - cr.size = totalSize return &cr, nil } @@ -594,9 +601,18 @@ func nextSequenceFile(dir string) (string, int, error) { if err != nil { continue } - i = j + // It is not necessary that we find the files in number order, + // for example with '1000000' and '200000', '1000000' would come first. + // Though this is a very very rare case, we check anyway for the max id. + if j > i { + i = j + } } - return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil + return segmentFile(dir, int(i+1)), int(i + 1), nil +} + +func segmentFile(baseDir string, index int) string { + return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index)) } func sequenceFiles(dir string) ([]string, error) { @@ -605,7 +621,6 @@ func sequenceFiles(dir string) ([]string, error) { return nil, err } var res []string - for _, fi := range files { if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { continue