Refactor tsdb/chunks/chunks.go for future PRs (#6754)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-02-05 19:09:40 +05:30 committed by GitHub
parent 56bf0ee4dc
commit 0a27df92f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 43 additions and 28 deletions

View File

@ -190,27 +190,7 @@ func (w *Writer) cut() error {
return err
}
p, _, err := nextSequenceFile(w.dirFile.Name())
if err != nil {
return err
}
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
return err
}
if err = w.dirFile.Sync(); err != nil {
return err
}
// Write header metadata for new file.
metab := make([]byte, SegmentHeaderSize)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormatV1
n, err := f.Write(metab)
n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize)
if err != nil {
return err
}
@ -226,6 +206,36 @@ func (w *Writer) cut() error {
return nil
}
func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) {
p, seq, err := nextSequenceFile(dirFile.Name())
if err != nil {
return 0, nil, 0, err
}
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return 0, nil, 0, err
}
if segmentSize > 0 {
if err = fileutil.Preallocate(f, segmentSize, true); err != nil {
return 0, nil, 0, err
}
}
if err = dirFile.Sync(); err != nil {
return 0, nil, 0, err
}
// Write header metadata for new file.
metab := make([]byte, SegmentHeaderSize)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormat
n, err := f.Write(metab)
if err != nil {
return 0, nil, 0, err
}
return n, f, seq, nil
}
func (w *Writer) write(b []byte) error {
n, err := w.wbuf.Write(b)
w.n += int64(n)
@ -464,8 +474,6 @@ type Reader struct {
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs {
if b.Len() < SegmentHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
@ -479,9 +487,8 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return nil, errors.Errorf("invalid chunk format version %d", v)
}
totalSize += int64(b.Len())
cr.size += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}
@ -594,9 +601,18 @@ func nextSequenceFile(dir string) (string, int, error) {
if err != nil {
continue
}
// It is not necessary that we find the files in number order,
// for example with '1000000' and '200000', '1000000' would come first.
// Though this is a very very race case, we check anyway for the max id.
if j > i {
i = j
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
return segmentFile(dir, int(i+1)), int(i + 1), nil
}
func segmentFile(baseDir string, index int) string {
return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
}
func sequenceFiles(dir string) ([]string, error) {
@ -605,7 +621,6 @@ func sequenceFiles(dir string) ([]string, error) {
return nil, err
}
var res []string
for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue